This chapter would be useful for anyone wanting to know what is the right way to test a multi-threaded program.

It's not very uncommon that in a technical interview, the interviewer would ask you to implement the algorithm for a given problem and then would ask you how you would test the code to make sure your algorithm and data structures work correctly in multi-threaded environment. When it comes to testing a concurrent programming, at the very minimum you test for invariants, pre-conditions and post-conditions. I have seen candidates performing brilliantly in implementing the algorithm with appropriate efficient data structures and even extending the algorithm for multi-threaded environment and making the code thread-safe, but horribly failing at writing the tests that would properly verify the performance and safety of the concurrent program.

So, in this chapter we will dig deeper into how to test a code that would be accessed concurrently by multiple threads.

When asked "how would you test a class and/or method for concurrency?", most candidate would say (and implement) something like: spawn multiple threads and call the method from each of them and then check if the post-conditions and invariants hold true.
BUT, this is an INCORRECT way to test concurrent programs. It is because in the worst case it may so happen that the threads start in different time in a way that made the process sequential what was expected to be concurrent.
The problem here is test code written in aforementioned way does not guarantee INTERLEAVING of the threads. To guarantee maximum interleaving, we need to start the threads at the same time.


When it comes to testing concurrent program there happens to be two aspects of testing:
  1. Testing for Correctness: This is where we test for correctness of invariants and post-conditions. Most unit tests we write tests for correctness.
  2. Testing for Safety: This is very specific to testing concurrent programs. This is where we test if a concurrent class performs correctly and as expected under unpredictable concurrent access, and heavy traffic load.
    To achieve this we need to do two things:
    • First, we need to set up multiple threads calling the method under test for some duration of time
    • Once the first step is complete, test if nothing went wrong.


Our focus here is testing for safety, since testing for correctness for concurrent programming is same as that in sequential programming. But testing for safety is very specific to testing concurrent code and can be very tricky if you don't know how to do it in the correct way. Let's demystify this right now so that you are not caught off guard when you face a situation that demands that you have a prior knowledge of testing concurrent code.

In short, today we will learn how to create maximum interleaving between the threads, because that is what creates the perfect environment for testing concurrent programs for safety.

Depending on your platform, creating and starting a thread can be a moderately heavyweight operation. If your thread is short running and you start a number of threads in a loop, the threads run sequentially rather than concurrently in the worst case. Even in the not-quite-worst case, the fact that the first thread has a head start on the others means that you may get fewer interleavings than expected: the first thread runs by itself for some amount of time, and then the first two threads run concurrently for some amount of time, and only eventually are all the threads running concurrently. (The same thing happens at the end of the run: the threads that got a head start also finish early.)

The problem we are trying to solve here is "how to make sure that all the threads start at the same time and possibly end at the same time too, creating maximum overlap or interleaving". The solution to this problem is to use Synchronizers, like CountDownLatch or CyclicBarriers. Please keep in mind that this is very specific to Java, but in almost every high-level programming language you would have something similar that leverages the concept introduced in here in this chapter.

CountDownLatch can be used as a starting gate and another as a finish gate. Alternatively, we can use CyclicBarrier, initialized with the number of worker threads plus one, and have the worker threads and the test driver wait at the barrier at the beginning and end of their run. This ensures that all threads are up and running before any start working. The code below uses this technique to coordinate starting and stopping the worker threads, creating more potential concurrent interleavings. We still can't guarantee that the scheduler won't run each thread to completion sequentially, but making the runs long enough reduces the extent to which scheduling distorts our results.

The final trick employed by the below code is to use a deterministic termination criterion so that no additional inter-thread coordination is needed to figure out when the test is finished. The test method starts exactly as many producers as consumers and each of them puts or takes the same number of elements, so the total number of items added and removed is the same.

Tests like this tend to be good at finding safety violations. For example, a common error in implementing semaphore controlled buffers is to forget that the code actually doing the insertion and extraction requires mutual exclusion (using synchronized or ReentrantLock). A sample run of the below test with a version of BoundedBuffer that omits making doInsert and doExtract synchronized fails fairly quickly. Running the test with a few dozen threads iterating a few million times on buffers of various capacity on various systems increases our confidence about the lack of data corruption in put and take.

Tests should be run on multiprocessor systems to increase the diversity of potential interleavings. However, having more than a few CPUs does not necessarily make tests more effective. To maximize the chance of detecting timing-sensitive data races, there should be more active threads than CPUs, so that at any given time some threads are running and some are switched out, thus reducing the predictability of interactions between threads.

In tests that run until they complete a fixed number of operations, it is possible that the test case will never finish if the code being tested encounters an exception due to a bug. The most common way to handle this is to have the test framework abort tests that do not terminate within a certain amount of time; how long to wait should be determined empirically, and failures must then be analyzed to ensure that the problem wasn't just that you didn't wait long enough. (This problem is not unique to testing concurrent classes; sequential tests must also distinguish between long-running and infinite loops.)

To correctly testing a concurrent code, generating as much INTERLEAVINGS of the threads as possible is the key.




We will be testing a class called BoundedBuffer. Let's see what this class does.

BoundedBuffer implements a fixed-length array-based queue with blocking put and take methods controlled by a pair of counting semaphores. The availableItems semaphore represents the number of elements that can be removed from the buffer, and is initially zero (since the buffer is initially empty). Similarly, availableSpaces represents how many items can be inserted into the buffer, and is initialized to the size of the buffer.

A take operation first requires that a permit be obtained from availableItems. This succeeds immediately if the buffer is nonempty, and otherwise blocks until the buffer becomes nonempty. Once a permit is obtained, the next element from the buffer is removed and a permit is released to the availableSpaces semaphore. The put operation is defined conversely, so that on exit from either the put or take methods, the sum of the counts of both semaphores always equals the bound. In practice, if you need a bounded buffer you should use ArrayBlockingQueue or LinkedBlockingQueue rather than rolling your own, but the technique used here illustrates how insertions and removals can be controlled in other data structures as well.


public class BoundedBuffer {
    private final Semaphore availableItems, availableSpaces;
    private final E[] items;
    private int putPosition = 0, takePosition = 0;

    public BoundedBuffer(int capacity) {
        availableItems = new Semaphore(0);
        availableSpaces = new Semaphore(capacity);
        items = (E[]) new Object[capacity];
    }

    public boolean isEmpty() {
        return availableItems.availablePermits() == 0;
    }

    public boolean isFull() {
        return availableSpaces.availablePermits() == 0;
    }

    public void put(E x) throws InterruptedException {
        availableSpaces.acquire();
        doInsert(x);
        availableItems.release();
    }

    public E take() throws InterruptedException {
        availableItems.acquire();
        E item = doExtract();
        availableSpaces.release();
        return item;
    }

    private synchronized void doInsert(E x) {
        int i = putPosition;
        items[i] = x;
        putPosition = (++i == items.length) ? 0 : i;
    }

    private synchronized E doExtract() {
        int i = takePosition;
        E x = items[i];
        items[i] = null;
        takePosition = (++i == items.length) ? 0 : i;
        return x;
    }
}



Now comes the testing part. As mentioned earlier we would be focusing on testing for thread safety. For BoundedBuffer class, that would mean that we should perform put and take operation on the BoundedBuffer concurrently from multiple threads for quite sometime (think of it as stress test) and would see if the put and take operations are having expected behavior in the multi-threaded environment, when called from different threads at the same time. One way to test it would be to put random variables in the BoundedBuffer from different threads and keep track of their sum, and perform take operation from multiple threads and as we remove the items from BoundedBuffer we keep track of their sum as well. If the implementation of BoundedBuffer is thread-safe, we get get same sum at the end for take operations performed and put operations performed.

Carefully notice how we are using CyclicBarrier to make sure that all the threads start at the same time and end at the same time to ensure maximum interleaving.

public class PutTakeTest {
    private static final ExecutorService pool = Executors.newCachedThreadPool();
    private final AtomicInteger putSum = new AtomicInteger(0); // keeps track of the sum of all the items put in BoundedBuffer
    private final AtomicInteger takeSum = new AtomicInteger(0); // keeps track of the sum of all the items removed from BoundedBuffer
    private final CyclicBarrier barrier; // CyclicBarrier will ensure that all the threads will start and end at the same time to ensure maximum interleaving creating an ideal environment for thread-safety testing
    private final BoundedBuffer<Integer> bb; // the BoundedBuffer we would be testing
    private final int nTrials; 
    private final int nPairs; // number of threads we need to spawn for each of producer and consumer
    // producer threads will be putting items in the bounded buffer
    // consumer threads will be removing items from bounded buffer
    // so total number of threads = npair producer threads + npair consumer threads = npair * 2

    public static void main(String[] args) {
        new PutTakeTest(10, 10, 100000).test(); // sample parameters
        pool.shutdown(); // after the test is done, don't forget to shutdown the thread pool
    }

    PutTakeTest(int capacity, int npairs, int ntrials) {
        this.bb = new BoundedBuffer<Integer>(capacity);
        this.nTrials = ntrials;
        this.nPairs = npairs;
        this.barrier = new CyclicBarrier(npairs * 2 + 1); // we have + 1 for the main thread
                        // we have npair * 2 because we have npair
                        // of consumers and npair of producers
    }

    // actual test method that tests for safety
    void test() {
        // Arrange
        try {
            for (int i = 0; i < nPairs; i++) {
                // Act
                pool.execute(new Producer()); // spawn producer threads to put items in bounded buffer
                pool.execute(new Consumer()); // spawn consumer threads to remove items from bounded buffer
            }

            // Make sure we have maximum interleaving to create ideal test environment
            barrier.await(); // wait for all threads to be ready
                    // This is main thread calling await()
                    // This is why we have plus one in npairs * 2 + 1 in the parameter
                    // passed to CyclicBarrier

            barrier.await(); // wait for all threads to finish
                    // This is main thread calling await()
                    // This is why we have plus one in npairs * 2 + 1 in the parameter
                    // passed to CyclicBarrier.
                    // Keep in mind that this call is for waiting for all the threads to finish

            // Assert
            assertEquals(putSum.get(), takeSum.get()); // putSum and takeSum should be equal
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    class Producer implements Runnable {
        public void run() {
            try {
                int seed = (this.hashCode() ^ (int)System.nanoTime());
                int sum = 0;
                barrier.await(); // wait for all threads to be ready
                for (int i = nTrials; i > 0; --i) {
                    bb.put(seed);
                    sum += seed;
                    seed = randomize(seed); // generate another random to put in the bounded buffer
                }
                putSum.getAndAdd(sum);
                barrier.await(); // wait for all threads to finish
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        // generate a random number
        private int randomize(int y) {
            y ^= (y << 6);
            y ^= (y >>> 21);
            y ^= (y << 7);
            return y;
        }
    }
    class Consumer implements Runnable {
        public void run() {
            try {
                barrier.await(); // wait for all threads to finish
                int sum = 0;
                for (int i = nTrials; i > 0; --i) {
                 sum += bb.take();
                }
                takeSum.getAndAdd(sum);
                barrier.await(); // wait for all threads to finish
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}



In the above code, look how we are passing (npair * 2 + 1) as parameter to CyclicBarrier. barriers work as follows:
  • Threads call await when they reach the barrier point, and await blocks until all the threads have reached the barrier point. If all threads meet at the barrier point, the barrier has been successfully passed, in which case all threads are released and the barrier is reset so it can be used again.

The above situation happens twice in our test code : once when the threads are getting started and once when the threads are finishing. While starting, when all the threads are ready (all of them have called await()) the barrier releases all the threads so that they can start executing and the barrier gets reset and is ready to be used again. Now when the threads are finishing the same thing happens: they call await but they cannot finish till the barrier releases them. Once all the threads have finished executing and have called await() the barrier releases all of them so that they can end. This way we are creating maximum interleaving for the (npair * 2) number of producer and consumer threads.

Extras:


You can stop reading this chapter here, if you only want to know about testing for thread-safety (discussed above). But, if you want to know more and want to see how the basic unit tests for BoundedBuffer would look like, then feel free to tag along.

Basic Unit Tests:

The most basic unit tests for BoundedBuffer are similar to what we’d use in a sequential context—create a bounded buffer, call its methods, and assert postconditions and invariants. Some invariants that quickly come to mind are that a freshly created buffer should identify itself as empty, and also as not full. A similar but slightly more complicated safety test is to insert N elements into a buffer with capacity N (which should succeed without blocking), and test that the buffer recognizes that it is full (and not empty). You would need JUnit Test Framework.


class BoundedBufferTest extends TestCase {

    void testIsEmptyWhenConstructed() {
        BoundedBuffer<Integer> bb = new BoundedBuffer<>(10);

        assertTrue(bb.isEmpty());
        assertFalse(bb.isFull());
    }

    void testIsFullAfterPuts() throws InterruptedException {
        BoundedBuffer<Integer> bb = new BoundedBuffer<>(10);
        for (int i = 0; i < 10; i++) {
            bb.put(i);
        }

        assertTrue(bb.isFull());
        assertFalse(bb.isEmpty());
    }
}



Testing Blocking Operations:

Tests of essential concurrency properties require introducing more than one thread. Most testing frameworks are not very concurrency-friendly: they rarely include facilities to create threads or monitor them to ensure that they do not die unexpectedly. If a helper thread created by a test case discovers a failure, the framework usually does not know with which test the thread is associated, so some work may be required to relay success or failure information back to the main test runner thread so it can be reported.

If a method is supposed to block under certain conditions, then a test for that behavior should succeed only if the thread does not proceed. Testing that a method blocks is similar to testing that a method throws an exception; if the method returns normally, the test has failed.

Testing that a method blocks introduces an additional complication: once the method successfully blocks, you have to convince it somehow to unblock. The obvious way to do this is via interruption—start a blocking activity in a separate thread, wait until the thread blocks, interrupt it, and then assert that the blocking operation completed. Of course, this requires your blocking methods to respond to interruption by returning early or throwing InterruptedException. The “wait until the thread blocks” part is easier said than done; in practice, you have to make an arbitrary decision about how long the few instructions being executed could possibly take, and wait longer than that. You should be prepared to increase this value if you are wrong (in which case you will see spurious test failures).

The below shows an approach to testing blocking operations. It creates a “taker” thread that attempts to take an element from an empty buffer. If take succeeds, it registers failure. The test runner thread starts the taker thread, waits a long time, and then interrupts it. If the taker thread has correctly blocked in the take operation, it will throw InterruptedException, and the catch block for this exception treats this as success and lets the thread exit. The main test runner thread then attempts to join with the taker thread and verifies that the join returned successfully by calling Thread.isAlive; if the taker thread responded to the interrupt, the join should complete quickly.

The timed join ensures that the test completes even if take gets stuck in some unexpected way. This test method tests several properties of take—not only that it blocks but that, when interrupted, it throws InterruptedException. This is one of the few cases in which it is appropriate to subclass Thread explicitly instead of using a Runnable in a pool: in order to test proper termination with join. The same approach can be used to test that the taker thread unblocks after an element is placed in the queue by the main thread.

It is tempting to use Thread.getState to verify that the thread is actually blocked on a condition wait, but this approach is not reliable. There is nothing that requires a blocked thread ever to enter the WAITING or TIMED_WAITING states, since the JVM can choose to implement blocking by spin-waiting instead. Similarly, because spurious wakeups from Object.wait or Condition.await are permitted, a thread in the WAITING or TIMED_WAITING state may temporarily transition to RUNNABLE even if the condition for which it is waiting is not yet true. Even ignoring these implementation options, it may take some time for the target thread to settle into a blocking state. The result of Thread.getState should not be used for concurrency control, and is of limited usefulness for testing—its primary utility is as a source of debugging information.


void testTakeBlocksWhenEmpty() {
    final BoundedBuffer<Integer> bb = new BoundedBuffer<>(10);
    Thread taker = new Thread() {
        public void run() {
            try {
                int unused = bb.take();
                fail(); // if we get here, it’s an error
            } catch (InterruptedException success) { }
        }};

    try {
        taker.start();
        Thread.sleep(LOCKUP_DETECT_TIMEOUT);
        taker.interrupt();
        taker.join(LOCKUP_DETECT_TIMEOUT);
        assertFalse(taker.isAlive());
    } catch (Exception unexpected) {
    fail();
    }
}



Must Read:



Recommended Read:





Instructor:



If you have any feedback, please use this form: https://thealgorists.com/Feedback.



Help Your Friends save 40% on our products

wave