Friday 25 January 2013

Further Adventures With CAS Instructions And Micro Benchmarking

In a previous article I reported what appeared to be a performance issue with CAS/LOCK instructions on the Sandy Bridge microarchitecture compared to the previous Nehalem microarchitecture.  Since then I've worked with the good people of Intel to understand what was going on and I'm now pleased to be able to shine some light on the previous results.

I observed a small drop in throughput in the uncontended single-thread case, and an order-of-magnitude decrease in throughput once multiple threads contend to perform updates.  This testing grew out of observations made while testing Java Queue implementations and the Disruptor in the multi-producer case.  I was initially puzzled by these findings because almost every other performance test I applied to Sandy Bridge indicated a major step forward for this microarchitecture.

After digging deeper into this issue it has come to light that my tests have once again fallen foul of the difficulties of micro-benchmarking.  My test is not a good means of measuring throughput; in a roundabout manner it is actually measuring fairness.  Let's revisit the code and work through what is going on.

Test Code
#include <time.h>
#include <pthread.h>
#include <stdlib.h>
#include <iostream>

typedef unsigned long long uint64;

const uint64 COUNT = 500 * 1000 * 1000;

volatile uint64 counter = 0;

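void* run_add(void* numThreads)
{
    // Each thread performs its share of COUNT increments.  The result of the
    // add is unused, so the compiler can emit LOCK ADD.
    uint64 value = (COUNT / *((int*)numThreads)) + 1;

    while (--value != 0)
    {
        __sync_add_and_fetch(&counter, 1);
    }

    return NULL;
}

void* run_xadd(void*)
{
    // The updated value drives the loop condition, so the compiler emits
    // LOCK XADD, which hands back the previous value of counter.
    uint64 value = counter;

    while (value < COUNT)
    {
        value = __sync_add_and_fetch(&counter, 1);
    }

    return NULL;
}

void* run_cas(void*)
{
    uint64 value = 0;

    while (value < COUNT)
    {
        // Re-read counter and retry until the swap to value + 1 succeeds.
        do
        {
            value = counter;
        }
        while (!__sync_bool_compare_and_swap(&counter, value, value + 1));
    }

    return NULL;
}

void* run_cas2(void*)
{
    uint64 value = 0;
    uint64 next = 0;

    while (value < COUNT)
    {
        value = counter;
        do
        {
            next = value + 1;
            // Returns the value counter held before the operation.
            value = __sync_val_compare_and_swap(&counter, value, next);
        }
        while (value != next);
    }

    return NULL;
}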

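int main (int argc, char *argv[])
{
    if (argc != 3)
    {
        std::cerr << "usage: " << argv[0] << " <threads> <testcase 1-4>" << std::endl;
        return 1;
    }

    const int NUM_THREADS = atoi(argv[1]);
    const int TESTCASE = atoi(argv[2]);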

    pthread_t threads[NUM_THREADS];
    void* status;

    timespec ts_start;
    timespec ts_finish;
    clock_gettime(CLOCK_MONOTONIC, &ts_start);


    for (int i = 0; i < NUM_THREADS; i++)
    {
        switch (TESTCASE)
        {
            case 1:
                std::cout << "LOCK ADD" << std::endl;
                pthread_create(&threads[i], NULL, run_add, (void*)&NUM_THREADS);
                break;

            case 2:
                std::cout << "LOCK XADD" << std::endl;
                pthread_create(&threads[i], NULL, run_xadd, (void*)&NUM_THREADS);
                break;

            case 3:
                std::cout << "LOCK CMPXCHG BOOL" << std::endl;
                pthread_create(&threads[i], NULL, run_cas, (void*)&NUM_THREADS);
                break;

            case 4:
                std::cout << "LOCK CMPXCHG VAL" << std::endl;
                pthread_create(&threads[i], NULL, run_cas2, (void*)&NUM_THREADS);
                break;

            default:
                exit(1);
        }
    }

    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_join(threads[i], &status);
    }

    clock_gettime(CLOCK_MONOTONIC, &ts_finish);

    uint64 start = (ts_start.tv_sec * 1000000000) + ts_start.tv_nsec;
    uint64 finish = (ts_finish.tv_sec * 1000000000) + ts_finish.tv_nsec;
    uint64 duration = finish - start;

    std::cout << "threads = " << NUM_THREADS << std::endl;
    std::cout << "duration = " <<  duration << std::endl;
    std::cout << "ns per op = " << (duration / (COUNT * 2)) << std::endl;
    std::cout << "op/sec = " << ((COUNT * 2 * 1000 * 1000 * 1000) / duration) << std::endl;
    std::cout << "counter = " << counter << std::endl;

    return 0;
}
The code above makes it possible to test the major CAS-based techniques on x86.  For full clarity, an objdump -d of the binary reveals the compiler-generated assembly instructions for the above functions.  The lock-prefixed instruction in each function is where the atomic update happens.
0000000000400dc0 <_Z8run_cas2Pv>:
  400dc0: 48 8b 05 d9 07 20 00  mov    0x2007d9(%rip),%rax      
  400dc7: 66 0f 1f 84 00 00 00  nopw   0x0(%rax,%rax,1)
  400dce: 00 00 
  400dd0: 48 8d 50 01           lea    0x1(%rax),%rdx
  400dd4: f0 48 0f b1 15 c3 07  lock cmpxchg %rdx,0x2007c3(%rip)
  400ddb: 20 00 
  400ddd: 48 39 c2              cmp    %rax,%rdx
  400de0: 75 ee                 jne    400dd0 <_Z8run_cas2Pv+0x10>
  400de2: 48 3d ff 64 cd 1d     cmp    $0x1dcd64ff,%rax
  400de8: 76 d6                 jbe    400dc0 <_Z8run_cas2Pv>
  400dea: f3 c3                 repz retq 
  400dec: 0f 1f 40 00           nopl   0x0(%rax)

0000000000400df0 <_Z7run_casPv>:
  400df0: 48 8b 15 a9 07 20 00  mov    0x2007a9(%rip),%rdx     
  400df7: 48 8d 4a 01           lea    0x1(%rdx),%rcx
  400dfb: 48 89 d0              mov    %rdx,%rax
  400dfe: f0 48 0f b1 0d 99 07  lock cmpxchg %rcx,0x200799(%rip)  
  400e05: 20 00 
  400e07: 75 e7                 jne    400df0 <_Z7run_casPv>
  400e09: 48 81 fa ff 64 cd 1d  cmp    $0x1dcd64ff,%rdx
  400e10: 76 de                 jbe    400df0 <_Z7run_casPv>
  400e12: f3 c3                 repz retq 
  400e14: 66 66 66 2e 0f 1f 84  data32 data32 nopw %cs:0x0(%rax,%rax,1)
  400e1b: 00 00 00 00 00 

0000000000400e20 <_Z8run_xaddPv>:
  400e20: 48 8b 05 79 07 20 00  mov    0x200779(%rip),%rax    
  400e27: 48 3d ff 64 cd 1d     cmp    $0x1dcd64ff,%rax
  400e2d: 77 1b                 ja     400e4a <_Z8run_xaddPv+0x2a>
  400e2f: 90                    nop
  400e30: b8 01 00 00 00        mov    $0x1,%eax
  400e35: f0 48 0f c1 05 62 07  lock xadd %rax,0x200762(%rip) 
  400e3c: 20 00 
  400e3e: 48 83 c0 01           add    $0x1,%rax
  400e42: 48 3d ff 64 cd 1d     cmp    $0x1dcd64ff,%rax
  400e48: 76 e6                 jbe    400e30 <_Z8run_xaddPv+0x10>
  400e4a: f3 c3                 repz retq 
  400e4c: 0f 1f 40 00           nopl   0x0(%rax)

0000000000400e50 <_Z7run_addPv>:
  400e50: 48 63 0f              movslq (%rdi),%rcx
  400e53: 31 d2                 xor    %edx,%edx
  400e55: b8 00 65 cd 1d        mov    $0x1dcd6500,%eax
  400e5a: 48 f7 f1              div    %rcx
  400e5d: 48 85 c0              test   %rax,%rax
  400e60: 74 15                 je     400e77 <_Z7run_addPv+0x27>
  400e62: 66 0f 1f 44 00 00     nopw   0x0(%rax,%rax,1)
  400e68: f0 48 83 05 2f 07 20  lock addq $0x1,0x20072f(%rip)    
  400e6f: 00 01 
  400e71: 48 83 e8 01           sub    $0x1,%rax
  400e75: 75 f1                 jne    400e68 <_Z7run_addPv+0x18>
  400e77: f3 c3                 repz retq 
  400e79: 90                    nop
  400e7a: 90                    nop
  400e7b: 90                    nop
  400e7c: 90                    nop
  400e7d: 90                    nop
  400e7e: 90                    nop
  400e7f: 90                    nop
To isolate the performance of the atomic operation itself, the test should be run using the LOCK XADD option (test case 2), which performs the atomic increment in hardware.  This instruction lets us avoid the spin-retry loop of a software CAS, which can dirty the experiment.
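The same distinction exists on the Java side, where these tests originated.  As a minimal sketch, assuming HotSpot on x86, AtomicLong.getAndIncrement() is typically compiled down to a single LOCK XADD, whereas an explicit compareAndSet() retry loop uses LOCK CMPXCHG and may spin under contention.  The class and method names below are made up for illustration, and the timing printed by main() is as naive as the C++ test, so it inherits the same micro-benchmarking caveats.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: XADD-style and CAS-style increments expressed in Java.
// Assumption: on x86 HotSpot, getAndIncrement is typically intrinsified to LOCK XADD,
// while the compareAndSet loop uses LOCK CMPXCHG and retries when it loses a race.
class AtomicIncrementStyles
{
    // Hardware atomic increment: one instruction, no retry loop.
    static long xaddStyle(final AtomicLong counter)
    {
        return counter.getAndIncrement();
    }

    // Software CAS loop: read, try to swap in value + 1, retry on contention.
    static long casStyle(final AtomicLong counter)
    {
        long current;
        do
        {
            current = counter.get();
        }
        while (!counter.compareAndSet(current, current + 1));
        return current;
    }

    // Starts numThreads threads that each perform opsPerThread increments,
    // then returns the final counter value.
    static long run(final int numThreads, final long opsPerThread, final boolean useCas)
    {
        final AtomicLong counter = new AtomicLong();
        final Thread[] threads = new Thread[numThreads];
        for (int t = 0; t < numThreads; t++)
        {
            threads[t] = new Thread(() ->
            {
                for (long i = 0; i < opsPerThread; i++)
                {
                    if (useCas)
                    {
                        casStyle(counter);
                    }
                    else
                    {
                        xaddStyle(counter);
                    }
                }
            });
            threads[t].start();
        }
        try
        {
            for (final Thread thread : threads)
            {
                thread.join();
            }
        }
        catch (final InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(final String[] args)
    {
        for (final boolean useCas : new boolean[] {false, true})
        {
            final long start = System.nanoTime();
            final long total = run(2, 10_000_000L, useCas);
            final long duration = System.nanoTime() - start;
            System.out.println((useCas ? "CAS loop" : "getAndIncrement")
                + ": counter = " + total + ", ns per op = " + ((double)duration / total));
        }
    }
}
```

Whichever style is used, the final counter value should always equal threads multiplied by operations per thread; only the cost of getting there differs.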

I repeated the experiment from the previous article and got very similar results.  Previously, I thought I'd observed a throughput drop even in the uncontended single-threaded case, so I focused on this to confirm it.  To do this I needed two processors whose clock speeds would be comparable once Turbo Boost had kicked in.  A 2.8GHz Nehalem and a 2.4GHz Sandy Bridge fit the bill: in the single-threaded case they both operate at ~3.4GHz.
Nehalem 2.8GHz
==============
$ perf stat ./atomic_inc 1 2
LOCK XADD
threads = 1
duration = 3090445546
ns per op = 3
op/sec = 323577938

 Performance counter stats for './atomic_inc 1 2':

       3085.466216 task-clock                #    0.997 CPUs utilized          
               331 context-switches          #    0.107 K/sec                  
                 4 CPU-migrations            #    0.001 K/sec                  
               360 page-faults               #    0.117 K/sec                  
    10,527,264,923 cycles                    #    3.412 GHz                 
     9,394,575,677 stalled-cycles-frontend   #   89.24% frontend cycles idle
     7,423,070,202 stalled-cycles-backend    #   70.51% backend  cycles idle 
     2,517,668,293 instructions              #    0.24  insns per cycle        
                                             #    3.73  stalled cycles per insn
       503,526,119 branches                  #  163.193 M/sec                  
           110,695 branch-misses             #    0.02% of all branches       

       3.093402966 seconds time elapsed

Sandy Bridge 2.4GHz
===================
$ perf stat ./atomic_inc 1 2
LOCK XADD
threads = 1
duration = 3394221940
ns per op = 3
op/sec = 294618330

 Performance counter stats for './atomic_inc 1 2':

       3390.404400 task-clock                #    0.998 CPUs utilized          
               357 context-switches          #    0.105 K/sec                  
                 1 CPU-migrations            #    0.000 K/sec                  
               358 page-faults               #    0.106 K/sec                  
    11,522,932,068 cycles                    #    3.399 GHz                 
     9,542,667,311 stalled-cycles-frontend   #   82.81% frontend cycles idle  
     6,721,330,874 stalled-cycles-backend    #   58.33% backend  cycles idle  
     2,518,638,461 instructions              #    0.22  insns per cycle        
                                             #    3.79  stalled cycles per insn
       502,490,710 branches                  #  148.210 M/sec                  
            36,955 branch-misses             #    0.01% of all branches        

       3.398206155 seconds time elapsed

Analysis

So repeating the tests with comparable clock speeds confirmed the previous results.  The single-threaded case shows a ~10% drop in throughput and the multi-threaded contended case displays an order-of-magnitude difference in throughput.
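As a cross-check on these figures, the perf counters can be turned into cycles per increment.  The sketch below assumes each single-threaded LOCK XADD run performs COUNT = 500,000,000 increments, and it ignores the few cycles spent on process start-up.  It gives roughly 21 cycles per increment on Nehalem versus 23 on Sandy Bridge, and a throughput drop of about 9% from the reported op/sec values.  Note that the test program divides by COUNT * 2 when printing "ns per op" and "op/sec", which halves the former and doubles the latter, but the ratio between the two machines is unaffected.

```java
// Worked arithmetic on the perf stat output above.
// Assumption: each single-threaded LOCK XADD run performs COUNT = 500,000,000 increments.
class PerfCounterArithmetic
{
    static final long COUNT = 500_000_000L;

    // Average core cycles per increment (process start-up cycles are included but negligible).
    static double cyclesPerIncrement(final long cycles)
    {
        return (double)cycles / COUNT;
    }

    // Fractional throughput loss going from the baseline machine to the new one.
    static double throughputDrop(final long baselineOpsPerSec, final long newOpsPerSec)
    {
        return 1.0 - ((double)newOpsPerSec / baselineOpsPerSec);
    }

    public static void main(final String[] args)
    {
        System.out.printf("Nehalem:      %.2f cycles per increment%n", cyclesPerIncrement(10_527_264_923L));
        System.out.printf("Sandy Bridge: %.2f cycles per increment%n", cyclesPerIncrement(11_522_932_068L));
        System.out.printf("Throughput drop: %.1f%%%n", 100 * throughputDrop(323_577_938L, 294_618_330L));
    }
}
```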

Now the big question is: what is going on, and why has throughput dropped?  Well, the single-threaded case suggests nothing major has happened to the number of cycles required to execute the instruction when uncontended.  The small differences could be attributed to system noise, or to the changes in the CPU front-end for Sandy Bridge with the introduction of the additional load address generation unit.

For the multi-threaded case we found an interesting surprise when Intel monitored what the instructions are doing.  We found that each thread on Nehalem was able to perform more updates in a batch before losing the exclusive state on the cacheline containing the counter.  On Sandy Bridge, by contrast, inter-core latency has improved, so other threads are able to claim the cacheline containing the counter more quickly to do their own updates.  What we are actually measuring with this micro-benchmark is how long a core can hold a cacheline before it is released to another core.  Sandy Bridge is exhibiting greater fairness, which is what you'd want in a real-world application.
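One way to observe this batching effect directly is to record which thread claimed each value of the counter, then measure the average run of consecutive values claimed by the same thread.  This is a hypothetical probe in Java, not the tooling Intel used.  The extra array write per increment adds memory traffic of its own, so only relative comparisons between machines are meaningful.  Based on the findings above, longer average batches would be expected on Nehalem than on Sandy Bridge.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical probe: record which thread produced each counter value, then measure
// how many consecutive increments a thread completes before another thread takes the
// counter's cacheline. Supports up to 127 threads (ids are stored as bytes).
class BatchLengthProbe
{
    static double averageBatchLength(final int numThreads, final int totalOps)
    {
        final AtomicLong counter = new AtomicLong();
        final byte[] owner = new byte[totalOps]; // owner[v] = id of the thread that claimed value v
        final Thread[] threads = new Thread[numThreads];

        for (int t = 0; t < numThreads; t++)
        {
            final byte id = (byte)t;
            threads[t] = new Thread(() ->
            {
                long value;
                while ((value = counter.getAndIncrement()) < totalOps)
                {
                    owner[(int)value] = id;
                }
            });
            threads[t].start();
        }
        try
        {
            for (final Thread thread : threads)
            {
                thread.join(); // join makes the owner[] writes visible to this thread
            }
        }
        catch (final InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // A new batch starts whenever ownership changes between consecutive values.
        int batches = 1;
        for (int v = 1; v < totalOps; v++)
        {
            if (owner[v] != owner[v - 1])
            {
                batches++;
            }
        }
        return (double)totalOps / batches;
    }

    public static void main(final String[] args)
    {
        final int threads = args.length > 0 ? Integer.parseInt(args[0]) : 2;
        System.out.printf("average batch length with %d threads: %.1f%n",
                          threads, averageBatchLength(threads, 10_000_000));
    }
}
```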

This micro-benchmark is very unrepresentative of a real-world application.  Normally a core would be doing a lot of other work between counter updates.  At the point when the counter needs to be updated, the reduced inter-core latency would then be a benefit.
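A somewhat more realistic shape for the benchmark interleaves each shared update with private work.  In this hypothetical sketch the xorshift loop is arbitrary filler standing in for "a lot of other work", and workIterations is an assumed knob for varying the ratio of work to updates.  As that ratio grows, contention on the counter's cacheline should account for a smaller share of the total cost.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical variant of the benchmark: each thread does private work between updates
// to the shared counter. The xorshift loop is arbitrary filler standing in for "other work".
class CounterWithWork
{
    static long run(final int numThreads, final int updatesPerThread, final int workIterations)
    {
        final AtomicLong counter = new AtomicLong();
        final long[] checksums = new long[numThreads]; // gives the filler work an observable result
        final Thread[] threads = new Thread[numThreads];
        for (int t = 0; t < numThreads; t++)
        {
            final int id = t;
            threads[t] = new Thread(() ->
            {
                long x = id + 1; // non-zero seed for xorshift
                for (int u = 0; u < updatesPerThread; u++)
                {
                    for (int w = 0; w < workIterations; w++)
                    {
                        x ^= x << 13; // local, uncontended work
                        x ^= x >>> 7;
                        x ^= x << 17;
                    }
                    counter.incrementAndGet(); // the contended operation
                }
                checksums[id] = x;
            });
            threads[t].start();
        }
        try
        {
            for (final Thread thread : threads)
            {
                thread.join();
            }
        }
        catch (final InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(final String[] args)
    {
        for (final int work : new int[] {0, 100, 1000})
        {
            final long start = System.nanoTime();
            final long updates = run(4, 100_000, work);
            final long duration = System.nanoTime() - start;
            System.out.printf("work=%d: %.1f ns per update%n", work, (double)duration / updates);
        }
    }
}
```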

In all my macro application benchmarks Sandy Bridge has proved to have better performance than Nehalem at comparable clock speeds.

Conclusion

What did I learn from this?  Once again, that writing micro-benchmarks is notoriously difficult.  It is very hard to know what you are actually measuring and which effects can come into play.  To illustrate how difficult it is to recognise such a flaw: of all those who have read this blog, no one identified the issue and fed it back to me.

It also shows that what at first blush looks like a performance bug can actually be the opposite.  A performance improvement can have a second-order effect that makes a specific workload run more slowly.

Wednesday 19 December 2012

Mechanical Sympathy Discussion Group

Lately a number of people have suggested I start a discussion group on the subject of mechanical sympathy, so I've taken the plunge and done it!  The group can be a place to discuss topics related to writing software which works in harmony with the underlying hardware to gain great performance.


Wednesday 17 October 2012

Compact Off-Heap Structures/Tuples In Java

In my last post I detailed the implications of the access patterns your code takes to main memory.  Since then I've had a lot of questions about what can be done in Java to enable more predictable memory layout.  There are patterns that can be applied using array-backed structures which I will discuss in another post.  This post will explore how to simulate a feature sorely missing in Java: arrays of structures similar to what C has to offer.

Structures are very useful, both on the stack and the heap.  To my knowledge it is not possible to simulate this feature on the Java stack.  Not being able to do this on the stack is such a shame because it greatly limits the performance of some parallel algorithms, however that is a rant for another day.

In Java, all user-defined types have to exist on the heap.  The Java heap is managed by the garbage collector in the general case, however there is more to the wider heap in a Java process.  With the introduction of direct ByteBuffer, memory can be allocated which is not tracked by the garbage collector, because it can be made available to native code for tasks like avoiding the copying of data to and from the kernel for IO.  So one reasonable approach to managing structures is to fake them within a ByteBuffer.  This allows compact data representations, but has performance and size limitations.  For example, it is not possible to have a ByteBuffer greater than 2GB, and all access is bounds-checked, which impacts performance.  An alternative exists using Unsafe that is both faster and not size-constrained like ByteBuffer.
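For comparison, faking a structure in a direct ByteBuffer might look like the following minimal sketch.  The record layout is assumed to match the 42-byte off-heap Trade used later in this post, but only a few fields are shown and the class name is hypothetical.  Every absolute get/put is bounds-checked, and the int capacity caps the buffer at Integer.MAX_VALUE bytes, which are the two limitations described above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Minimal sketch of a flyweight over fixed-size records in a direct ByteBuffer.
// Assumed layout (subset of the 42-byte trade record): tradeId@0, price@24, quantity@32, side@40.
class ByteBufferTradeFlyweight
{
    static final int TRADE_ID_OFFSET = 0;
    static final int PRICE_OFFSET = 24;
    static final int QUANTITY_OFFSET = 32;
    static final int SIDE_OFFSET = 40;
    static final int RECORD_SIZE = 42;

    private final ByteBuffer buffer;
    private int recordOffset; // byte offset of the record currently being viewed

    ByteBufferTradeFlyweight(final int numRecords)
    {
        // Capacity is an int; multiplyExact throws instead of silently overflowing.
        buffer = ByteBuffer.allocateDirect(Math.multiplyExact(numRecords, RECORD_SIZE))
                           .order(ByteOrder.nativeOrder());
    }

    // Repoints the flyweight at another record; no object is allocated.
    ByteBufferTradeFlyweight moveTo(final int index)
    {
        recordOffset = index * RECORD_SIZE;
        return this;
    }

    // Absolute get/put calls are bounds-checked and throw IndexOutOfBoundsException.
    long getTradeId() { return buffer.getLong(recordOffset + TRADE_ID_OFFSET); }
    void setTradeId(final long tradeId) { buffer.putLong(recordOffset + TRADE_ID_OFFSET, tradeId); }
    long getPrice() { return buffer.getLong(recordOffset + PRICE_OFFSET); }
    void setPrice(final long price) { buffer.putLong(recordOffset + PRICE_OFFSET, price); }
    long getQuantity() { return buffer.getLong(recordOffset + QUANTITY_OFFSET); }
    void setQuantity(final long quantity) { buffer.putLong(recordOffset + QUANTITY_OFFSET, quantity); }
    char getSide() { return buffer.getChar(recordOffset + SIDE_OFFSET); }
    void setSide(final char side) { buffer.putChar(recordOffset + SIDE_OFFSET, side); }

    public static void main(final String[] args)
    {
        final int numRecords = 1_000_000;
        final ByteBufferTradeFlyweight trades = new ByteBufferTradeFlyweight(numRecords);
        for (int i = 0; i < numRecords; i++)
        {
            final ByteBufferTradeFlyweight trade = trades.moveTo(i);
            trade.setTradeId(i);
            trade.setPrice(i);
            trade.setQuantity(i);
            trade.setSide((i & 1) == 0 ? 'B' : 'S');
        }

        long buyCost = 0;
        for (int i = 0; i < numRecords; i++)
        {
            final ByteBufferTradeFlyweight trade = trades.moveTo(i);
            if (trade.getSide() == 'B')
            {
                buyCost += trade.getPrice() * trade.getQuantity();
            }
        }
        System.out.println("buyCost = " + buyCost);
    }
}
```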

The approach I'm about to detail is not traditional Java.  If your problem space is dealing with big data, or extreme performance, then there are benefits to be had.  If your data sets are small, and performance is not an issue, then run away now to avoid getting sucked into the dark arts of native memory management.

The benefits of the approach I'm about to detail are:
  1. Significantly improved performance 
  2. More compact data representation
  3. Ability to work with very large data sets while avoiding nasty GC pauses[1]
With all choices there are consequences.  By taking the approach detailed below you take responsibility for some of the memory management yourself.  Getting it wrong can lead to memory leaks, or worse, you can crash the JVM!  Proceed with caution...
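One way to reduce the risk of leaks when managing native memory yourself is to tie each allocation to a try-with-resources block.  The OffHeapRegion class below is a hypothetical helper, not part of any library.  It uses the same Unsafe allocateMemory/freeMemory calls as the example later in this post, and it adds bounds and use-after-free checks so that a bad offset raises an exception rather than crashing the JVM.  Those checks give back some of the speed advantage, so this is a safety-first sketch rather than a tuned implementation.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Hypothetical helper: native memory freed by try-with-resources, with checked access.
final class OffHeapRegion implements AutoCloseable
{
    private static final Unsafe UNSAFE = loadUnsafe();

    private final long address;
    private final long size;
    private boolean closed;

    OffHeapRegion(final long size)
    {
        this.size = size;
        this.address = UNSAFE.allocateMemory(size); // contents are uninitialised
    }

    long getLong(final long offset)
    {
        check(offset, 8);
        return UNSAFE.getLong(address + offset);
    }

    void putLong(final long offset, final long value)
    {
        check(offset, 8);
        UNSAFE.putLong(address + offset, value);
    }

    // Rejects access after free and any access that would fall outside the region.
    private void check(final long offset, final long length)
    {
        if (closed)
        {
            throw new IllegalStateException("region already freed");
        }
        if (offset < 0 || offset > size - length)
        {
            throw new IndexOutOfBoundsException("offset " + offset + " outside region of " + size + " bytes");
        }
    }

    @Override
    public void close()
    {
        if (!closed) // a double free could corrupt the native allocator
        {
            closed = true;
            UNSAFE.freeMemory(address);
        }
    }

    private static Unsafe loadUnsafe()
    {
        try
        {
            final Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe)field.get(null);
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    public static void main(final String[] args)
    {
        try (OffHeapRegion region = new OffHeapRegion(1024))
        {
            region.putLong(16, 42L);
            System.out.println(region.getLong(16));
        } // native memory is released here, even if an exception was thrown
    }
}
```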

Suitable Example - Trade Data

A common challenge faced in finance applications is capturing and working with very large volumes of order and trade data.  For the example I will create a large table of in-memory trade data that can have analysis queries run against it.  This table will be built using 2 contrasting approaches.  Firstly, I'll take the traditional Java approach of creating a large array that references individual Trade objects.  Secondly, I'll keep the usage code identical but replace the large array and Trade objects with an off-heap array of structures that can be manipulated via a Flyweight pattern.

Had I used some other data structure for the traditional Java approach, such as a Map or Tree, the memory footprint would be even greater and the performance lower.

Traditional Java Approach
public class TestJavaMemoryLayout
{
    private static final int NUM_RECORDS = 50 * 1000 * 1000;

    private static JavaMemoryTrade[] trades;

    public static void main(final String[] args)
    {
        for (int i = 0; i < 5; i++)
        {
            System.gc();
            perfRun(i);
        }
    }

    private static void perfRun(final int runNum)
    {
        long start = System.currentTimeMillis();

        init();

        System.out.format("Memory %,d total, %,d free\n",
                          Runtime.getRuntime().totalMemory(),
                          Runtime.getRuntime().freeMemory());

        long buyCost = 0;
        long sellCost = 0;

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            final JavaMemoryTrade trade = get(i);

            if (trade.getSide() == 'B')
            {
                buyCost += (trade.getPrice() * trade.getQuantity());
            }
            else
            {
                sellCost += (trade.getPrice() * trade.getQuantity());
            }
        }

        long duration = System.currentTimeMillis() - start;
        System.out.println(runNum + " - duration " + duration + "ms");
        System.out.println("buyCost = " + buyCost + " sellCost = " + sellCost);
    }

    private static JavaMemoryTrade get(final int index)
    {
        return trades[index];
    }

    public static void init()
    {
        trades = new JavaMemoryTrade[NUM_RECORDS];

        final byte[] londonStockExchange = {'X', 'L', 'O', 'N'};
        final int venueCode = pack(londonStockExchange);

        final byte[] billiton = {'B', 'H', 'P'};
        final int instrumentCode = pack(billiton);

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            JavaMemoryTrade trade = new JavaMemoryTrade();
            trades[i] = trade;

            trade.setTradeId(i);
            trade.setClientId(1);
            trade.setVenueCode(venueCode);
            trade.setInstrumentCode(instrumentCode);

            trade.setPrice(i);
            trade.setQuantity(i);

            trade.setSide((i & 1) == 0 ? 'B' : 'S');
        }
    }

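    // Packs up to 4 ASCII bytes into an int, with value[0] in the most significant
    // byte; the switch cases deliberately fall through.
    private static int pack(final byte[] value)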
    {
        int result = 0;
        switch (value.length)
        {
            case 4:
                result = (value[3]);
            case 3:
                result |= ((int)value[2] << 8);
            case 2:
                result |= ((int)value[1] << 16);
            case 1:
                result |= ((int)value[0] << 24);
                break;

            default:
                throw new IllegalArgumentException("Invalid array size");
        }

        return result;
    }

    private static class JavaMemoryTrade
    {
        private long tradeId;
        private long clientId;
        private int venueCode;
        private int instrumentCode;
        private long price;
        private long quantity;
        private char side;

        public long getTradeId()
        {
            return tradeId;
        }

        public void setTradeId(final long tradeId)
        {
            this.tradeId = tradeId;
        }

        public long getClientId()
        {
            return clientId;
        }

        public void setClientId(final long clientId)
        {
            this.clientId = clientId;
        }

        public int getVenueCode()
        {
            return venueCode;
        }

        public void setVenueCode(final int venueCode)
        {
            this.venueCode = venueCode;
        }

        public int getInstrumentCode()
        {
            return instrumentCode;
        }

        public void setInstrumentCode(final int instrumentCode)
        {
            this.instrumentCode = instrumentCode;
        }

        public long getPrice()
        {
            return price;
        }

        public void setPrice(final long price)
        {
            this.price = price;
        }

        public long getQuantity()
        {
            return quantity;
        }

        public void setQuantity(final long quantity)
        {
            this.quantity = quantity;
        }

        public char getSide()
        {
            return side;
        }

        public void setSide(final char side)
        {
            this.side = side;
        }
    }
}
Compact Off-Heap Structures
import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class TestDirectMemoryLayout
{
    private static final Unsafe unsafe;
    static
    {
        try
        {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe = (Unsafe)field.get(null);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    private static final int NUM_RECORDS = 50 * 1000 * 1000;

    private static long address;
    private static final DirectMemoryTrade flyweight = new DirectMemoryTrade();

    public static void main(final String[] args)
    {
        for (int i = 0; i < 5; i++)
        {
            System.gc();
            perfRun(i);
        }
    }

    private static void perfRun(final int runNum)
    {
        long start = System.currentTimeMillis();

        init();

        System.out.format("Memory %,d total, %,d free\n",
                          Runtime.getRuntime().totalMemory(),
                          Runtime.getRuntime().freeMemory());

        long buyCost = 0;
        long sellCost = 0;

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            final DirectMemoryTrade trade = get(i);

            if (trade.getSide() == 'B')
            {
                buyCost += (trade.getPrice() * trade.getQuantity());
            }
            else
            {
                sellCost += (trade.getPrice() * trade.getQuantity());
            }
        }

        long duration = System.currentTimeMillis() - start;
        System.out.println(runNum + " - duration " + duration + "ms");
        System.out.println("buyCost = " + buyCost + " sellCost = " + sellCost);

        destroy();
    }

    private static DirectMemoryTrade get(final int index)
    {
        final long offset = address + (index * DirectMemoryTrade.getObjectSize());
        flyweight.setObjectOffset(offset);
        return flyweight;
    }

    public static void init()
    {
        final long requiredHeap = NUM_RECORDS * DirectMemoryTrade.getObjectSize();
        address = unsafe.allocateMemory(requiredHeap);

        final byte[] londonStockExchange = {'X', 'L', 'O', 'N'};
        final int venueCode = pack(londonStockExchange);

        final byte[] billiton = {'B', 'H', 'P'};
        final int instrumentCode = pack(billiton);

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            DirectMemoryTrade trade = get(i);

            trade.setTradeId(i);
            trade.setClientId(1);
            trade.setVenueCode(venueCode);
            trade.setInstrumentCode(instrumentCode);

            trade.setPrice(i);
            trade.setQuantity(i);

            trade.setSide((i & 1) == 0 ? 'B' : 'S');
        }
    }

    private static void destroy()
    {
        unsafe.freeMemory(address);
    }

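    // Packs up to 4 ASCII bytes into an int, with value[0] in the most significant
    // byte; the switch cases deliberately fall through.
    private static int pack(final byte[] value)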
    {
        int result = 0;
        switch (value.length)
        {
            case 4:
                result |= (value[3]);
            case 3:
                result |= ((int)value[2] << 8);
            case 2:
                result |= ((int)value[1] << 16);
            case 1:
                result |= ((int)value[0] << 24);
                break;

            default:
                throw new IllegalArgumentException("Invalid array size");
        }

        return result;
    }

    private static class DirectMemoryTrade
    {
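        // Record layout: each offset is the previous offset plus the size of the
        // previous field, giving tradeId@0 (long), clientId@8 (long), venueCode@16 (int),
        // instrumentCode@20 (int), price@24 (long), quantity@32 (long), side@40 (char),
        // for 42 bytes per record.  Records are packed back to back with no padding.
        private static long offset = 0;

        private static final long tradeIdOffset = offset += 0;
        private static final long clientIdOffset = offset += 8;
        private static final long venueCodeOffset = offset += 8;
        private static final long instrumentCodeOffset = offset += 4;
        private static final long priceOffset = offset += 4;
        private static final long quantityOffset = offset += 8;
        private static final long sideOffset = offset += 8;

        private static final long objectSize = offset += 2;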

        private long objectOffset;

        public static long getObjectSize()
        {
            return objectSize;
        }

        void setObjectOffset(final long objectOffset)
        {
            this.objectOffset = objectOffset;
        }

        public long getTradeId()
        {
            return unsafe.getLong(objectOffset + tradeIdOffset);
        }

        public void setTradeId(final long tradeId)
        {
            unsafe.putLong(objectOffset + tradeIdOffset, tradeId);
        }

        public long getClientId()
        {
            return unsafe.getLong(objectOffset + clientIdOffset);
        }

        public void setClientId(final long clientId)
        {
            unsafe.putLong(objectOffset + clientIdOffset, clientId);
        }

        public int getVenueCode()
        {
            return unsafe.getInt(objectOffset + venueCodeOffset);
        }

        public void setVenueCode(final int venueCode)
        {
            unsafe.putInt(objectOffset + venueCodeOffset, venueCode);
        }

        public int getInstrumentCode()
        {
            return unsafe.getInt(objectOffset + instrumentCodeOffset);
        }

        public void setInstrumentCode(final int instrumentCode)
        {
            unsafe.putInt(objectOffset + instrumentCodeOffset, instrumentCode);
        }

        public long getPrice()
        {
            return unsafe.getLong(objectOffset + priceOffset);
        }

        public void setPrice(final long price)
        {
            unsafe.putLong(objectOffset + priceOffset, price);
        }

        public long getQuantity()
        {
            return unsafe.getLong(objectOffset + quantityOffset);
        }

        public void setQuantity(final long quantity)
        {
            unsafe.putLong(objectOffset + quantityOffset, quantity);
        }

        public char getSide()
        {
            return unsafe.getChar(objectOffset + sideOffset);
        }

        public void setSide(final char side)
        {
            unsafe.putChar(objectOffset + sideOffset, side);
        }
    }
}
Results
Intel i7-860 @ 2.8GHz, 8GB RAM DDR3 1333MHz, 
Windows 7 64-bit, Java 1.7.0_07
=============================================
java -server -Xms4g -Xmx4g TestJavaMemoryLayout
Memory 4,116,054,016 total, 1,108,901,104 free
0 - duration 19334ms
Memory 4,116,054,016 total, 1,109,964,752 free
1 - duration 14295ms
Memory 4,116,054,016 total, 1,108,455,504 free
2 - duration 14272ms
Memory 3,817,799,680 total, 815,308,600 free
3 - duration 28358ms
Memory 3,817,799,680 total, 810,552,816 free
4 - duration 32487ms

java -server TestDirectMemoryLayout
Memory 128,647,168 total, 126,391,384 free
0 - duration 983ms
Memory 128,647,168 total, 126,992,160 free
1 - duration 958ms
Memory 128,647,168 total, 127,663,408 free
2 - duration 873ms
Memory 128,647,168 total, 127,663,408 free
3 - duration 886ms
Memory 128,647,168 total, 127,663,408 free
4 - duration 884ms

Intel i7-2760QM @ 2.40GHz, 8GB RAM DDR3 1600MHz, 
Linux 3.4.11 kernel 64-bit, Java 1.7.0_07
=================================================
java -server -Xms4g -Xmx4g TestJavaMemoryLayout
Memory 4,116,054,016 total, 1,108,912,960 free
0 - duration 12262ms
Memory 4,116,054,016 total, 1,109,962,832 free
1 - duration 9822ms
Memory 4,116,054,016 total, 1,108,458,720 free
2 - duration 10239ms
Memory 3,817,799,680 total, 815,307,640 free
3 - duration 21558ms
Memory 3,817,799,680 total, 810,551,856 free
4 - duration 23074ms

java -server TestDirectMemoryLayout 
Memory 123,994,112 total, 121,818,528 free
0 - duration 634ms
Memory 123,994,112 total, 122,455,944 free
1 - duration 619ms
Memory 123,994,112 total, 123,103,320 free
2 - duration 546ms
Memory 123,994,112 total, 123,103,320 free
3 - duration 547ms
Memory 123,994,112 total, 123,103,320 free
4 - duration 534ms
Analysis

Let's compare the results to the 3 benefits promised above.

1.  Significantly improved performance

The evidence here is pretty clear cut.  Using the off-heap structures approach is more than an order of magnitude faster.  At the extreme, the 5th run on the Sandy Bridge processor shows a 43.2 times difference in the duration to complete the task (23,074ms versus 534ms).  It is also a nice illustration of how well Sandy Bridge does with predictable access patterns to data.  Not only is the performance significantly better, it is also more consistent.  As the heap becomes fragmented, and thus access patterns become more random, the performance degrades, as can be seen in the later runs with the standard Java approach.

2.  More compact data representation

For our off-heap representation each object requires 42 bytes.  To store 50 million of these, as in the example, we require 2,100,000,000 bytes.  The memory required by the JVM heap is:

   memory required = total memory - free memory - base JVM needs 

     2,883,248,712 = 3,817,799,680 - 810,551,856 - 123,999,112

This implies the JVM needs ~40% more memory to represent the same data.  The reason for this overhead is the array of references to the Java objects plus the object headers.  In a previous post I discussed object layout in Java.
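The overhead can also be estimated from first principles.  This back-of-the-envelope model assumes a 64-bit HotSpot JVM with compressed oops: a 12-byte object header, the 42 bytes of fields packed without gaps, 8-byte object alignment and a 4-byte reference per array slot.  Those are typical values rather than guarantees, and the array's own header is ignored.  Under these assumptions each JavaMemoryTrade occupies 56 bytes plus its 4-byte reference, or 3,000,000,000 bytes for 50 million trades.  That is in the same region as the 2,883,248,712 bytes derived above.

```java
// Back-of-the-envelope footprint model for the two layouts in this post.
// Assumptions (typical of 64-bit HotSpot with compressed oops, but JVM-dependent):
// 12-byte object header, fields packed without gaps, 8-byte object alignment,
// 4-byte references in the array. The array's own header is ignored.
class FootprintEstimate
{
    // tradeId, clientId, price, quantity (longs) + venueCode, instrumentCode (ints) + side (char)
    static final int FIELD_BYTES = 4 * 8 + 2 * 4 + 2; // 42 bytes

    // Rounds size up to the next multiple of alignment.
    static long alignUp(final long size, final long alignment)
    {
        return (size + alignment - 1) / alignment * alignment;
    }

    // Flyweight records are packed back to back, so only the field bytes count.
    static long offHeapBytes(final long records)
    {
        return records * FIELD_BYTES;
    }

    // Each on-heap trade costs an aligned object plus one reference in the array.
    static long onHeapBytes(final long records, final int headerBytes, final int referenceBytes)
    {
        final long objectBytes = alignUp(headerBytes + FIELD_BYTES, 8);
        return records * (objectBytes + referenceBytes);
    }

    public static void main(final String[] args)
    {
        final long records = 50_000_000L;
        final long offHeap = offHeapBytes(records);
        final long onHeap = onHeapBytes(records, 12, 4);
        System.out.printf("off-heap %,d bytes, on-heap ~%,d bytes, overhead ~%.0f%%%n",
                          offHeap, onHeap, 100.0 * (onHeap - offHeap) / offHeap);
    }
}
```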

When working with very large data sets this overhead can become a significant limiting factor.

3.  Ability to work with very large data sets while avoiding nasty GC pauses

The sample code above forces a GC cycle before each run, which can improve the consistency of the results in some cases.  Feel free to remove the call to System.gc() and observe the implications for yourself.  If you run the tests with the following command-line arguments, the garbage collector will output in painful detail what happened.

-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCApplicationStoppedTime -XX:+PrintSafepointStatistics
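If you would rather not parse GC logs, the standard java.lang.management API offers a coarser view from inside the process.  As a minimal sketch, GarbageCollectorMXBean reports each collector's cumulative collection count and approximate accumulated collection time in milliseconds.  These are per-collector totals, not the safepoint "stopped" times shown below.  The class name is hypothetical and the allocation loop in main() is just filler to give the collector some work.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// In-process complement to the GC log flags, using the standard management beans.
class GcSummary
{
    private static Object[] recent = new Object[1024]; // keeps filler allocations reachable briefly

    // Sum of collections across all collectors; getCollectionCount returns -1 if undefined.
    static long totalCollections()
    {
        long total = 0;
        for (final GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans())
        {
            total += Math.max(0, gc.getCollectionCount());
        }
        return total;
    }

    static void print()
    {
        for (final GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans())
        {
            System.out.printf("%s: %d collections, %d ms%n",
                              gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
    }

    public static void main(final String[] args)
    {
        for (int i = 0; i < 5_000_000; i++)
        {
            recent[i & 1023] = new long[16]; // mostly short-lived garbage for the collector
        }
        print();
    }
}
```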

From analysing the output I can see the application underwent a total of 29 GC cycles.  The pause times listed below were extracted from the output lines indicating when the application threads were stopped.
With System.gc() before each run
================================
Total time for which application threads were stopped: 0.0085280 seconds
Total time for which application threads were stopped: 0.7280530 seconds
Total time for which application threads were stopped: 8.1703460 seconds
Total time for which application threads were stopped: 5.6112210 seconds
Total time for which application threads were stopped: 1.2531370 seconds
Total time for which application threads were stopped: 7.6392250 seconds
Total time for which application threads were stopped: 5.7847050 seconds
Total time for which application threads were stopped: 1.3070470 seconds
Total time for which application threads were stopped: 8.2520880 seconds
Total time for which application threads were stopped: 6.0949910 seconds
Total time for which application threads were stopped: 1.3988480 seconds
Total time for which application threads were stopped: 8.1793240 seconds
Total time for which application threads were stopped: 6.4138720 seconds
Total time for which application threads were stopped: 4.4991670 seconds
Total time for which application threads were stopped: 4.5612290 seconds
Total time for which application threads were stopped: 0.3598490 seconds
Total time for which application threads were stopped: 0.7111000 seconds
Total time for which application threads were stopped: 1.4426750 seconds
Total time for which application threads were stopped: 1.5931500 seconds
Total time for which application threads were stopped: 10.9484920 seconds
Total time for which application threads were stopped: 7.0707230 seconds

Without System.gc() before each run
===================================
Test run times
0 - duration 12120ms
1 - duration 9439ms
2 - duration 9844ms
3 - duration 20933ms
4 - duration 23041ms

Total time for which application threads were stopped: 0.0170860 seconds
Total time for which application threads were stopped: 0.7915350 seconds
Total time for which application threads were stopped: 10.7153320 seconds
Total time for which application threads were stopped: 5.6234650 seconds
Total time for which application threads were stopped: 1.2689950 seconds
Total time for which application threads were stopped: 7.6238170 seconds
Total time for which application threads were stopped: 6.0114540 seconds
Total time for which application threads were stopped: 1.2990070 seconds
Total time for which application threads were stopped: 7.9918480 seconds
Total time for which application threads were stopped: 5.9997920 seconds
Total time for which application threads were stopped: 1.3430040 seconds
Total time for which application threads were stopped: 8.0759940 seconds
Total time for which application threads were stopped: 6.3980610 seconds
Total time for which application threads were stopped: 4.5572100 seconds
Total time for which application threads were stopped: 4.6193830 seconds
Total time for which application threads were stopped: 0.3877930 seconds
Total time for which application threads were stopped: 0.7429270 seconds
Total time for which application threads were stopped: 1.5248070 seconds
Total time for which application threads were stopped: 1.5312130 seconds
Total time for which application threads were stopped: 10.9120250 seconds
Total time for which application threads were stopped: 7.3528590 seconds
It can be seen from the output that a significant proportion of the time is spent in the garbage collector.  When your threads are stopped your application is not responsive.  These tests have been done with default GC settings.  It is possible to tune the GC for better results but this can be a highly skilled and significant effort.  The only JVM I know that copes well by not imposing long pause times, even under high-throughput conditions, is the Azul concurrent compacting collector.

When profiling this application, I can see that the majority of the time is spent allocating the objects and promoting them to the old generation because they do not fit in the young generation.  The initialisation costs can be removed from the timing but that is not realistic.  If the traditional Java approach is taken the state needs to be built up before the query can take place.  The end user of an application has to wait for the state to be built up and the query executed.

This test is really quite trivial.  Imagine working with similar data sets but at the 100 GB scale.

Note: When the garbage collector compacts a region, objects that were next to each other can be moved far apart.  This can result in TLB and other cache misses.

Side Note On Serialization

A huge benefit of using off-heap structures in this manner is how they can be very easily serialised to network, or storage, by a simple memory copy as I have shown in the previous post.  This way we can completely bypass intermediate buffer and object allocation.
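A minimal sketch of the idea, using a direct ByteBuffer rather than the Unsafe-based code from the previous post: fixed-size records laid out contiguously off-heap can be shipped with one bulk copy, with no per-object encoding step.  The record layout (a long id and a long price) and the OffHeapRecords class are hypothetical, chosen only to illustrate the technique.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical off-heap table of fixed-size records: {long id, long price}.
final class OffHeapRecords
{
    static final int RECORD_SIZE = 16; // two 8-byte longs per record

    private final ByteBuffer buffer;

    OffHeapRecords(final int capacity)
    {
        // allocateDirect places the storage outside the Java heap
        buffer = ByteBuffer.allocateDirect(capacity * RECORD_SIZE)
                           .order(ByteOrder.nativeOrder());
    }

    void set(final int index, final long id, final long price)
    {
        final int offset = index * RECORD_SIZE;
        buffer.putLong(offset, id);
        buffer.putLong(offset + 8, price);
    }

    long id(final int index)    { return buffer.getLong(index * RECORD_SIZE); }
    long price(final int index) { return buffer.getLong(index * RECORD_SIZE + 8); }

    // "Serialise" the whole table with one bulk byte copy into a destination
    // buffer, e.g. one about to be written to a SocketChannel or FileChannel.
    void copyTo(final ByteBuffer destination)
    {
        final ByteBuffer source = buffer.duplicate();
        source.clear(); // position = 0, limit = capacity
        destination.put(source);
    }

    // Rebuild a table from bytes produced by copyTo on a machine of the same byte order.
    static OffHeapRecords copyFrom(final ByteBuffer source, final int capacity)
    {
        final OffHeapRecords records = new OffHeapRecords(capacity);
        final ByteBuffer src = source.duplicate();
        src.limit(src.position() + capacity * RECORD_SIZE);
        records.buffer.clear();
        records.buffer.put(src);
        return records;
    }
}
```

Because the bytes are copied verbatim, both ends must agree on the record layout and byte order; that is the price paid for skipping the encoding step.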

Conclusion

If you are willing to do some C style programming for large datasets, it is possible to control the memory layout in Java by going off-heap.  If you do, the benefits in performance, compactness, and avoiding GC issues are significant.  However, this approach should not be used for all applications.  Its benefits are only noticeable for very large datasets, or at the extremes of performance in throughput and/or latency.

I hope the Java community can collectively realise the importance of supporting structures both on the heap and the stack.  John Rose has done some excellent work in this area defining how tuples could be added to the JVM.  His talk on Arrays 2.0 from the JVM Language Summit this year is really worth a watch.  John discusses options for arrays of structures, and structures of arrays, in his talk.  If the tuples, as proposed by John, were available then the test described here could have comparable performance and be a more pleasant programming style.  The whole array of structures could be allocated in a single action thus bypassing the copy of individual objects across generations, and it would be stored in a compact contiguous fashion.  This would remove the significant GC issues for this class of problem.
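Until something like John's tuples arrives, one way to approximate a compact layout in Java today is a structure of arrays: one primitive array per field.  The sketch below (the Trade and Trades classes are hypothetical) contrasts it with an array of individually allocated objects; a sum over prices then walks a single contiguous double[] instead of following a reference to each object.  It is a workaround, not the array-of-structures layout the proposal would provide.

```java
// Array of objects: each Trade is a separate heap object reached via a reference.
final class Trade
{
    final long id;
    final double price;

    Trade(final long id, final double price)
    {
        this.id = id;
        this.price = price;
    }
}

// Structure of arrays: one contiguous primitive array per field.
final class Trades
{
    final long[] ids;
    final double[] prices;

    Trades(final int size)
    {
        ids = new long[size];
        prices = new double[size];
    }

    // Each iteration dereferences a separate object, wherever the GC placed it.
    static double sumPrices(final Trade[] trades)
    {
        double sum = 0;
        for (final Trade t : trades)
        {
            sum += t.price;
        }
        return sum;
    }

    // Each iteration reads the next element of one array: a linear walk.
    double sumPrices()
    {
        double sum = 0;
        for (final double p : prices)
        {
            sum += p;
        }
        return sum;
    }
}
```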

Lately, I was comparing standard data structures between Java and .Net.  In some cases I observed a 6-10X performance advantage to .Net for things like maps and dictionaries when .Net used native structure support.  Let's get this into Java as soon as possible!

It is also pretty obvious from the results that if we are to use Java for real-time analysis on big data, then our standard garbage collectors need to significantly improve and support true concurrent operations.

[1] - To my knowledge the only JVM that deals well with very large heaps is Azul Zing

Sunday 5 August 2012

Memory Access Patterns Are Important

In high-performance computing it is often said that the cost of a cache-miss is the largest performance penalty for an algorithm.  For many years the increase in speed of our processors has greatly outstripped latency gains to main-memory.  Bandwidth to main-memory has greatly increased via wider, and multi-channel, buses; however, the latency has not significantly reduced.  To hide this latency our processors employ ever more complex cache sub-systems that have many layers.

The 1994 paper "Hitting the memory wall: implications of the obvious" describes the problem and goes on to argue that caches do not ultimately help because of compulsory cache-misses.  I aim to show that by using access patterns which display consideration for the cache hierarchy, this conclusion is not inevitable.

Let's start putting the problem in context with some examples.  Our hardware tries to hide the main-memory latency via a number of techniques.  Basically three major bets are taken on memory access patterns:
  1. Temporal: Memory accessed recently will likely be required again soon.
  2. Spatial: Adjacent memory is likely to be required soon. 
  3. Striding: Memory access is likely to follow a predictable pattern.
To illustrate these three bets in action let's write some code and measure the results.
  1. Walk through memory in a linear fashion being completely predictable.
  2. Pseudo-randomly walk around memory within a restricted area, then move on.  This restricted area is what is commonly known as an operating system page of memory.
  3. Pseudo-randomly walk around a large area of the heap.
Code

The following code should be run with the -Xmx4g JVM option.
public class TestMemoryAccessPatterns
{
    private static final int LONG_SIZE = 8;
    private static final int PAGE_SIZE = 2 * 1024 * 1024;
    private static final int ONE_GIG = 1024 * 1024 * 1024;
    private static final long TWO_GIG = 2L * ONE_GIG;

    private static final int ARRAY_SIZE = (int)(TWO_GIG / LONG_SIZE);
    private static final int WORDS_PER_PAGE = PAGE_SIZE / LONG_SIZE;

    private static final int ARRAY_MASK = ARRAY_SIZE - 1;
    private static final int PAGE_MASK = WORDS_PER_PAGE - 1;

    private static final int PRIME_INC = 514229;

    private static final long[] memory = new long[ARRAY_SIZE];

    static
    {
        for (int i = 0; i < ARRAY_SIZE; i++)
        {
            memory[i] = 777;
        }
    }

    public enum StrideType
    {
        LINEAR_WALK
        {
            public int next(final int pageOffset, final int wordOffset, final int pos)
            {
                return (pos + 1) & ARRAY_MASK;
            }
        },

        RANDOM_PAGE_WALK
        {
            public int next(final int pageOffset, final int wordOffset, final int pos)
            {
                return pageOffset + ((pos + PRIME_INC) & PAGE_MASK);
            }
        },

        RANDOM_HEAP_WALK
        {
            public int next(final int pageOffset, final int wordOffset, final int pos)
            {
                return (pos + PRIME_INC) & ARRAY_MASK;
            }
        };

        public abstract int next(int pageOffset, int wordOffset, int pos);
    }

    public static void main(final String[] args)
    {
        final StrideType strideType;
        switch (Integer.parseInt(args[0]))
        {
            case 1:
                strideType = StrideType.LINEAR_WALK;
                break;

            case 2:
                strideType = StrideType.RANDOM_PAGE_WALK;
                break;

            case 3:
                strideType = StrideType.RANDOM_HEAP_WALK;
                break;

            default:
                throw new IllegalArgumentException("Unknown StrideType");
        }

        for (int i = 0; i < 5; i++)
        {
            perfTest(i, strideType);
        }
    }

    private static void perfTest(final int runNumber, final StrideType strideType)
    {
        final long start = System.nanoTime();

        int pos = -1;
        long result = 0;
        for (int pageOffset = 0; pageOffset < ARRAY_SIZE; pageOffset += WORDS_PER_PAGE)
        {
            for (int wordOffset = pageOffset, limit = pageOffset + WORDS_PER_PAGE;
                 wordOffset < limit;
                 wordOffset++)
            {
                pos = strideType.next(pageOffset, wordOffset, pos);
                result += memory[pos];
            }
        }

        final long duration = System.nanoTime() - start;
        final double nsOp = duration / (double)ARRAY_SIZE;

        if (208574349312L != result)
        {
            throw new IllegalStateException();
        }

        System.out.format("%d - %.2fns %s\n",
                          Integer.valueOf(runNumber),
                          Double.valueOf(nsOp),
                          strideType);
    }
}
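For the timings to reflect a walk over the whole 2GB array, RANDOM_HEAP_WALK must touch every element rather than cycling through a subset, and the checksum in perfTest() cannot detect a short cycle because every element holds 777.  The walk does cover everything because PRIME_INC is odd, and an odd stride is coprime with a power-of-two array size, so stepping by it under ARRAY_MASK visits every slot once before repeating (the same argument applies within each page for RANDOM_PAGE_WALK).  A small check of that property, using a hypothetical helper and a much smaller array than the benchmark:

```java
import java.util.BitSet;

final class StrideCoverage
{
    // Walk a power-of-two sized table with the same masking trick as
    // RANDOM_HEAP_WALK and report whether every slot is visited exactly once.
    static boolean visitsEverySlotOnce(final int size, final int stride)
    {
        final int mask = size - 1; // valid only when size is a power of two
        final BitSet seen = new BitSet(size);
        int pos = -1; // same starting value as perfTest()
        for (int i = 0; i < size; i++)
        {
            pos = (pos + stride) & mask;
            if (seen.get(pos))
            {
                return false; // revisited a slot before covering the table
            }
            seen.set(pos);
        }
        return true;
    }

    public static void main(final String[] args)
    {
        System.out.println(visitsEverySlotOnce(1 << 20, 514229)); // odd stride: true
        System.out.println(visitsEverySlotOnce(1 << 20, 514228)); // even stride: false
    }
}
```

An even stride such as 514228 (4 × 128557) only cycles through a quarter of the slots, so a benchmark using it would quietly measure a smaller working set.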
Results
Intel U4100 @ 1.3GHz, 4GB RAM DDR2 800MHz, 
Windows 7 64-bit, Java 1.7.0_05
===========================================
0 - 2.38ns LINEAR_WALK
1 - 2.41ns LINEAR_WALK
2 - 2.35ns LINEAR_WALK
3 - 2.36ns LINEAR_WALK
4 - 2.39ns LINEAR_WALK

0 - 12.45ns RANDOM_PAGE_WALK
1 - 12.27ns RANDOM_PAGE_WALK
2 - 12.17ns RANDOM_PAGE_WALK
3 - 12.22ns RANDOM_PAGE_WALK
4 - 12.18ns RANDOM_PAGE_WALK

0 - 152.86ns RANDOM_HEAP_WALK
1 - 151.80ns RANDOM_HEAP_WALK
2 - 151.72ns RANDOM_HEAP_WALK
3 - 151.91ns RANDOM_HEAP_WALK
4 - 151.36ns RANDOM_HEAP_WALK

Intel i7-860 @ 2.8GHz, 8GB RAM DDR3 1333MHz, 
Windows 7 64-bit, Java 1.7.0_05
=============================================
0 - 1.06ns LINEAR_WALK
1 - 1.05ns LINEAR_WALK
2 - 0.98ns LINEAR_WALK
3 - 1.00ns LINEAR_WALK
4 - 1.00ns LINEAR_WALK

0 - 3.80ns RANDOM_PAGE_WALK
1 - 3.85ns RANDOM_PAGE_WALK
2 - 3.79ns RANDOM_PAGE_WALK
3 - 3.65ns RANDOM_PAGE_WALK
4 - 3.64ns RANDOM_PAGE_WALK

0 - 30.04ns RANDOM_HEAP_WALK
1 - 29.05ns RANDOM_HEAP_WALK
2 - 29.14ns RANDOM_HEAP_WALK
3 - 28.88ns RANDOM_HEAP_WALK
4 - 29.57ns RANDOM_HEAP_WALK

Intel i7-2760QM @ 2.40GHz, 8GB RAM DDR3 1600MHz, 
Linux 3.4.6 kernel 64-bit, Java 1.7.0_05
=================================================
0 - 0.91ns LINEAR_WALK
1 - 0.92ns LINEAR_WALK
2 - 0.88ns LINEAR_WALK
3 - 0.89ns LINEAR_WALK
4 - 0.89ns LINEAR_WALK

0 - 3.29ns RANDOM_PAGE_WALK
1 - 3.35ns RANDOM_PAGE_WALK
2 - 3.33ns RANDOM_PAGE_WALK
3 - 3.31ns RANDOM_PAGE_WALK
4 - 3.30ns RANDOM_PAGE_WALK

0 - 9.58ns RANDOM_HEAP_WALK
1 - 9.20ns RANDOM_HEAP_WALK
2 - 9.44ns RANDOM_HEAP_WALK
3 - 9.46ns RANDOM_HEAP_WALK
4 - 9.47ns RANDOM_HEAP_WALK
Analysis

I ran the code on 3 different CPU architectures illustrating generational steps forward for Intel.  It is clear from the results that each generation has become progressively better at hiding the latency to main-memory based on the 3 bets described above for a relatively small heap.  This is because the size and sophistication of various caches keep improving.  However as memory size increases they become less effective.  For example, if the array is doubled to be 4GB in size, then the average latency increases from ~30ns to ~55ns for the i7-860 doing the random heap walk.

It seems that for the linear walk case, memory latency does not exist.  However, as we walk around memory in an ever more random pattern, the latency starts to become very apparent.

The random heap walk produced an interesting result.  This is our worst-case scenario, and given the hardware specifications of these systems, we could be looking at 150ns, 65ns, and 75ns for the above tests respectively based on memory controller and memory module latencies.  For the Nehalem (i7-860) I can further subvert the cache sub-system by using a 4GB array, resulting in ~55ns on average per iteration.  The i7-2760QM has larger load buffers and TLB caches, and Linux is running with transparent huge pages, which are all working to further hide the latency.  By playing with different prime numbers for the stride, results can vary wildly depending on processor type, e.g. try PRIME_INC = 39916801 for Nehalem.  I'd like to test this on a much larger heap with Sandy Bridge.

The main takeaway is that the more predictable the pattern of access to memory, the better the cache sub-systems are at hiding main-memory latency.  Let's look at these cache sub-systems in a little more detail to try to understand the observed results.

Hardware Components

We have many layers of cache plus the pre-fetchers to consider for how latency gets hidden.  In this section I'll try to cover the major components used to hide latency that our hardware and systems software friends have put in place.  We will investigate these latency hiding components and use the Linux perf and Lightweight Performance Counters utilities to retrieve the performance counters from our CPUs, which tell us how effective these components are when we execute our programs.  Performance counters are CPU specific, and the ones I've used here are specific to Sandy Bridge.

Data Caches
Processors typically have 2 or 3 layers of data cache.  Each layer as we move out is progressively larger with increasing latency.  The latest Intel processors have 3 layers (L1D, L2, and L3); with sizes 32KB, 256KB, and 4-30MB; and ~1ns, ~4ns, and ~15ns latency respectively for a 3.0GHz CPU.

Data caches are effectively hardware hash tables with a fixed number of slots for each hash value.  These slots are known as "ways".  An 8-way associative cache will have 8 slots to hold values for addresses that hash to the same cache location.  Within these slots the data caches do not store individual words; they store cache-lines of multiple words.  For an Intel processor these cache-lines are typically 64 bytes, that is 8 words on a 64-bit machine.  This plays to the spatial bet that adjacent memory is likely to be required soon, which is typically the case if we think of arrays or fields of an object.
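To make the hash-table analogy concrete: with 64-byte lines, the low 6 bits of an address select the byte within a line, and the next bits select the set (the "hash bucket").  The sketch below assumes a 32KB, 8-way L1D with 64-byte lines, which gives 32768 / (8 × 64) = 64 sets; it ignores details such as virtual versus physical indexing.

```java
final class CacheIndex
{
    static final int LINE_SIZE = 64;                          // bytes per cache-line
    static final int WAYS = 8;                                // slots per set
    static final int CACHE_SIZE = 32 * 1024;                  // assumed 32KB L1D
    static final int SETS = CACHE_SIZE / (WAYS * LINE_SIZE);  // 64 sets

    // Byte position within the 64-byte line: the low 6 bits of the address.
    static int lineOffset(final long address)
    {
        return (int)(address & (LINE_SIZE - 1));
    }

    // Set the line maps to: the line number modulo the number of sets.
    static int setIndex(final long address)
    {
        return (int)((address / LINE_SIZE) & (SETS - 1));
    }
}
```

Under these assumptions, addresses that differ by a multiple of SETS × LINE_SIZE (4KB) land in the same set, so a ninth such line forces an eviction even if the rest of the cache is empty.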

Data caches typically evict cache-lines in an LRU manner.  Caches work by using a write-back algorithm where stores need only be propagated to main-memory when a modified cache-line is evicted.  This gives rise to the interesting phenomenon that a load can cause a write-back to the outer cache layers and eventually to main-memory.
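A toy model of a single 8-way set shows how a load can trigger a write-back: lines are kept in least-recently-used order, stores only mark a line dirty, and memory is written only when a dirty line is evicted.  This is a simplified sketch (real processors typically use pseudo-LRU approximations), and the LruWriteBackSet class is hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

final class LruWriteBackSet
{
    private final int ways;
    // Key: line address; value: dirty flag.  accessOrder = true keeps LRU order.
    private final LinkedHashMap<Long, Boolean> lines = new LinkedHashMap<>(16, 0.75f, true);
    private int writeBacks = 0;

    LruWriteBackSet(final int ways)
    {
        this.ways = ways;
    }

    void load(final long line)  { access(line, false); }
    void store(final long line) { access(line, true); }

    int writeBacks() { return writeBacks; }

    private void access(final long line, final boolean write)
    {
        final Boolean dirty = lines.get(line); // get() also refreshes LRU position
        if (dirty != null)
        {
            if (write)
            {
                lines.put(line, Boolean.TRUE); // hit: mark dirty, no memory traffic
            }
            return;
        }
        if (lines.size() == ways)
        {
            // Miss with a full set: evict the LRU line, writing it back only if dirty.
            final Iterator<Map.Entry<Long, Boolean>> eldest = lines.entrySet().iterator();
            if (eldest.next().getValue())
            {
                writeBacks++;
            }
            eldest.remove();
        }
        lines.put(line, write);
    }
}
```

For example, a store to line 0 followed by loads of eight other lines evicts line 0 on the final load, so it is a load that causes the write-back.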
perf stat -e L1-dcache-loads,L1-dcache-load-misses java -Xmx4g TestMemoryAccessPatterns $

 Performance counter stats for 'java -Xmx4g TestMemoryAccessPatterns 1':
     1,496,626,053 L1-dcache-loads                                            
       274,255,164 L1-dcache-misses
         #   18.32% of all L1-dcache hits

 Performance counter stats for 'java -Xmx4g TestMemoryAccessPatterns 2':
     1,537,057,965 L1-dcache-loads                                            
     1,570,105,933 L1-dcache-misses
         #  102.15% of all L1-dcache hits 

 Performance counter stats for 'java -Xmx4g TestMemoryAccessPatterns 3':
     4,321,888,497 L1-dcache-loads                                           
     1,780,223,433 L1-dcache-misses
         #   41.19% of all L1-dcache hits  

likwid-perfctr -C 2 -g L2CACHE java -Xmx4g TestMemoryAccessPatterns $

java -Xmx4g TestMemoryAccessPatterns 1
+-----------------------+-------------+
|         Event         |   core 2    |
+-----------------------+-------------+
|   INSTR_RETIRED_ANY   | 5.94918e+09 |
| CPU_CLK_UNHALTED_CORE | 5.15969e+09 |
| L2_TRANS_ALL_REQUESTS | 1.07252e+09 |
|     L2_RQSTS_MISS     | 3.25413e+08 |
+-----------------------+-------------+
+-----------------+-----------+
|     Metric      |  core 2   |
+-----------------+-----------+
|   Runtime [s]   |  2.15481  |
|       CPI       | 0.867293  |
| L2 request rate |  0.18028  |
|  L2 miss rate   | 0.0546988 |
|  L2 miss ratio  | 0.303409  |
+-----------------+-----------+
+------------------------+-------------+
|         Event          |   core 2    |
+------------------------+-------------+
| L3_LAT_CACHE_REFERENCE | 1.26545e+08 |
|   L3_LAT_CACHE_MISS    | 2.59059e+07 |
+------------------------+-------------+

java -Xmx4g TestMemoryAccessPatterns 2
+-----------------------+-------------+
|         Event         |   core 2    |
+-----------------------+-------------+
|   INSTR_RETIRED_ANY   | 1.48772e+10 |
| CPU_CLK_UNHALTED_CORE | 1.64712e+10 |
| L2_TRANS_ALL_REQUESTS | 3.41061e+09 |
|     L2_RQSTS_MISS     | 1.5547e+09  |
+-----------------------+-------------+
+-----------------+----------+
|     Metric      |  core 2  |
+-----------------+----------+
|   Runtime [s]   | 6.87876  |
|       CPI       | 1.10714  |
| L2 request rate | 0.22925  |
|  L2 miss rate   | 0.104502 |
|  L2 miss ratio  | 0.455843 |
+-----------------+----------+
+------------------------+-------------+
|         Event          |   core 2    |
+------------------------+-------------+
| L3_LAT_CACHE_REFERENCE | 1.52088e+09 |
|   L3_LAT_CACHE_MISS    | 1.72918e+08 |
+------------------------+-------------+

java -Xmx4g TestMemoryAccessPatterns 3
+-----------------------+-------------+
|         Event         |   core 2    |
+-----------------------+-------------+
|   INSTR_RETIRED_ANY   | 6.49533e+09 |
| CPU_CLK_UNHALTED_CORE | 4.18416e+10 |
| L2_TRANS_ALL_REQUESTS | 4.67488e+09 |
|     L2_RQSTS_MISS     | 1.43442e+09 |
+-----------------------+-------------+
+-----------------+----------+
|     Metric      |  core 2  |
+-----------------+----------+
|   Runtime [s]   |  17.474  |
|       CPI       |  6.4418  |
| L2 request rate | 0.71973  |
|  L2 miss rate   | 0.220838 |
|  L2 miss ratio  | 0.306835 |
+-----------------+----------+
+------------------------+-------------+
|         Event          |   core 2    |
+------------------------+-------------+
| L3_LAT_CACHE_REFERENCE | 1.40079e+09 |
|   L3_LAT_CACHE_MISS    | 1.34832e+09 |
+------------------------+-------------+
Note: The cache-miss rate of the combined L1D, L2 and L3 increases significantly as the pattern of access becomes more random.

Translation Lookaside Buffers (TLBs)
Our programs deal with virtual memory addresses that need to be translated to physical memory addresses.  Virtual memory systems do this by mapping pages.  We need to know the offset for a given page and its size for any memory operation.  Typically page sizes are 4KB and are gradually moving to 2MB and greater.  Linux introduced Transparent Huge Pages in the 2.6.38 kernel giving us 2MB pages.  The translation of virtual memory pages to physical pages is maintained by the page table.  This translation can require multiple accesses to the page table which is a huge performance penalty.  To accelerate this lookup, processors have a small hardware cache at each cache level called the TLB cache.  A miss on the TLB cache can be hugely expensive because the page table may not be in a nearby data cache.  By moving to larger pages, a TLB cache can cover a larger address range for the same number of entries.
perf stat -e dTLB-loads,dTLB-load-misses java -Xmx4g TestMemoryAccessPatterns $
 
 Performance counter stats for 'java -Xmx4g TestMemoryAccessPatterns 1':
     1,496,128,634 dTLB-loads
           310,901 dTLB-misses
              #    0.02% of all dTLB cache hits 

 Performance counter stats for 'java -Xmx4g TestMemoryAccessPatterns 2':
     1,551,585,263 dTLB-loads
           340,230 dTLB-misses
              #    0.02% of all dTLB cache hits

 Performance counter stats for 'java -Xmx4g TestMemoryAccessPatterns 3':
     4,031,344,537 dTLB-loads
     1,345,807,418 dTLB-misses
              #   33.38% of all dTLB cache hits  
Note: With huge pages employed, we only incur significant TLB misses when randomly walking the whole heap.
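Two pieces of arithmetic sit behind the TLB discussion: a virtual address splits into a page number (the part translated via the TLB) and an offset within the page, and the range a TLB covers without missing, its reach, is the entry count times the page size.  The sketch assumes a hypothetical 64-entry data TLB; actual entry counts vary by processor and page size.

```java
final class TlbReach
{
    static final long SMALL_PAGE = 4L * 1024;         // 4KB
    static final long HUGE_PAGE = 2L * 1024 * 1024;   // 2MB
    static final int ASSUMED_ENTRIES = 64;            // hypothetical dTLB size

    // Virtual page number: the part of the address the TLB translates.
    static long pageNumber(final long address, final long pageSize)
    {
        return address / pageSize;
    }

    // Offset within the page: carried over to the physical address unchanged.
    static long pageOffset(final long address, final long pageSize)
    {
        return address % pageSize;
    }

    // Bytes of address space covered by the TLB without a miss.
    static long reach(final int entries, final long pageSize)
    {
        return entries * pageSize;
    }
}
```

With those assumptions, 4KB pages cover 256KB while 2MB pages cover 128MB, which is still far short of the 2GB array walked by RANDOM_HEAP_WALK.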

Hardware Pre-Fetchers
Hardware will try to predict the next memory access our programs will make and speculatively load that memory into fill buffers.  This is done at its simplest level by pre-loading adjacent cache-lines for the spatial bet, or by recognising regular stride-based access patterns, typically less than 2KB in stride length.  In the tests below we are measuring the number of loads that hit a fill buffer from a hardware pre-fetch.
likwid-perfctr -C 2 -g LOAD_HIT_PRE_HW_PF:PMC0 java -Xmx4g TestMemoryAccessPatterns $

java -Xmx4g TestMemoryAccessPatterns 1
+--------------------+-------------+
|       Event        |   core 2    |
+--------------------+-------------+
| LOAD_HIT_PRE_HW_PF | 1.31613e+09 |
+--------------------+-------------+

java -Xmx4g TestMemoryAccessPatterns 2
+--------------------+--------+
|       Event        | core 2 |
+--------------------+--------+
| LOAD_HIT_PRE_HW_PF | 368930 |
+--------------------+--------+

java -Xmx4g TestMemoryAccessPatterns 3
+--------------------+--------+
|       Event        | core 2 |
+--------------------+--------+
| LOAD_HIT_PRE_HW_PF | 324373 |
+--------------------+--------+
Note: We have a significant success rate for load hits with the pre-fetcher on the linear walk.
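A much simplified model of stride detection, assuming a single access stream: remember the previous address and stride, and once the same stride is seen twice in a row, predict the next address.  This is an illustrative sketch, not how Intel's pre-fetchers are actually implemented; the StridePredictor class is hypothetical and the 2KB limit mirrors the figure quoted above.

```java
final class StridePredictor
{
    static final long MAX_STRIDE = 2048; // strides beyond ~2KB are not tracked

    private long lastAddress = -1;
    private long lastStride = 0;

    // Observe an access; returns the predicted next address, or -1 if none.
    long observe(final long address)
    {
        long prediction = -1;
        if (lastAddress >= 0)
        {
            final long stride = address - lastAddress;
            if (stride != 0 && stride == lastStride && Math.abs(stride) <= MAX_STRIDE)
            {
                prediction = address + stride; // stride confirmed: fetch ahead
            }
            lastStride = stride;
        }
        lastAddress = address;
        return prediction;
    }
}
```

A linear walk confirms its stride after the third access and keeps predicting correctly, while a stride far beyond 2KB, like the heap walk's, never triggers a prediction, which is consistent with the pre-fetch hit counts above.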

Memory Controllers and Row Buffers
Beyond our last level cache (LLC) sit the memory controllers that manage access to the SDRAM banks.  Memory is organised into rows and columns.  To access an address, first the row address must be selected (RAS), then the column address is selected (CAS) within that row to get the word.  The row is typically a page in size and loaded into a row buffer.  Even at this stage the hardware is still helping hide the latency.  A queue of memory access requests is maintained and re-ordered so that multiple words can be fetched from the same row if possible.
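The effect of that re-ordering can be sketched by counting row activations: each time a request targets a different row from the one currently open, the row must be opened (RAS) before the column read (CAS).  The 8KB row size, the single bank, and the sort-by-row scheduling below are simplifying assumptions; real controllers re-order within a limited window and across several banks.

```java
import java.util.Arrays;

final class RowBufferModel
{
    static final long ROW_SIZE = 8 * 1024; // assumed bytes per DRAM row

    // Number of row opens needed to serve requests in the given order,
    // assuming a single bank with one row buffer.
    static int rowActivations(final long[] addresses)
    {
        int activations = 0;
        long openRow = -1;
        for (final long address : addresses)
        {
            final long row = address / ROW_SIZE;
            if (row != openRow)
            {
                activations++;
                openRow = row;
            }
        }
        return activations;
    }

    // Crude stand-in for a scheduler: serve the queued requests grouped by row.
    static int rowActivationsReordered(final long[] addresses)
    {
        final long[] queue = addresses.clone();
        Arrays.sort(queue); // sorting by address groups requests by row
        return rowActivations(queue);
    }
}
```

For requests alternating between two rows, such as {0, 9000, 64, 9064, 128}, serving them in arrival order opens a row five times, while grouping them needs only two activations.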

Non-Uniform Memory Access (NUMA)
Systems now have memory controllers on the CPU socket.  This move to on-socket memory controllers gave a ~50ns latency reduction over existing front side bus (FSB) and external Northbridge memory controllers.  Systems with multiple sockets employ memory interconnects, QPI from Intel, which are used when one CPU wants to access memory managed by another CPU socket.  The presence of these interconnects gives rise to the non-uniform nature of server memory access.  In a 2-socket system memory may be local or 1 hop away.  On an 8-socket system memory can be up to 3 hops away, where each hop adds 20ns latency in each direction.
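As a rough worked figure using only the per-hop number above: a remote request pays the hop cost on the way out and again on the way back, so for h hops the added latency is

$$\Delta t = 2 \times 20\,\text{ns} \times h, \qquad h = 3 \;\Rightarrow\; \Delta t = 120\,\text{ns}$$

on top of the local access latency in the worst case on an 8-socket system.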

What does this mean for algorithms?

The difference between an L1D cache-hit, and a full miss resulting in main-memory access, is 2 orders of magnitude; i.e. <1ns vs. 65-100ns.  If algorithms randomly walk around our ever increasing address spaces, then we are less likely to benefit from the hardware support that hides this latency.

Is there anything we can do about this when designing algorithms and data-structures?  Yes, there is a lot we can do.  If we perform chunks of work on data that is co-located, and we stride around memory in a predictable fashion, then our algorithms can be many times faster.  For example, rather than using bucket and chain hash tables, like in the JDK, we can employ hash tables using open-addressing with linear-probing.  Rather than using linked-lists or trees with single items in each node, we can store an array of many items in each node.
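A minimal sketch of open-addressing with linear-probing for primitive long keys and values, assuming a fixed power-of-two capacity, no resizing or removal, and Long.MIN_VALUE reserved as the empty marker.  Keys and values are interleaved in one long[], so resolving a collision means checking the next slot, usually in the same or an adjacent cache-line, rather than chasing a chain of separately allocated nodes.  The class name and the multiplicative hash are illustrative choices, not a JDK API.

```java
import java.util.Arrays;

final class LongLongLinearProbeMap
{
    private static final long EMPTY = Long.MIN_VALUE; // reserved: cannot be used as a key

    // Keys and values interleaved: [k0, v0, k1, v1, ...] so a probe reads adjacent memory.
    private final long[] slots;
    private final int mask;
    private int size;

    LongLongLinearProbeMap(final int capacityPowerOfTwo)
    {
        if (Integer.bitCount(capacityPowerOfTwo) != 1)
        {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new long[capacityPowerOfTwo * 2];
        Arrays.fill(slots, EMPTY);
        mask = capacityPowerOfTwo - 1;
    }

    private int indexFor(final long key)
    {
        final long h = key * 0x9E3779B97F4A7C15L; // spread the key bits
        return (int)(h ^ (h >>> 32)) & mask;
    }

    void put(final long key, final long value)
    {
        if (key == EMPTY)
        {
            throw new IllegalArgumentException("reserved key");
        }
        int i = indexFor(key);
        while (true)
        {
            final long existing = slots[i * 2];
            if (existing == key)
            {
                slots[i * 2 + 1] = value; // overwrite existing mapping
                return;
            }
            if (existing == EMPTY)
            {
                if (size == mask)
                {
                    throw new IllegalStateException("map full"); // always keep one empty slot
                }
                slots[i * 2] = key;
                slots[i * 2 + 1] = value;
                size++;
                return;
            }
            i = (i + 1) & mask; // linear probe: next slot, wrapping around
        }
    }

    // Returns the mapped value, or defaultValue if the key is absent.
    long get(final long key, final long defaultValue)
    {
        if (key == EMPTY)
        {
            return defaultValue;
        }
        int i = indexFor(key);
        while (true)
        {
            final long existing = slots[i * 2];
            if (existing == key)
            {
                return slots[i * 2 + 1];
            }
            if (existing == EMPTY)
            {
                return defaultValue; // an empty slot ends the probe sequence
            }
            i = (i + 1) & mask;
        }
    }

    int size() { return size; }
}
```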

Research is advancing on algorithmic approaches that work in harmony with cache sub-systems.  One area I find fascinating is Cache Oblivious Algorithms.  The name is a bit misleading but there are some great concepts here for how to improve software performance and better execute in parallel.  This article is a great illustration of the performance benefits that can be gained.
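As an illustration of the cache-oblivious idea (this example is not taken from the article referenced above), a matrix transpose can recursively halve the larger dimension until blocks are small.  At some depth of the recursion each block fits in whatever cache is present, without the code knowing any cache size.  The row-major double[] layout, the 16-element base case, and the ObliviousTranspose class are assumptions chosen for the sketch.

```java
final class ObliviousTranspose
{
    private static final int BASE = 16; // blocks this small are transposed directly

    // Transpose the rows x cols matrix 'src' (row-major) into 'dst' (cols x rows, row-major).
    static void transpose(final double[] src, final double[] dst, final int rows, final int cols)
    {
        transpose(src, dst, rows, cols, 0, rows, 0, cols);
    }

    private static void transpose(final double[] src, final double[] dst,
                                  final int rows, final int cols,
                                  final int r0, final int r1, final int c0, final int c1)
    {
        final int h = r1 - r0;
        final int w = c1 - c0;
        if (h <= BASE && w <= BASE)
        {
            for (int r = r0; r < r1; r++)
            {
                for (int c = c0; c < c1; c++)
                {
                    dst[c * rows + r] = src[r * cols + c];
                }
            }
        }
        else if (h >= w)
        {
            final int mid = r0 + h / 2; // split the rows in half
            transpose(src, dst, rows, cols, r0, mid, c0, c1);
            transpose(src, dst, rows, cols, mid, r1, c0, c1);
        }
        else
        {
            final int mid = c0 + w / 2; // split the columns in half
            transpose(src, dst, rows, cols, r0, r1, c0, mid);
            transpose(src, dst, rows, cols, r0, r1, mid, c1);
        }
    }
}
```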

Conclusion

To achieve great performance it is important to have sympathy for the cache sub-systems.  We have seen in this article what can be achieved by accessing memory in patterns which work with, rather than against, these caches.  When designing algorithms and data structures, it is now vitally important to consider cache-misses, probably even more so than counting steps in the algorithm.  This is not what we were taught in algorithm theory when studying computer science.  The last decade has seen some fundamental changes in technology.  For me the two most significant are the rise of multi-core, and now big-memory systems with 64-bit address spaces.

One thing is certain: if we want software to execute faster and scale better, we need to make better use of the many cores in our CPUs, and pay attention to memory access patterns.

Update: 06-August-2012
Trying to design a random walk algorithm for all processors and memory sizes is tricky.  If I use the algorithm below then my Sandy Bridge processor is slower but the Nehalem is faster.  The point is that performance will be very unpredictable when you walk around memory in a random fashion.  I've also included the L3 cache counters for more detail in all the tests.
    private static final long LARGE_PRIME_INC = 70368760954879L;

        RANDOM_HEAP_WALK
        {
            public int next(final int pageOffset, final int wordOffset, final int pos)
            {
                return (int)(pos + LARGE_PRIME_INC) & ARRAY_MASK;
            }
        };
Intel i7-2760QM @ 2.40GHz, 8GB RAM DDR3 1600MHz, 
Linux 3.4.6 kernel 64-bit, Java 1.7.0_05
=================================================
0 - 29.06ns RANDOM_HEAP_WALK
1 - 29.47ns RANDOM_HEAP_WALK
2 - 29.48ns RANDOM_HEAP_WALK
3 - 29.43ns RANDOM_HEAP_WALK
4 - 29.42ns RANDOM_HEAP_WALK

 Performance counter stats for 'java -Xmx4g TestMemoryAccessPatterns 3':
     9,444,928,682 dTLB-loads
     4,371,982,327 dTLB-misses
         #   46.29% of all dTLB cache hits 

     9,390,675,639 L1-dcache-loads
     1,471,647,016 L1-dcache-misses
         #   15.67% of all L1-dcache hits  

+-----------------------+-------------+
|         Event         |   core 2    |
+-----------------------+-------------+
|   INSTR_RETIRED_ANY   | 7.71171e+09 |
| CPU_CLK_UNHALTED_CORE | 1.31717e+11 |
| L2_TRANS_ALL_REQUESTS | 8.4912e+09  |
|     L2_RQSTS_MISS     | 2.79635e+09 |
+-----------------------+-------------+
+-----------------+----------+
|     Metric      |  core 2  |
+-----------------+----------+
|   Runtime [s]   | 55.0094  |
|       CPI       | 17.0801  |
| L2 request rate | 1.10108  |
|  L2 miss rate   | 0.362611 |
|  L2 miss ratio  | 0.329324 |
+-----------------+----------+
+--------------------+-------------+
|       Event        |   core 2    |
+--------------------+-------------+
| LOAD_HIT_PRE_HW_PF | 3.59509e+06 |
+--------------------+-------------+
+------------------------+-------------+
|        Event           |   core 2    |
+------------------------+-------------+
| L3_LAT_CACHE_REFERENCE | 1.30318e+09 |
| L3_LAT_CACHE_MISS      | 2.62346e+07 |
+------------------------+-------------+

Thursday 5 July 2012

Native C/C++ Like Performance For Java Object Serialisation

Do you ever wish you could turn a Java object into a stream of bytes as fast as it can be done in a native language like C++?  If you use standard Java Serialization you could be disappointed with the performance.  Java Serialization was designed for a very different purpose than serialising objects as quickly and compactly as possible.

Why do we need fast and compact serialisation?  Many of our systems are distributed and we need to communicate by passing state between processes efficiently.  This state lives inside our objects.  I've profiled many systems and often a large part of the cost is the serialisation of this state to-and-from byte buffers.  I've seen a significant range of protocols and mechanisms used to achieve this.  At one end of the spectrum are the easy-to-use but inefficient protocols like Java Serialisation, XML and JSON.  At the other end of this spectrum are the binary protocols that can be very fast and efficient, but they require a deeper understanding and skill.

In this article I will illustrate the performance gains that are possible when using simple binary protocols and introduce a little known technique available in Java to achieve similar performance to what is possible with native languages like C or C++.

The three approaches to be compared are:
  1. Java Serialization: The standard method in Java of having an object implement Serializable.
  2. Binary via ByteBuffer: A simple protocol using the ByteBuffer API to write the fields of an object in binary format.  This is our baseline for what is considered a good binary encoding approach.
  3. Binary via Unsafe: Introduction to Unsafe and its collection of methods that allow direct memory manipulation.  Here I will show how to get similar performance to C/C++.
The Code
import sun.misc.Unsafe;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;

public final class TestSerialisationPerf
{
    public static final int REPETITIONS = 1 * 1000 * 1000;

    private static ObjectToBeSerialised ITEM =
        new ObjectToBeSerialised(
            1010L, true, 777, 99,
            new double[]{0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0},
            new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});


    public static void main(final String[] arg) throws Exception
    {
        for (final PerformanceTestCase testCase : testCases)
        {
            for (int i = 0; i < 5; i++)
            {
                testCase.performTest();

                System.out.format("%d %s\twrite=%,dns read=%,dns total=%,dns\n",
                                  i,
                                  testCase.getName(),
                                  testCase.getWriteTimeNanos(),
                                  testCase.getReadTimeNanos(),
                                  testCase.getWriteTimeNanos() + 
                                  testCase.getReadTimeNanos());

                if (!ITEM.equals(testCase.getTestOutput()))
                {
                    throw new IllegalStateException("Objects do not match");
                }

                System.gc();
                Thread.sleep(3000);
            }
        }
    }

    private static final PerformanceTestCase[] testCases =
    {
        new PerformanceTestCase("Serialisation", REPETITIONS, ITEM)
        {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();

            public void testWrite(ObjectToBeSerialised item) throws Exception
            {
                for (int i = 0; i < REPETITIONS; i++)
                {
                    baos.reset();

                    ObjectOutputStream oos = new ObjectOutputStream(baos);
                    oos.writeObject(item);
                    oos.close();
                }
            }

            public ObjectToBeSerialised testRead() throws Exception
            {
                ObjectToBeSerialised object = null;
                for (int i = 0; i < REPETITIONS; i++)
                {
                    ByteArrayInputStream bais = 
                        new ByteArrayInputStream(baos.toByteArray());
                    ObjectInputStream ois = new ObjectInputStream(bais);
                    object = (ObjectToBeSerialised)ois.readObject();
                }

                return object;
            }
        },

        new PerformanceTestCase("ByteBuffer", REPETITIONS, ITEM)
        {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            public void testWrite(ObjectToBeSerialised item) throws Exception
            {
                for (int i = 0; i < REPETITIONS; i++)
                {
                    byteBuffer.clear();
                    item.write(byteBuffer);
                }
            }

            public ObjectToBeSerialised testRead() throws Exception
            {
                ObjectToBeSerialised object = null;
                for (int i = 0; i < REPETITIONS; i++)
                {
                    byteBuffer.flip();
                    object = ObjectToBeSerialised.read(byteBuffer);
                }

                return object;
            }
        },

        new PerformanceTestCase("UnsafeMemory", REPETITIONS, ITEM)
        {
            UnsafeMemory buffer = new UnsafeMemory(new byte[1024]);

            public void testWrite(ObjectToBeSerialised item) throws Exception
            {
                for (int i = 0; i < REPETITIONS; i++)
                {
                    buffer.reset();
                    item.write(buffer);
                }
            }

            public ObjectToBeSerialised testRead() throws Exception
            {
                ObjectToBeSerialised object = null;
                for (int i = 0; i < REPETITIONS; i++)
                {
                    buffer.reset();
                    object = ObjectToBeSerialised.read(buffer);
                }

                return object;
            }
        },
    };
}

abstract class PerformanceTestCase
{
    private final String name;
    private final int repetitions;
    private final ObjectToBeSerialised testInput;
    private ObjectToBeSerialised testOutput;
    private long writeTimeNanos;
    private long readTimeNanos;

    public PerformanceTestCase(final String name, final int repetitions,
                               final ObjectToBeSerialised testInput)
    {
        this.name = name;
        this.repetitions = repetitions;
        this.testInput = testInput;
    }

    public String getName()
    {
        return name;
    }

    public ObjectToBeSerialised getTestOutput()
    {
        return testOutput;
    }

    public long getWriteTimeNanos()
    {
        return writeTimeNanos;
    }

    public long getReadTimeNanos()
    {
        return readTimeNanos;
    }

    public void performTest() throws Exception
    {
        final long startWriteNanos = System.nanoTime();
        testWrite(testInput);
        writeTimeNanos = (System.nanoTime() - startWriteNanos) / repetitions;

        final long startReadNanos = System.nanoTime();
        testOutput = testRead();
        readTimeNanos = (System.nanoTime() - startReadNanos) / repetitions;
    }

    public abstract void testWrite(ObjectToBeSerialised item) throws Exception;
    public abstract ObjectToBeSerialised testRead() throws Exception;
}

class ObjectToBeSerialised implements Serializable
{
    private static final long serialVersionUID = 10275539472837495L;

    private final long sourceId;
    private final boolean special;
    private final int orderCode;
    private final int priority;
    private final double[] prices;
    private final long[] quantities;

    public ObjectToBeSerialised(final long sourceId, final boolean special,
                                final int orderCode, final int priority,
                                final double[] prices, final long[] quantities)
    {
        this.sourceId = sourceId;
        this.special = special;
        this.orderCode = orderCode;
        this.priority = priority;
        this.prices = prices;
        this.quantities = quantities;
    }

    public void write(final ByteBuffer byteBuffer)
    {
        byteBuffer.putLong(sourceId);
        byteBuffer.put((byte)(special ? 1 : 0));
        byteBuffer.putInt(orderCode);
        byteBuffer.putInt(priority);

        byteBuffer.putInt(prices.length);
        for (final double price : prices)
        {
            byteBuffer.putDouble(price);
        }

        byteBuffer.putInt(quantities.length);
        for (final long quantity : quantities)
        {
            byteBuffer.putLong(quantity);
        }
    }

    public static ObjectToBeSerialised read(final ByteBuffer byteBuffer)
    {
        final long sourceId = byteBuffer.getLong();
        final boolean special = 0 != byteBuffer.get();
        final int orderCode = byteBuffer.getInt();
        final int priority = byteBuffer.getInt();

        final int pricesSize = byteBuffer.getInt();
        final double[] prices = new double[pricesSize];
        for (int i = 0; i < pricesSize; i++)
        {
            prices[i] = byteBuffer.getDouble();
        }

        final int quantitiesSize = byteBuffer.getInt();
        final long[] quantities = new long[quantitiesSize];
        for (int i = 0; i < quantitiesSize; i++)
        {
            quantities[i] = byteBuffer.getLong();
        }

        return new ObjectToBeSerialised(sourceId, special, orderCode, 
                                        priority, prices, quantities);
    }

    public void write(final UnsafeMemory buffer)
    {
        buffer.putLong(sourceId);
        buffer.putBoolean(special);
        buffer.putInt(orderCode);
        buffer.putInt(priority);
        buffer.putDoubleArray(prices);
        buffer.putLongArray(quantities);
    }

    public static ObjectToBeSerialised read(final UnsafeMemory buffer)
    {
        final long sourceId = buffer.getLong();
        final boolean special = buffer.getBoolean();
        final int orderCode = buffer.getInt();
        final int priority = buffer.getInt();
        final double[] prices = buffer.getDoubleArray();
        final long[] quantities = buffer.getLongArray();

        return new ObjectToBeSerialised(sourceId, special, orderCode, 
                                        priority, prices, quantities);
    }

    public boolean equals(final Object o)
    {
        if (this == o)
        {
            return true;
        }
        if (o == null || getClass() != o.getClass())
        {
            return false;
        }

        final ObjectToBeSerialised that = (ObjectToBeSerialised)o;

        if (orderCode != that.orderCode)
        {
            return false;
        }
        if (priority != that.priority)
        {
            return false;
        }
        if (sourceId != that.sourceId)
        {
            return false;
        }
        if (special != that.special)
        {
            return false;
        }
        if (!Arrays.equals(prices, that.prices))
        {
            return false;
        }
        if (!Arrays.equals(quantities, that.quantities))
        {
            return false;
        }

        return true;
    }
}

class UnsafeMemory
{
    private static final Unsafe unsafe;
    static
    {
        try
        {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe = (Unsafe)field.get(null);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    private static final long byteArrayOffset = unsafe.arrayBaseOffset(byte[].class);
    private static final long longArrayOffset = unsafe.arrayBaseOffset(long[].class);
    private static final long doubleArrayOffset = unsafe.arrayBaseOffset(double[].class);

    private static final int SIZE_OF_BOOLEAN = 1;
    private static final int SIZE_OF_INT = 4;
    private static final int SIZE_OF_LONG = 8;

    private int pos = 0;
    private final byte[] buffer;

    public UnsafeMemory(final byte[] buffer)
    {
        if (null == buffer)
        {
            throw new NullPointerException("buffer cannot be null");
        }

        this.buffer = buffer;
    }

    public void reset()
    {
        this.pos = 0;
    }

    public void putBoolean(final boolean value)
    {
        unsafe.putBoolean(buffer, byteArrayOffset + pos, value);
        pos += SIZE_OF_BOOLEAN;
    }

    public boolean getBoolean()
    {
        boolean value = unsafe.getBoolean(buffer, byteArrayOffset + pos);
        pos += SIZE_OF_BOOLEAN;

        return value;
    }

    public void putInt(final int value)
    {
        unsafe.putInt(buffer, byteArrayOffset + pos, value);
        pos += SIZE_OF_INT;
    }

    public int getInt()
    {
        int value = unsafe.getInt(buffer, byteArrayOffset + pos);
        pos += SIZE_OF_INT;

        return value;
    }

    public void putLong(final long value)
    {
        unsafe.putLong(buffer, byteArrayOffset + pos, value);
        pos += SIZE_OF_LONG;
    }

    public long getLong()
    {
        long value = unsafe.getLong(buffer, byteArrayOffset + pos);
        pos += SIZE_OF_LONG;

        return value;
    }

    public void putLongArray(final long[] values)
    {
        putInt(values.length);

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(values, longArrayOffset,
                          buffer, byteArrayOffset + pos,
                          bytesToCopy);
        pos += bytesToCopy;
    }

    public long[] getLongArray()
    {
        int arraySize = getInt();
        long[] values = new long[arraySize];

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(buffer, byteArrayOffset + pos,
                          values, longArrayOffset,
                          bytesToCopy);
        pos += bytesToCopy;

        return values;
    }

    public void putDoubleArray(final double[] values)
    {
        putInt(values.length);

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(values, doubleArrayOffset,
                          buffer, byteArrayOffset + pos,
                          bytesToCopy);
        pos += bytesToCopy;
    }

    public double[] getDoubleArray()
    {
        int arraySize = getInt();
        double[] values = new double[arraySize];

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(buffer, byteArrayOffset + pos,
                          values, doubleArrayOffset,
                          bytesToCopy);
        pos += bytesToCopy;

        return values;
    }
}

Results
2.8GHz Nehalem - Java 1.7.0_04
==============================
0 Serialisation  write=2,517ns read=11,570ns total=14,087ns
1 Serialisation  write=2,198ns read=11,122ns total=13,320ns
2 Serialisation  write=2,190ns read=11,011ns total=13,201ns
3 Serialisation  write=2,221ns read=10,972ns total=13,193ns
4 Serialisation  write=2,187ns read=10,817ns total=13,004ns
0 ByteBuffer     write=264ns   read=273ns    total=537ns
1 ByteBuffer     write=248ns   read=243ns    total=491ns
2 ByteBuffer     write=262ns   read=243ns    total=505ns
3 ByteBuffer     write=300ns   read=240ns    total=540ns
4 ByteBuffer     write=247ns   read=243ns    total=490ns
0 UnsafeMemory   write=99ns    read=84ns     total=183ns
1 UnsafeMemory   write=53ns    read=82ns     total=135ns
2 UnsafeMemory   write=63ns    read=66ns     total=129ns
3 UnsafeMemory   write=46ns    read=63ns     total=109ns
4 UnsafeMemory   write=48ns    read=58ns     total=106ns

2.4GHz Sandy Bridge - Java 1.7.0_04
===================================
0 Serialisation  write=1,940ns read=9,006ns total=10,946ns
1 Serialisation  write=1,674ns read=8,567ns total=10,241ns
2 Serialisation  write=1,666ns read=8,680ns total=10,346ns
3 Serialisation  write=1,666ns read=8,623ns total=10,289ns
4 Serialisation  write=1,715ns read=8,586ns total=10,301ns
0 ByteBuffer     write=199ns   read=198ns   total=397ns
1 ByteBuffer     write=176ns   read=178ns   total=354ns
2 ByteBuffer     write=174ns   read=174ns   total=348ns
3 ByteBuffer     write=172ns   read=183ns   total=355ns
4 ByteBuffer     write=174ns   read=180ns   total=354ns
0 UnsafeMemory   write=38ns    read=75ns    total=113ns
1 UnsafeMemory   write=26ns    read=52ns    total=78ns
2 UnsafeMemory   write=26ns    read=51ns    total=77ns
3 UnsafeMemory   write=25ns    read=51ns    total=76ns
4 UnsafeMemory   write=27ns    read=50ns    total=77ns

Analysis

On my fast 2.4GHz Sandy Bridge laptop, writing and reading back a single, relatively small object can take ~10,000ns using Java Serialization, whereas with Unsafe this comes down to well under 100ns, even accounting for the test code itself.  To put this in context, when using Java Serialization the costs are on a par with a network hop!  That would be very costly if your transport is a fast IPC mechanism on the same system.

There are numerous reasons why Java Serialisation is so costly.  For example, it writes out the fully qualified class name, the field names, and version information along with the data.  Also, ObjectOutputStream keeps a table of every object written so that repeated references to the same object can be written as back-references.  Java Serialisation requires 340 bytes for this example object, yet we only require 185 bytes for the binary versions.  Details for the Java Serialization format can be found here.  If I had not used arrays for the majority of the data, then the serialised object would have been significantly larger with Java Serialization because of the field names.  In my experience, text-based protocols like XML and JSON can be even less efficient than Java Serialization.  Also be aware that Java Serialization is the standard mechanism employed for RMI.
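The 185-byte figure for the binary versions falls straight out of the field layout used by write(ByteBuffer): a long, a byte for the boolean, two ints, then an int length prefix plus 8 bytes per element for each array. The sketch below assumes ten prices and ten quantities, as in the test object. It does the arithmetic and cross-checks it by writing the same field sequence into a ByteBuffer. EncodedSize, compute() and measure() are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

// Sketch: size of the hand-rolled binary layout used by ObjectToBeSerialised.
// EncodedSize, compute() and measure() are hypothetical names.
final class EncodedSize
{
    static final int SIZE_OF_BYTE = 1;
    static final int SIZE_OF_INT = 4;
    static final int SIZE_OF_LONG = 8;
    static final int SIZE_OF_DOUBLE = 8;

    // sourceId + special + orderCode + priority,
    // then (length prefix + elements) for prices and for quantities
    static int compute(final int pricesLength, final int quantitiesLength)
    {
        return SIZE_OF_LONG + SIZE_OF_BYTE + SIZE_OF_INT + SIZE_OF_INT
             + SIZE_OF_INT + pricesLength * SIZE_OF_DOUBLE
             + SIZE_OF_INT + quantitiesLength * SIZE_OF_LONG;
    }

    // Writes the same field sequence as write(ByteBuffer), with arbitrary
    // scalar values, and returns the number of bytes used.
    static int measure(final double[] prices, final long[] quantities)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.putLong(1L);
        buffer.put((byte)1);
        buffer.putInt(7);
        buffer.putInt(3);
        buffer.putInt(prices.length);
        for (final double price : prices)
        {
            buffer.putDouble(price);
        }
        buffer.putInt(quantities.length);
        for (final long quantity : quantities)
        {
            buffer.putLong(quantity);
        }
        return buffer.position();
    }

    public static void main(final String[] args)
    {
        // 8 + 1 + 4 + 4 + (4 + 80) + (4 + 80) = 185
        System.out.println(compute(10, 10));
        System.out.println(measure(new double[10], new long[10]));
    }
}
```

Both lines of output should read 185. The difference from the 340 bytes of Java Serialisation is the class and field metadata that the binary format simply omits.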

The real issue is the number of instructions to be executed.  The Unsafe method wins by a significant margin because in HotSpot, and many other JVMs, the optimiser treats these operations as intrinsics and replaces the call with assembly instructions that perform the memory manipulation directly.  For primitive types this results in a single x86 MOV instruction, which can often execute in a single cycle.  The details can be seen by having HotSpot output the optimised code, as I described in a previous article.

Now it has to be said that "with great power comes great responsibility" and if you use Unsafe it is effectively the same as programming in C, and with that can come memory access violations when you get offsets wrong.

Adding Some Context

"What about the likes of Google Protocol Buffers?", I hear you cry out.  These are very useful libraries and can often offer better performance and more flexibility than Java Serialisation.  However, they are not remotely close to the performance of using Unsafe as I have shown here.  Protocol Buffers solve a different problem, providing compact, schema-defined messages that work well across languages.  Please test with different protocols and serialisation techniques to compare results.

Also, the astute among you will be asking, "What about Endianness (byte-ordering) of the integers written?"  With Unsafe the bytes are written in native order.  This is great for IPC and between systems of the same type.  When systems use differing byte orders, conversion will be necessary.
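Here is a sketch of what that conversion might look like, using only java.nio and Long.reverseBytes. Note that ByteBuffer defaults to big-endian order, whereas Unsafe writes in native order, which is little-endian on x86. On x86, therefore, the ByteBuffer and UnsafeMemory versions above do not produce identical bytes. ByteOrderDemo and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Sketch of byte-order handling. ByteOrderDemo is a hypothetical name.
final class ByteOrderDemo
{
    // Encode a long using an explicit byte order.
    static byte[] encode(final long value, final ByteOrder order)
    {
        return ByteBuffer.allocate(8).order(order).putLong(value).array();
    }

    // Decode a long, interpreting the bytes in the given order.
    static long decode(final byte[] bytes, final ByteOrder order)
    {
        return ByteBuffer.wrap(bytes).order(order).getLong();
    }

    // Repair a value that was read using the wrong byte order.
    static long swap(final long misreadValue)
    {
        return Long.reverseBytes(misreadValue);
    }

    public static void main(final String[] args)
    {
        final long value = 0x0102030405060708L;
        final byte[] little = encode(value, ByteOrder.LITTLE_ENDIAN);
        final long misread = decode(little, ByteOrder.BIG_ENDIAN);

        System.out.println("native order: " + ByteOrder.nativeOrder());
        System.out.println(Long.toHexString(misread));       // 807060504030201
        System.out.println(Long.toHexString(swap(misread))); // 102030405060708
    }
}
```

Reading little-endian bytes as big-endian yields the byte-reversed value, which Long.reverseBytes corrects. In practice it is usually simpler to fix the byte order once, at the system boundary.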

How do we deal with multiple versions of a class, or determine what class an object belongs to?  I want to keep this article focused, but let's say a simple integer to indicate the implementation class is all that is required for a header.  This integer can be used to look up the appropriate implementation for the de-serialisation operation.
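Here is a minimal sketch of such a header. It assumes each message begins with an int type id that selects a registered decoder. TypeRegistry, register() and decode() are hypothetical names, not an existing library API. Giving each class version its own id is one way the versioning question could be handled.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch: an int type id header selects the decoder for the message body.
// TypeRegistry is a hypothetical name.
final class TypeRegistry
{
    static final int PRICE_V1 = 1; // hypothetical id for one message version

    private final Map<Integer, Function<ByteBuffer, Object>> decoders = new HashMap<>();

    void register(final int typeId, final Function<ByteBuffer, Object> decoder)
    {
        if (decoders.putIfAbsent(typeId, decoder) != null)
        {
            throw new IllegalArgumentException("duplicate type id: " + typeId);
        }
    }

    // Reads the header, then delegates the body to the registered decoder.
    Object decode(final ByteBuffer buffer)
    {
        final int typeId = buffer.getInt();
        final Function<ByteBuffer, Object> decoder = decoders.get(typeId);
        if (decoder == null)
        {
            throw new IllegalStateException("unknown type id: " + typeId);
        }
        return decoder.apply(buffer);
    }

    public static void main(final String[] args)
    {
        final TypeRegistry registry = new TypeRegistry();
        registry.register(PRICE_V1, body -> body.getLong());

        final ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.putInt(PRICE_V1).putLong(42L);
        buffer.flip();
        System.out.println(registry.decode(buffer)); // 42
    }
}
```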

An argument I often hear against binary protocols, and for text protocols, is: what about human readability and debugging?  There is an easy solution to this.  Develop a tool for reading the binary format!
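A reader for the layout written by ObjectToBeSerialised need only be a few lines long. This sketch assumes that layout: a long, a byte, two ints, then int-prefixed double[] and long[] arrays. It takes the byte order as a parameter, because ByteBuffer writes big-endian while UnsafeMemory writes in native order. BinaryDumper and describe() are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Sketch of a debugging tool that renders the binary layout as text.
// BinaryDumper is a hypothetical name.
final class BinaryDumper
{
    // Pass BIG_ENDIAN for ByteBuffer output, ByteOrder.nativeOrder() for UnsafeMemory output.
    static String describe(final byte[] bytes, final ByteOrder order)
    {
        final ByteBuffer buffer = ByteBuffer.wrap(bytes).order(order);
        final StringBuilder sb = new StringBuilder();
        sb.append("sourceId=").append(buffer.getLong());
        sb.append(" special=").append(buffer.get() != 0);
        sb.append(" orderCode=").append(buffer.getInt());
        sb.append(" priority=").append(buffer.getInt());

        final double[] prices = new double[buffer.getInt()];
        for (int i = 0; i < prices.length; i++)
        {
            prices[i] = buffer.getDouble();
        }
        sb.append(" prices=").append(Arrays.toString(prices));

        final long[] quantities = new long[buffer.getInt()];
        for (int i = 0; i < quantities.length; i++)
        {
            quantities[i] = buffer.getLong();
        }
        sb.append(" quantities=").append(Arrays.toString(quantities));
        return sb.toString();
    }

    public static void main(final String[] args)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.putLong(1L).put((byte)1).putInt(2).putInt(3);
        buffer.putInt(2).putDouble(0.1).putDouble(0.2);
        buffer.putInt(1).putLong(10L);
        final byte[] bytes = Arrays.copyOf(buffer.array(), buffer.position());

        // sourceId=1 special=true orderCode=2 priority=3 prices=[0.1, 0.2] quantities=[10]
        System.out.println(describe(bytes, ByteOrder.BIG_ENDIAN));
    }
}
```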

Conclusion

In conclusion, it is possible to achieve native C/C++-like levels of performance in Java for serialising an object to and from a byte stream by using effectively the same techniques.  The UnsafeMemory class, for which I've provided a skeleton implementation, could easily be expanded to encapsulate this behaviour and thus protect oneself from many of the potential issues when dealing with such a sharp tool.
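One way UnsafeMemory might be expanded to protect itself is to check every access against the backing array before touching memory. A silent memory corruption then becomes an exception. This is a sketch under that assumption and shows only putLong and getLong. CheckedUnsafeMemory and checkBounds() are hypothetical names, and sun.misc.Unsafe remains an unsupported internal API.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Sketch: bounds-checked variant of UnsafeMemory. CheckedUnsafeMemory is a
// hypothetical name; only the long accessors are shown.
final class CheckedUnsafeMemory
{
    private static final Unsafe UNSAFE;
    static
    {
        try
        {
            final Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            UNSAFE = (Unsafe)field.get(null);
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    private static final long BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
    private static final int SIZE_OF_LONG = 8;

    private final byte[] buffer;
    private int pos = 0;

    CheckedUnsafeMemory(final byte[] buffer)
    {
        if (null == buffer)
        {
            throw new NullPointerException("buffer cannot be null");
        }
        this.buffer = buffer;
    }

    void reset()
    {
        pos = 0;
    }

    // Fail fast instead of reading or writing past the end of the array.
    private void checkBounds(final int length)
    {
        if (length < 0 || pos > buffer.length - length)
        {
            throw new IndexOutOfBoundsException(
                "pos=" + pos + " length=" + length + " capacity=" + buffer.length);
        }
    }

    void putLong(final long value)
    {
        checkBounds(SIZE_OF_LONG);
        UNSAFE.putLong(buffer, BYTE_ARRAY_OFFSET + pos, value);
        pos += SIZE_OF_LONG;
    }

    long getLong()
    {
        checkBounds(SIZE_OF_LONG);
        final long value = UNSAFE.getLong(buffer, BYTE_ARRAY_OFFSET + pos);
        pos += SIZE_OF_LONG;
        return value;
    }
}
```

Measure the cost of the extra comparison with the benchmark above before adopting it.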

Now for the burning question.  Would it not be so much better if Java offered an alternative Marshallable interface to Serializable by offering natively what I've effectively done with Unsafe???

Saturday 19 May 2012

Applying Back Pressure When Overloaded

How should a system respond when under sustained load?  Should it keep accepting requests until its response times follow the deadly hockey stick, followed by a crash?  All too often this is what happens unless a system is designed to cope with more requests arriving than it is capable of processing.  If we are seeing a sustained arrival rate of requests greater than our system is capable of processing, then something has to give.  Having the entire system degrade is not the ideal service we want to give our customers.  A better approach would be to process transactions at our system's maximum possible throughput rate, while maintaining a good response time, and reject requests above this arrival rate.

Let’s consider a small art gallery as a metaphor.  In this gallery the typical viewer spends on average 20 minutes browsing, and the gallery can hold a maximum of 30 viewers.  If more than 30 viewers occupy the gallery at the same time then customers become unhappy because they cannot get a clear view of the paintings.  If this happens they are unlikely to purchase or return.  To keep our viewers happy it is better to recommend that some viewers visit the café a few doors down and come back when the gallery is less busy.  This way the viewers in the gallery get to see all the paintings without other viewers in the way, and in the meantime those we cannot accommodate enjoy a coffee.  Applying Little’s Law, customers cannot arrive at more than 90 per hour, otherwise the maximum capacity is exceeded.  If between 9:00 and 10:00 they are arriving at 100 per hour, then I’m sure the café down the road will appreciate the extra 10 customers.
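Little’s Law states that the average number of viewers in the gallery, L, equals the arrival rate, λ, multiplied by the average time each viewer spends there, W. Rearranging it with the gallery's figures gives the maximum sustainable arrival rate and the overflow at 100 arrivals per hour:

```latex
L = \lambda W
\quad\Longrightarrow\quad
\lambda_{\max} = \frac{L_{\max}}{W}
  = \frac{30\ \text{viewers}}{20\ \text{min}}
  = \frac{30\ \text{viewers}}{\tfrac{1}{3}\ \text{h}}
  = 90\ \text{viewers/h},
\qquad
100 - 90 = 10\ \text{extra viewers/h}
```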

Within our systems the available capacity is generally a function of the size of our thread pools and time to process individual transactions.  These thread pools are usually fronted by queues to handle bursts of traffic above our maximum arrival rate.  If the queues are unbounded, and we have a sustained arrival rate above the maximum capacity, then the queues will grow unchecked.  As the queues grow they increasingly add latency beyond acceptable response times, and eventually they will consume all memory causing our systems to fail.  Would it not be better to send the overflow of requests to the café while still serving everyone else at the maximum possible rate?  We can do this by designing our systems to apply “Back Pressure”.
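In Java, one way to give a thread pool a bounded queue is to use a ThreadPoolExecutor with an ArrayBlockingQueue and a rejection policy. This sketch assumes a fixed pool size and uses the standard AbortPolicy. Work that arrives when both the threads and the queue are busy is rejected rather than queued without limit. BoundedPool and trySubmit() are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: a fixed-size thread pool fronted by a bounded queue.
// BoundedPool and trySubmit() are hypothetical names.
final class BoundedPool
{
    static ThreadPoolExecutor create(final int threads, final int queueCapacity)
    {
        return new ThreadPoolExecutor(
            threads, threads,                         // fixed number of threads
            0L, TimeUnit.MILLISECONDS,                // no idle timeout needed
            new ArrayBlockingQueue<>(queueCapacity),  // bounded: absorbs bursts only
            new ThreadPoolExecutor.AbortPolicy());    // throw when threads and queue are full
    }

    // Returns false when the request cannot be accepted and must go elsewhere.
    static boolean trySubmit(final ThreadPoolExecutor pool, final Runnable task)
    {
        try
        {
            pool.execute(task);
            return true;
        }
        catch (final RejectedExecutionException e)
        {
            return false;
        }
    }
}
```

A false return from trySubmit() is the point at which a caller would send the request to the café, for example by replying with a busy status.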

Figure 1.

Separation of concerns encourages good systems design at all levels.  I like to layer a design so that the gateways to third parties are separated from the main transaction services.  This can be achieved by having gateways responsible for protocol translation and border security only.  A typical gateway could be a web container running Servlets.  Gateways accept customer requests, apply appropriate security, and translate the channel protocols for forwarding to the transaction service hosting the domain model.  The transaction service may use a durable store if transactions need to be preserved.  For example, the state of a chat server domain model may not require preservation, whereas a model for financial transactions must be kept for many years for compliance and business reasons.

Figure 1. above is a simplified view of the typical request flow in many systems.  Pools of threads in a gateway accept user requests and forward them to a transaction service.  Let’s assume we have asynchronous transaction services fronted by input and output queues, or similar FIFO structures.  If we want the system to meet a response time quality-of-service (QoS) guarantee, then we need to consider the following three variables:
  1. The time taken for individual transactions on a thread
  2. The number of threads in a pool that can execute transactions in parallel
  3. The length of the input queue to set the maximum acceptable latency
    max latency = (transaction time / number of threads) * queue length
    queue length = max latency / (transaction time / number of threads)
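The two formulas above can be applied directly. The figures in this sketch (100 microseconds per transaction, 4 threads, a 10ms latency budget) are assumptions chosen for illustration, and QueueSizing is a hypothetical name.

```java
// Sketch of the queue sizing formulas. Times are in microseconds.
// QueueSizing is a hypothetical name.
final class QueueSizing
{
    // max latency = (transaction time / number of threads) * queue length
    static double maxLatencyMicros(final double transactionMicros, final int threads,
                                   final int queueLength)
    {
        return (transactionMicros / threads) * queueLength;
    }

    // queue length = max latency / (transaction time / number of threads)
    static int queueLength(final double latencyBudgetMicros, final double transactionMicros,
                           final int threads)
    {
        // Round down so a full queue never exceeds the latency budget.
        return (int)Math.floor(latencyBudgetMicros / (transactionMicros / threads));
    }

    public static void main(final String[] args)
    {
        // Assumed: 100 microseconds per transaction, 4 threads, 10ms budget.
        System.out.println(queueLength(10_000, 100, 4));   // 400
        System.out.println(maxLatencyMicros(100, 4, 400)); // 10000.0
    }
}
```

With these assumed figures the input queue should hold at most 400 requests. Halving the transaction time would double that bound for the same latency budget.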

If the queue is unbounded, latency will continue to increase under sustained overload.  So if we want to set a maximum response time, we need to limit the queue length.

By bounding the input queue we block the thread receiving network packets, which applies back pressure upstream.  If the network protocol is TCP, similar back pressure is then applied to the sender as the network buffers fill.  This process can repeat all the way back via the gateway to the customer.  For each service we need to configure the queues so that they do their part in achieving the required quality-of-service for the end-to-end customer experience.
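Here is a sketch of this blocking behaviour using ArrayBlockingQueue.put(), which waits while the queue is full. It assumes a single receiver thread standing in for the thread reading network packets. While that thread is blocked it stops reading, and that is what lets back pressure propagate upstream. BackPressureDemo and run() are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a receiver blocked by a full, bounded queue stops reading its source.
// BackPressureDemo and run() are hypothetical names.
final class BackPressureDemo
{
    // Returns the number of requests the consumer processed.
    static int run(final int requests, final int capacity) throws InterruptedException
    {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        final Thread receiver = new Thread(() ->
        {
            try
            {
                for (int i = 0; i < requests; i++)
                {
                    queue.put(i); // blocks while full: back pressure on the receiver
                }
            }
            catch (final InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();

        // The consumer drains at its own pace; the receiver can never
        // get more than 'capacity' requests ahead of it.
        int processed = 0;
        for (int i = 0; i < requests; i++)
        {
            queue.take();
            processed++;
        }
        receiver.join();
        return processed;
    }
}
```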

One of the biggest wins I often find is to reduce the time taken to process each individual transaction.  This helps in both the best- and worst-case scenarios.

Worst Case Scenario

Let’s say the queue is unbounded and the system is under sustained heavy load.  Things can begin to go wrong very quickly, in subtle ways, before memory is exhausted.  What do you think will happen when the queue is larger than the processor cache?  The consumer threads will suffer cache misses just when they are struggling to keep up, thus compounding the problem.  This can cause a system to get into trouble very quickly and eventually crash.  Under Linux this is particularly nasty because Linux allows “Over Commit” by default, so malloc, or one of its friends, will succeed; then later, at the point of using that memory, the OOM Killer will start shooting processes.  When the OS starts shooting processes, you just know things are not going to end well!

What About Synchronous Designs?

You may say that with synchronous designs there are no queues.  Well, not such obvious ones.  If you have a thread pool then it will have a lock, or semaphore, with wait queues to assign threads.  If you are crazy enough to allocate a new thread on every request, then once you have paid the huge cost of thread creation, your thread sits in the run queue waiting for a processor to execute it.  Also, these queues involve context switches and condition variables, which greatly increase the costs.  You just cannot run away from queues; they are everywhere!  Best to embrace them and design for the quality-of-service your system needs to deliver to its customers.  If we must have queues, then design for them, and maybe choose some nice lock-free ones with great performance.

When we need to support synchronous protocols like REST, we can use back pressure, signalled by a full incoming queue at the gateway, to send a meaningful “server busy” message such as the HTTP 503 status code.  The customer can then interpret this as time for a coffee and cake at the café down the road.
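Here is a sketch of the gateway side. It assumes the gateway hands requests to a bounded queue with a non-blocking offer() and maps a full queue to 503. Gateway and accept() are hypothetical names. The 202 status is only a placeholder for an accepted request, and a real gateway might also set a Retry-After header.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: translate a full input queue into HTTP 503 "Service Unavailable".
// Gateway and accept() are hypothetical names.
final class Gateway
{
    static final int HTTP_ACCEPTED = 202;            // placeholder for "accepted"
    static final int HTTP_SERVICE_UNAVAILABLE = 503;

    private final BlockingQueue<String> inputQueue;

    Gateway(final int capacity)
    {
        inputQueue = new ArrayBlockingQueue<>(capacity);
    }

    // offer() never blocks, so a full queue is reported straight back to the client.
    int accept(final String request)
    {
        return inputQueue.offer(request) ? HTTP_ACCEPTED : HTTP_SERVICE_UNAVAILABLE;
    }

    // Called on the transaction service side to drain requests; null if empty.
    String poll()
    {
        return inputQueue.poll();
    }
}
```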

Subtleties To Watch Out For...

You need to consider the whole end-to-end service.  What if a client is very slow at consuming data from your system?  It could tie up a thread in the gateway, taking it out of action.  Now you have fewer threads working the queue, so response times will increase.  Queues and threads need to be monitored, and appropriate action needs to be taken when thresholds are crossed.  For example, when a queue is 70% full, maybe an alert should be raised so an investigation can take place.  Also, transaction times need to be sampled to ensure they are in the expected range.
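Here is a sketch of such a threshold check for a bounded BlockingQueue. It assumes the check runs periodically and feeds whatever monitoring system is in place. QueueMonitor and shouldAlert() are hypothetical names. Because size() and remainingCapacity() are read separately, the ratio is only approximate under concurrent use.

```java
import java.util.concurrent.BlockingQueue;

// Sketch: compare queue depth against an alert threshold such as 0.7 (70%).
// QueueMonitor is a hypothetical name.
final class QueueMonitor
{
    // For a bounded BlockingQueue, capacity = size + remainingCapacity.
    static double fillRatio(final BlockingQueue<?> queue)
    {
        final int size = queue.size();
        final int capacity = size + queue.remainingCapacity();
        return capacity == 0 ? 0.0 : (double)size / capacity;
    }

    static boolean shouldAlert(final BlockingQueue<?> queue, final double threshold)
    {
        return fillRatio(queue) >= threshold;
    }
}
```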

Summary

If we do not consider how our systems will behave under heavy load, then at best they will most likely degrade seriously, and at worst crash.  When they crash this way, we get to find out if there are any really evil data corruption bugs lurking in those dark places.  Applying back pressure is one effective technique for coping with sustained high load, such that maximum throughput can be delivered without degrading system performance for the requests and transactions already accepted.