Concurrency patterns – Active Object
The Active Object pattern decouples method execution from method invocation; think asynchronous method calls, callbacks and the like. To avoid race conditions, incoming client requests are queued and handled by a scheduler. The scheduler picks a queued request object and has it run its own logic. Each object is responsible for knowing what to do when it is invoked, hence the name Active Object.
Application
- Android – asynchronous background service sending messages to the UI thread
- Any kind of a queue / message delivery system
Key Components
- Proxy: provides the interface clients use to submit their requests
- Activation List: a queue of pending client requests
- Scheduler: decides which request to execute next
- Active Object: implements the core business logic
- Callback: holds the execution result (e.g. a promise or a future)
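The components above can be sketched in a few dozen lines of Java. This is a minimal illustration rather than a reference implementation: the class name `ActiveObjectSketch`, its `add` and `shutdown` methods, and the choice of `CompletableFuture` as the callback are all assumptions made for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical names throughout; a sketch of the component roles only.
class ActiveObjectSketch {
    // Activation List: pending requests wait here until the scheduler picks them up.
    private final BlockingQueue<Runnable> activationList = new LinkedBlockingQueue<>();

    // Active Object state: only ever touched by the scheduler thread.
    private long total;

    // Scheduler: one dedicated thread that runs queued requests one at a time.
    private final Thread scheduler = new Thread(() -> {
        try {
            while (true) {
                activationList.take().run();
            }
        } catch (InterruptedException e) {
            // Interrupted by shutdown(): stop processing.
        }
    });

    ActiveObjectSketch() {
        scheduler.setDaemon(true); // do not keep the JVM alive for this sketch
        scheduler.start();
    }

    // Proxy: the client-facing method. It returns immediately with a future (the Callback).
    CompletableFuture<Long> add(long amount) {
        CompletableFuture<Long> callback = new CompletableFuture<>();
        activationList.add(() -> {
            total += amount;          // business logic, executed on the scheduler thread
            callback.complete(total); // publish the result to the caller
        });
        return callback;
    }

    void shutdown() {
        scheduler.interrupt();
    }
}
```

A caller would write something like `long result = new ActiveObjectSketch().add(3).join();`. Because requests are taken from the queue in FIFO order by a single thread, `total` needs no lock.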
Advantages
- Reduced code complexity: Once the pattern's mechanics are in place, the business logic can be treated as single-threaded.
- No need for additional synchronization: Concurrent requests are serialized and handled by a single internal thread.
Drawbacks
- Performance overhead: Sophisticated scheduling, spinning and request handling can be expensive in terms of memory and can lead to non-trivial context switching.
- Programming overhead: Active Object essentially requires you to create a small framework. It can be kept reasonably self-contained, but the simple fact remains that you need to keep track of multiple components:
- Activation List – the queue of incoming requests
- Callback – yields the results
- Scheduler thread – watches for incoming requests
- Scheduler implementation – enqueues requests
- Proxy – client interface for submitting requests
- Future – an asynchronous response
Example Implementation
public class ThreadUnsafeCounter implements Counter {
    private long value;
    ..
}
ThreadUnsafeCounter.java is a naive implementation that fails to handle concurrent access. The failure is demonstrated by a multi-threaded test:
public class ThreadUnsafeCounterMultiThreadedTest {
    ..
    Counter counter = new ThreadUnsafeCounter(INITIAL_VALUE);
    ..
    // Note that a test failure is expected
    @Test(expected = AssertionError.class)
    public void incrementAndGet() {
        testExecutor.runTest(incrementAndGetCommand);
        assertEquals(getExpectedIncrementedValue(), counter.get());
    }
    ..
}
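The test executor itself is not shown in the excerpt. As a rough idea of what such a test does, the sketch below hammers an unsynchronized counter from several threads and returns the final value for comparison with the expected total. `RaceDemo`, `UnsafeCounter` and `runConcurrently` are hypothetical names, not the classes used in the article.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RaceDemo {
    // Hypothetical stand-in for a naive counter: ++value is a read-modify-write, not atomic.
    static class UnsafeCounter {
        private long value;
        long incrementAndGet() { return ++value; }
        long get() { return value; }
    }

    // Runs `threads` workers, each calling incrementAndGet() `perThread` times,
    // and returns the final counter value.
    static long runConcurrently(int threads, int perThread) {
        UnsafeCounter counter = new UnsafeCounter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            // Wait for all workers to finish before reading the result.
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }
}
```

With a single worker, `runConcurrently(1, 1000)` returns 1000. With several workers, increments can be lost, so the result is at most `threads * perThread` and frequently smaller, although any individual run may still happen to match.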
ThreadSafeCounter.java, on the other hand, relies on the Active Object pattern to handle concurrent requests:
public class ThreadSafeCounter implements Counter {

    // The internal state, subject to race conditions.
    private long value;

    // Activation List: incoming requests (tasks)
    // are put into a queue
    private BlockingQueue<Callable<Long>> taskQueue = new LinkedBlockingQueue<>();

    // Callback: provides access to the calculated results
    // (incrementAndGet, etc.)
    private BlockingQueue<Long> resultQueue = new LinkedBlockingQueue<>();

    // Scheduler: a dedicated thread created and started
    // when the counter is instantiated
    public ThreadSafeCounter(long value) {
        ..
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // Constantly watching for incoming requests
                    ..
                }
            }
        }).start();
    }
    ..

    // Proxy: allows the clients to submit new tasks
    private long enqueueTask(Callable<Long> task) {..}
}

// This is the actual task scheduler. It only allows for a single task at a time.
ExecutorService executorService = Executors.newSingleThreadExecutor();
..
// At some point in the future the counter's new value will be available
Future<Long> future = executorService.submit(taskQueue.take());
..
// Meanwhile, the client is blocked until the result is ready
while (true) {
    Long result = resultQueue.poll(500, TimeUnit.MILLISECONDS);
    if (result != null) break;
}
..
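For comparison, a single-thread executor can play both the Activation List and Scheduler roles on its own, with the `Future` it returns serving as the callback, so no separate result queue or polling loop is needed. The sketch below is a simplified variant under that assumption, not the article's ThreadSafeCounter; `ExecutorBackedCounter` and its method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical simplified variant; not the class from the article.
class ExecutorBackedCounter {
    // Internal state: mutated only by the executor's single worker thread.
    private long value;

    // Scheduler and Activation List in one: tasks queue up and run strictly one at a time.
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();

    ExecutorBackedCounter(long initialValue) {
        this.value = initialValue;
    }

    // Proxy: returns immediately; the Future completes once the task has run.
    Future<Long> incrementAndGetAsync() {
        Callable<Long> task = () -> ++value;
        return scheduler.submit(task);
    }

    // Blocking convenience wrapper for callers that need the result right away.
    long incrementAndGet() {
        try {
            return incrementAndGetAsync().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Counter task failed", e.getCause());
        }
    }

    // The executor's worker is a non-daemon thread, so it must be shut down explicitly.
    void shutdown() {
        scheduler.shutdown();
    }
}
```

The trade-off is less visibility into the moving parts: the queue and the worker thread are hidden inside the executor, which is convenient but makes custom scheduling policies harder to plug in.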
Source Code