Concurrency patterns – Active Object

The Active Object pattern decouples method execution from method invocation. Think asynchronous method invocation, callbacks and the like. To avoid race conditions, incoming client requests are queued and handled by a scheduler. The scheduler picks a queued object and makes it run its logic. It is the object's responsibility to know what to do when it gets invoked, hence the name Active Object.
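
For a first feel of what that decoupling looks like in code, here is a minimal, self-contained sketch. The class and method names are illustrative and not part of the article's source; a plain single-thread executor stands in for the scheduler:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Invocation returns immediately; execution happens later on the
// active object's own scheduler thread (here a single-thread executor).
class ActiveGreeter {
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();

    Future<String> greet(String name) {                // the Proxy method
        return scheduler.submit(() -> "Hello, " + name);
    }

    void shutdown() {
        scheduler.shutdown();
    }
}

class GreeterClient {
    public static void main(String[] args) throws Exception {
        ActiveGreeter greeter = new ActiveGreeter();
        Future<String> reply = greeter.greet("world"); // does not block
        // ... the client is free to do other work here ...
        System.out.println(reply.get());               // blocks only when the result is needed
        greeter.shutdown();
    }
}

The counter example later in the post builds the same machinery explicitly, component by component.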

Application

  • Android – asynchronous background service sending messages to the UI thread
  • Any kind of queue / message delivery system

An important aspect is that each invoked object runs in its own thread of control, separate from the threads of its clients. The scheduler guarantees serialised access to the queued tasks.

Key Components

  • Proxy: provides the interface clients use to submit their requests
  • Activation List: a queue of pending client requests
  • Scheduler: decides which request to execute next
  • Active Object: implements the core business logic
  • Callback: holds the execution result (e.g. a promise or a future)
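
As a quick orientation, the five components might map onto plain Java constructs roughly as follows. This is an illustrative sketch, not code from the article's repository; the ThreadSafeCounter example later in the post follows the same layout:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class ActiveObjectSkeleton {
    // Activation List: pending client requests
    private final BlockingQueue<Runnable> activationList = new LinkedBlockingQueue<>();

    // Scheduler: a dedicated thread that serialises access to the object's state
    private final Thread scheduler = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                activationList.take().run();   // Active Object: the core logic runs here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });

    ActiveObjectSkeleton() {
        scheduler.setDaemon(true);
        scheduler.start();
    }

    // Proxy: the method clients call; it returns a Callback (here a CompletableFuture)
    CompletableFuture<Long> submit(Callable<Long> request) {
        CompletableFuture<Long> callback = new CompletableFuture<>();
        activationList.add(() -> {
            try {
                callback.complete(request.call());
            } catch (Exception e) {
                callback.completeExceptionally(e);
            }
        });
        return callback;
    }
}

A client would call submit(..) and read the returned CompletableFuture whenever it actually needs the result.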

 

Advantages

  • Reduced code complexity: Once the pattern’s mechanics are in place, the code can be treated as single-threaded.
  • No need for additional synchronization: Concurrent requests are serialized and handled by a single internal thread.

 

Drawbacks

  • Performance overhead: Sophisticated scheduling, spinning and request handling can be expensive in terms of memory and can lead to non-trivial context switching.
  • Programming overhead: The Active Object pattern essentially requires you to create a small framework. It can certainly be kept self-contained, but it boils down to the simple fact that you need to be aware of multiple components:
      • Activation List – the queue of incoming requests
      • Callback – yields the results
      • Scheduler thread – watches for incoming requests
      • Scheduler implementation – enqueues requests
      • Proxy – the client interface used to submit requests
      • Future – an asynchronous response

 

Example Implementation

The example is a simple counter implementing a subset of the AtomicLong API. The counter keeps internal state, which is subject to race conditions:
public class ThreadSafeCounter implements Counter {
   private long value;
   ..
}
The challenge is to ensure the counter consistently yields the correct results, even when many threads access and modify the counter's internal value.
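
The Counter interface itself is not shown in the excerpts. Given the AtomicLong subset mentioned above, a plausible shape might be the following (an assumption, not the article's exact source):

// Assumed shape of the Counter interface, mirroring a subset of AtomicLong.
public interface Counter {
    long get();
    long incrementAndGet();
    long decrementAndGet();
}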

ThreadUnsafeCounter.java represents a naive implementation which fails to handle concurrent access. The failure is demonstrated by a multi-threaded test:
public class ThreadUnsafeCounterMultiThreadedTest {
   ..
   Counter counter = new ThreadUnsafeCounter(INITIAL_VALUE);
   ..
   // Note that a test failure is expected
   @Test(expected = AssertionError.class)
   public void incrementAndGet() {
     testExecutor.runTest(incrementAndGetCommand);
     assertEquals(getExpectedIncrementedValue(), counter.get());
   }
   ..
}
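
Both testExecutor and the command objects are elided above. A rough sketch of what such a multi-threaded test harness might do is shown below; the class name, thread count and iteration count are illustrative, not the article's actual test code:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical test harness: hammers a command from many threads so the test
// can compare the counter's final value against the expected one.
class MultiThreadedTestExecutor {
    private static final int THREADS = 10;
    private static final int CALLS_PER_THREAD = 1_000;

    void runTest(Runnable command) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        CountDownLatch done = new CountDownLatch(THREADS);
        for (int i = 0; i < THREADS; i++) {
            pool.submit(() -> {
                for (int j = 0; j < CALLS_PER_THREAD; j++) {
                    command.run();               // e.g. () -> counter.incrementAndGet()
                }
                done.countDown();
            });
        }
        done.await(30, TimeUnit.SECONDS);        // wait for all workers to finish
        pool.shutdownNow();
    }
}

With the thread-unsafe counter, lost updates mean the final value typically falls short of the expected total of increments, which is why the test above declares expected = AssertionError.class.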

On the other hand, ThreadSafeCounter.java relies on the Active Object pattern to handle concurrent requests:

public class ThreadSafeCounter implements Counter {
   // The internal state, subject to race conditions.
   private long value;
   // Activation List: incoming requests (tasks)
   // are put into a queue
   private BlockingQueue<Callable<Long>> taskQueue =
                 new LinkedBlockingQueue<>();
   // Callback: provides access to the calculated results
   // (incrementAndGet, etc.)
   private BlockingQueue<Long> resultQueue =
                 new LinkedBlockingQueue<>();
   // Scheduler: a dedicated thread created and started
   // when the counter is instantiated
   public ThreadSafeCounter(long value) {
     ..
     new Thread(new Runnable() {
       @Override
       public void run() {
         while (true) {
           // Constantly watching for incoming requests
           ..
         }
       }
     }).start();
   }
   ..
   // Proxy: allows the clients to submit new tasks
   private long enqueueTask(Callable<Long> task) {..}
}
The implementation offloads the actual task scheduling to the Executor framework. The execution results are handled asynchronously via futures. For simplicity, I chose to block the clients until the results become available. Still in ThreadSafeCounter.java:
// This is the actual task scheduler. It only allows for a single task at a time.
ExecutorService executorService = Executors.newSingleThreadExecutor();
..
// At some point in the future the counter's new value will be available
Future<Long> future = executorService.submit(taskQueue.take());
..
// Meanwhile, the client is blocked until the result is ready
while (true) {
  Long result = resultQueue.poll(500, TimeUnit.MILLISECONDS);
  if (result != null) break;
}
..
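
Putting the elided pieces together, the whole class might look roughly like the reconstruction below. It follows the assumptions stated above (a single shared result queue, clients blocked until the value arrives) and uses placeholder naming; it is a sketch, not the article's exact source:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Reconstruction sketch of the full active-object counter, assuming the
// single-result-queue / blocking-client design described in the post.
public class ThreadSafeCounterSketch {
    private long value;                         // internal state, touched only by the executor thread
    private final BlockingQueue<Callable<Long>> taskQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Long> resultQueue = new LinkedBlockingQueue<>();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    public ThreadSafeCounterSketch(long initialValue) {
        this.value = initialValue;
        // Scheduler: take the next queued task, run it on the single-threaded
        // executor and publish the result for the waiting client.
        Thread scheduler = new Thread(() -> {
            while (true) {
                try {
                    Future<Long> future = executorService.submit(taskQueue.take());
                    resultQueue.put(future.get());
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        scheduler.setDaemon(true);
        scheduler.start();
    }

    public long incrementAndGet() {
        return enqueueTask(() -> ++value);      // the increment itself runs on the executor thread
    }

    public long get() {
        return enqueueTask(() -> value);
    }

    // Proxy: enqueue the task, then block until the scheduler publishes the result.
    private long enqueueTask(Callable<Long> task) {
        try {
            taskQueue.put(task);
            while (true) {
                Long result = resultQueue.poll(500, TimeUnit.MILLISECONDS);
                if (result != null) {
                    return result;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the counter", e);
        }
    }
}

A production-grade variant would more likely hand the Future straight back to the caller instead of funnelling every result through one shared queue, but the blocking version keeps the counter's methods synchronous, close to the AtomicLong-style interface it mimics.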