Wednesday, 28 September 2011

Concurrent Request chaining.

This is not going to be another definition of multi-threading or concurrent process execution; the aim is to find out whether you have fallen foul of concurrent request chaining. So what is concurrent request chaining? A simple, and perhaps the most basic, example is HTTP GET/POST: the protocol provides a mechanism to accept and process multiple requests over the same session by chaining them. As you would appreciate, a multi-threaded execution environment offers several techniques and built-in objects and types that allow only a single thread at a time into a particular section of a request block. This protects, for example, variables (making them thread safe) or data from being corrupted by simultaneous modifications from different requests coming from the same session.

Although these multi-threading objects and types answer the scenario described above, they are not the answer to every business rule, and they require proper usage to attain the correct locking for specific instances.

Let's take this business rule as an example: a deposit function should be synchronized if calls to it come from the same account number. This rule is similar to the underlying HTTP request chain explained above.

Notice that there are several approaches to this rule, but how do you provide a robust solution using the concurrent objects and types provided by your JDK? I suppose you have already started thinking through dozens of candidate solutions. Does each approach work well for the scenario, and is it lightweight enough?

A classic solution is to join the threads, which, if not done properly, opens the door to dreadful resource issues. Another solution is to use java.util.concurrent.locks.ReentrantLock together with a java.util.HashMap that holds a queue of objects to process. This approach gives you the flexibility to make each process wait for a specific period before it acquires the lock.
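  // Needs java.util.HashMap, java.util.Map, java.util.concurrent.TimeUnit
  // and java.util.concurrent.locks.ReentrantLock.
  private final Map<Long, ReentrantLock> queue = new HashMap<Long, ReentrantLock>();

  public void AccountDeposit(Long accountNumber)
  {
    ReentrantLock lock = addWork(accountNumber);
    boolean locked = false;
    try
    {
      // Wait up to one minute for other deposits on the same account
      locked = lock.tryLock(1, TimeUnit.MINUTES);
      if (locked)
      {
        // Perform Deposit Routine
      }
    }
    catch (InterruptedException e)
    {
      // do some clean up, then restore the interrupt flag
      Thread.currentThread().interrupt();
    }
    finally
    {
      // Unlock only if the lock was acquired; otherwise unlock()
      // throws IllegalMonitorStateException
      if (locked)
      {
        lock.unlock();
      }
    }
  }

  // synchronized because HashMap is not thread safe; returns the lock
  // registered for the account, creating it on first use
  private synchronized ReentrantLock addWork(Long accountNumber)
  {
    ReentrantLock lock = queue.get(accountNumber);
    if (lock == null)
    {
      lock = new ReentrantLock();
      queue.put(accountNumber, lock);
    }
    return lock;
  }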
But if you look closely, the above technique avoids adding the same account number to the queue more than once. If I were you, I would correct this so that the queue can hold duplicate customer account numbers, to avoid potential deposit calls being ignored. Here again I ask you to take a very close look: is this approach robust enough? Will it be able to handle requests from various data sources without any potential resource issues in the near future?
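One way to probe those questions is a small, self-contained sketch of the lock-per-account idea. It is only an illustration under stated assumptions: it needs Java 8 or later, it swaps the HashMap for a ConcurrentHashMap, and the names PerAccountLockDemo, deposit, balanceOf and runDemo are hypothetical, not part of the original code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; all names here are illustrative assumptions.
class PerAccountLockDemo
{
  // One lock per account number; entries are never removed in this sketch.
  private final ConcurrentMap<Long, ReentrantLock> locks = new ConcurrentHashMap<Long, ReentrantLock>();
  // Plain long cells, so correctness of the balance depends on the account lock.
  private final ConcurrentMap<Long, long[]> balances = new ConcurrentHashMap<Long, long[]>();

  // Returns true if the deposit was applied, false if the lock wait timed out.
  boolean deposit(long accountNumber, long amount) throws InterruptedException
  {
    ReentrantLock lock = locks.computeIfAbsent(accountNumber, k -> new ReentrantLock());
    if (!lock.tryLock(1, TimeUnit.MINUTES))
    {
      return false; // the caller decides whether to retry or reject
    }
    try
    {
      long[] cell = balances.computeIfAbsent(accountNumber, k -> new long[1]);
      cell[0] += amount; // read-modify-write guarded by the account lock
      return true;
    }
    finally
    {
      lock.unlock();
    }
  }

  long balanceOf(long accountNumber)
  {
    ReentrantLock lock = locks.computeIfAbsent(accountNumber, k -> new ReentrantLock());
    lock.lock();
    try
    {
      long[] cell = balances.get(accountNumber);
      return cell == null ? 0L : cell[0];
    }
    finally
    {
      lock.unlock();
    }
  }

  // Starts several threads that all deposit 1 into the same account.
  static long runDemo(int threads, final int depositsPerThread)
  {
    final PerAccountLockDemo demo = new PerAccountLockDemo();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++)
    {
      workers[i] = new Thread(() -> {
        try
        {
          for (int j = 0; j < depositsPerThread; j++)
          {
            demo.deposit(42L, 1L);
          }
        }
        catch (InterruptedException e)
        {
          Thread.currentThread().interrupt();
        }
      });
      workers[i].start();
    }
    for (Thread t : workers)
    {
      try { t.join(); }
      catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
    return demo.balanceOf(42L);
  }

  public static void main(String[] args)
  {
    System.out.println(runDemo(4, 1000)); // prints 4000
  }
}
```

Deposits for the same account are serialized while different accounts proceed in parallel. Note that the lock map grows with every new account number, which is one of the resource questions raised above.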

Another solution is to manage a pool of threads. This gives you a more flexible way of asking a thread to hold a queue for a specific account number. Put the deposit and its member objects in a separate worker thread instance, which makes the member objects local to a specific execution unit. Let's take the worker class below as an example.
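import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class DepositHandlerThread extends Thread
{
  // 0 means idle; volatile because the controller thread reads it
  public volatile long busyStartTime;
  public static final int MAX_SIZE = 10;
  // Account number this worker is currently dealing with
  public final AtomicLong accountNoUnderProcess = new AtomicLong();
  private final BlockingQueue<DepositToken> queue = new ArrayBlockingQueue<DepositToken>(MAX_SIZE);

  public static class DepositToken
  {
    private final long accountNumber;

    public DepositToken(long accountNumber)
    {
      this.accountNumber = accountNumber;
    }
  }

  public DepositHandlerThread()
  {
  }

  public void run()
  {
    try
    {
      // take() blocks until a token is available and never returns null
      while (true)
      {
        DepositToken token = queue.take();
        // perform the deposit routine
        accountNoUnderProcess.set(token.accountNumber);
        doDeposit(token);

        // Mark this worker idle once its pile is empty
        if (queue.isEmpty())
        {
          busyStartTime = 0;
        }
      }
    }
    catch (InterruptedException e)
    {
      // Interrupted while waiting for work: restore the flag and let the thread end
      Thread.currentThread().interrupt();
    }
  }

  public void doDeposit(DepositToken token)
  {
    /** Some routine here to perform deposit with Token holding account number **/
  }

  public void addWork(DepositToken token)
  {
    try
    {
      busyStartTime = System.currentTimeMillis();
      // Publish the account number before queuing, so the controller
      // can route follow-up requests for it to this worker
      accountNoUnderProcess.set(token.accountNumber);
      // put() blocks while the queue already holds MAX_SIZE tokens
      queue.put(token);
    }
    catch (InterruptedException e)
    {
      throw new AssertionError(e);
    }
  }
}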
It accepts and processes requests as and when they arrive. Within the controller class, which accepts or fetches the deposit requests and processes them, you can then manage a pool of worker threads, assigning tasks so that requests for the same account number are never executed by separate threads. This ensures that a worker currently dealing with a given account number holds the whole pile for it in a thread-safe blocking FIFO queue, so executions do not overlap. See the example below:

Controller class:

  class DepositHandler extends Thread
  {
    int tc;
    private DepositHandlerThread threadPool[];

    public DepositHandler()
    {
      // start pool
      tc = 3;
      threadPool = new DepositHandlerThread[tc];
      for (int i = 0; i < tc; i++)
      {
        threadPool[i] = new DepositHandlerThread();
        threadPool[i].start();
      }
      Thread.yield();
    }

    public void run()
    {
      // Stand-in condition: loop while you have data available from or at some data source
      while (!isInterrupted())
      {
        // I will simulate the situation with a static value
        final Long accountNumber = 123343460L;
        boolean taskAssigned = false;

        // Control the pool of threads
        for (int i = 0; i < tc; i++)
        {
          // Check to make sure a live thread is currently working with
          // the ID in hand and if so add the new task to its pile.
          if ((threadPool[i].busyStartTime != 0) && (threadPool[i].isAlive())
              && threadPool[i].accountNoUnderProcess.get() == accountNumber.longValue())
          {
            // add the new client AccountNumber to queue
            threadPool[i].addWork(new DepositHandlerThread.DepositToken(accountNumber));
            taskAssigned = true;
            break;
          }
        }

        if (!taskAssigned)
        {
          // Otherwise hand the task to the first idle, live worker.
          // If no worker is idle, the request is not assigned on this pass.
          for (int c = 0; c < tc; c++)
          {
            if (threadPool[c].busyStartTime == 0 && threadPool[c].isAlive())
            {
              // Add work to idle thread
              threadPool[c].addWork(new DepositHandlerThread.DepositToken(accountNumber));
              break;
            }
          }
        }
      }
    }
  }
The approach shown above is transparent, avoids overlaps and is future-proof. To say the least, it does not force any excessive locking on the process and therefore provides a lightweight thread management process for the scenario at hand.
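For comparison, the same "one account, one worker" idea can be sketched with the JDK's own executors instead of a hand-rolled pool. This is a swapped-in variant, not the code above: it assumes Java 8 or later, it pins each account number to a fixed single-thread executor by hashing rather than picking an idle worker, and the names AccountRouterDemo, laneFor, deposit, balanceOf and runDemo are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; class and method names are illustrative assumptions.
class AccountRouterDemo
{
  // Each lane is one thread with its own FIFO task queue.
  private final ExecutorService[] lanes;
  // One plain HashMap per lane: only that lane's thread ever touches it.
  private final List<Map<Long, Long>> laneBalances = new ArrayList<Map<Long, Long>>();

  AccountRouterDemo(int laneCount)
  {
    lanes = new ExecutorService[laneCount];
    for (int i = 0; i < laneCount; i++)
    {
      lanes[i] = Executors.newSingleThreadExecutor();
      laneBalances.add(new HashMap<Long, Long>());
    }
  }

  // The same account number always maps to the same lane.
  private int laneFor(long accountNumber)
  {
    return Math.floorMod(Long.hashCode(accountNumber), lanes.length);
  }

  void deposit(final long accountNumber, final long amount)
  {
    final int lane = laneFor(accountNumber);
    final Map<Long, Long> balances = laneBalances.get(lane);
    // No lock needed: the update runs on the lane's single thread.
    lanes[lane].execute(() -> balances.merge(accountNumber, amount, Long::sum));
  }

  // Reads the balance on the owning lane, after all deposits queued before it.
  long balanceOf(final long accountNumber)
  {
    final int lane = laneFor(accountNumber);
    final Map<Long, Long> balances = laneBalances.get(lane);
    Callable<Long> read = () -> balances.getOrDefault(accountNumber, 0L);
    try
    {
      return lanes[lane].submit(read).get();
    }
    catch (InterruptedException e)
    {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    catch (ExecutionException e)
    {
      throw new IllegalStateException(e);
    }
  }

  void shutdown()
  {
    for (ExecutorService lane : lanes)
    {
      lane.shutdown();
    }
  }

  // Several producer threads submit deposits of 1 for the same account.
  static long runDemo(int producers, final int depositsPerProducer)
  {
    final AccountRouterDemo router = new AccountRouterDemo(3);
    Thread[] threads = new Thread[producers];
    for (int i = 0; i < producers; i++)
    {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < depositsPerProducer; j++)
        {
          router.deposit(123343460L, 1L);
        }
      });
      threads[i].start();
    }
    for (Thread t : threads)
    {
      try { t.join(); }
      catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
    long balance = router.balanceOf(123343460L);
    router.shutdown();
    return balance;
  }

  public static void main(String[] args)
  {
    System.out.println(runDemo(4, 1000)); // prints 4000
  }
}
```

The trade-off in this variant is that a busy account can hold up other accounts that hash to the same lane, whereas the controller above looks for an idle worker first.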

Disclaimer
The code above has not been tested; it has been written to demonstrate request chaining.
