Tuesday, November 05, 2013

Self Monitoring Processor Design for a State Machine Implementation

Self Monitoring Processor Design for a FSM

This post uses the Runnable and Callable interfaces, together with the Java 6 thread pool classes, to build a class design that not only executes a task but also monitors its execution progress periodically.

Here is what the interface looks like. Note that each processor has an associated 'State', and each state has a status, for example scheduled, started, awaiting_async, success, fail or timeout. This is because the IProcessor class is also used to implement a state machine. I won't go through that part in more detail here.
public interface IProcessor extends Runnable,
  Callable<ProcessingStatus> {

    ProcessingStatus getCurrentStatus();
    ProcessingState getCurrentState();

    ProcessingStatus execute();
    Future getTask();

    void setScheduledFutureHandle(ScheduledFuture handle);
}
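One consequence of a type being both a Runnable and a Callable is that a plain `submit(task)` call is ambiguous to the compiler, so the caller has to cast to choose which entry point the pool invokes. The following is a minimal sketch of that behaviour, assuming Java 8 or later; `DualTask` and `DualInterfaceSketch` are hypothetical names and not part of the original design.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DualInterfaceSketch {
    // Submits the task as a Callable and returns the value produced by call().
    static String submitAsCallable(DualTask task) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The cast selects submit(Callable); without it the overload is ambiguous.
            Future<String> future = pool.submit((Callable<String>) task);
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    // Submits the task as a Runnable; the Future yields null once run() returns.
    static Object submitAsRunnable(DualTask task) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit((Runnable) task);
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        DualTask task = new DualTask();
        System.out.println(submitAsCallable(task) + " via " + task.lastInvoked);
        System.out.println(submitAsRunnable(task) + " via " + task.lastInvoked);
    }
}

// Hypothetical task type that, like IProcessor, is both a Runnable and a Callable.
class DualTask implements Runnable, Callable<String> {
    volatile String lastInvoked = "none";

    @Override
    public String call() {   // the "do the work" entry point
        lastInvoked = "call";
        return "work done";
    }

    @Override
    public void run() {      // the "monitor" entry point
        lastInvoked = "run";
    }
}
```

The scheduled pool's `scheduleAtFixedRate` only accepts a Runnable, so no cast is needed there.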

Now we define an abstract class that does the basic work:

public abstract class Processor implements IProcessor,
  Comparable {

    // Method which does the actual work
    public ProcessingStatus call() throws InterruptedException {
        // Actual execution is delegated to the subclass
        currentStatus = execute();
        return currentStatus;
    }

    // Method which is called from the scheduled thread pool and which
    // monitors the state of the task, e.g. moving it to timeout if it
    // runs beyond its timeout; basically task status monitoring
    public void run() {
        // ...
    }
}
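To make the split between `call()` (the work) and `run()` (the monitoring tick) more concrete, here is a minimal sketch under a few assumptions: the `TaskStatus` enum, the `MonitoredProcessor` class and the millisecond timeout are all hypothetical stand-ins, and the real Processor may track its status quite differently.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

class MonitoredProcessorDemo {
    public static void main(String[] args) {
        MonitoredProcessor p = new MonitoredProcessor(1000) {
            @Override
            protected TaskStatus execute() {
                return TaskStatus.SUCCESS;
            }
        };
        System.out.println(p.call());
    }
}

// Hypothetical status values, loosely following the statuses listed in the post.
enum TaskStatus { SCHEDULED, STARTED, SUCCESS, FAIL, TIMEOUT }

// Sketch of a processor whose call() does the work and whose run() monitors it.
abstract class MonitoredProcessor implements Runnable, Callable<TaskStatus> {
    private final AtomicReference<TaskStatus> status =
            new AtomicReference<>(TaskStatus.SCHEDULED);
    private final long timeoutMillis;
    private volatile long startedAt;

    MonitoredProcessor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    TaskStatus getCurrentStatus() {
        return status.get();
    }

    // Subclasses supply the state-specific work.
    protected abstract TaskStatus execute() throws Exception;

    @Override
    public TaskStatus call() {
        startedAt = System.currentTimeMillis();
        status.set(TaskStatus.STARTED);
        TaskStatus result;
        try {
            result = execute();
        } catch (Exception e) {
            result = TaskStatus.FAIL;
        }
        // Record the result only if the monitor has not already flagged a timeout.
        status.compareAndSet(TaskStatus.STARTED, result);
        return status.get();
    }

    @Override
    public void run() {
        // One monitoring tick: flag the task if it has been running too long.
        if (status.get() == TaskStatus.STARTED
                && System.currentTimeMillis() - startedAt > timeoutMillis) {
            status.compareAndSet(TaskStatus.STARTED, TaskStatus.TIMEOUT);
        }
    }
}
```

Using `compareAndSet` means whichever side moves the status out of STARTED first wins, so a late result cannot overwrite a timeout that the monitor has already recorded.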
All the other Processor classes just have to extend Processor and implement the execute method:
public class ProcessorStateX extends Processor {

    // The execute method is called as part of the processor execution
    public ProcessingStatus execute() {
        // do some StateX related work
        // ...
    }
}
These tasks are created and pushed to an input queue, and from there a controlling class takes each task and adds it to a thread pool. Let us see how it is done:
public class StateController {

    private ExecutorService fxdpool = null;
    private ScheduledExecutorService pool = null;

    // Create the thread pools
    public void startStateController() {
        this.fxdpool = Executors.newFixedThreadPool(FIXED_THREADPOOL_COUNT_2);
        this.pool = Executors.newScheduledThreadPool(SCHEDULED_THREADPOOL_COUNT_4);
    }

    // The method that takes from the input queue and adds it to the thread pool.
    // @processor is what is present in the task-completed queue
    private void pollTaskQueueAndExecute(IProcessor processor) throws InterruptedException {

        // This passes in the current completed (success or failed) processor and
        // looks up a state flow XML to create the next processor, which represents
        // the next state in the state machine
        taskProcessor = getNextProcessor(processor);

        // Send it for execution
        // Will call the Processor::call method
        Future unusedFuture = fxdpool.submit((Runnable) taskProcessor.getTask());

        if (unusedFuture.isCancelled()) {
            log.info("Task has been cancelled :" + taskProcessor.getTask());
        }

        // Create a scheduled checker for the task (task self-monitoring)
        // Will call the Processor::run method
        final ScheduledFuture handle = pool.scheduleAtFixedRate(
            taskProcessor, 0, taskProcessor.getPeriodicExecutionInterval(),
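The interplay of the two pools can be sketched as follows: the work goes to a fixed pool, a periodic checker goes to a scheduled pool, and the checker uses its own ScheduledFuture handle to stop itself once the work is done. This is a minimal sketch assuming Java 8 or later; `MonitoringDemo`, `runWithMonitor`, the pool sizes and the millisecond period are hypothetical choices, not values taken from the StateController.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class MonitoringDemo {
    // Runs `work` on a fixed pool, checks on it every periodMillis from a
    // scheduled pool, and returns how many monitoring ticks were executed.
    static int runWithMonitor(Callable<String> work, long periodMillis) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        ScheduledExecutorService monitors = Executors.newScheduledThreadPool(1);
        AtomicInteger ticks = new AtomicInteger();
        AtomicReference<ScheduledFuture<?>> handleRef = new AtomicReference<>();
        CountDownLatch monitorStopped = new CountDownLatch(1);
        try {
            Future<String> task = workers.submit(work);

            Runnable monitor = () -> {
                ticks.incrementAndGet();
                ScheduledFuture<?> handle = handleRef.get();
                // Once the work is finished, the monitor cancels its own schedule.
                if (task.isDone() && handle != null) {
                    handle.cancel(false);
                    monitorStopped.countDown();
                }
            };

            // Initial delay 0, then one tick every periodMillis.
            handleRef.set(monitors.scheduleAtFixedRate(
                    monitor, 0, periodMillis, TimeUnit.MILLISECONDS));

            task.get();                                  // wait for the work itself
            monitorStopped.await(5, TimeUnit.SECONDS);   // wait for the final tick
            return ticks.get();
        } finally {
            workers.shutdownNow();
            monitors.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        int ticks = runWithMonitor(() -> {
            Thread.sleep(50);
            return "done";
        }, 10);
        System.out.println("monitor ticks: " + ticks);
    }
}
```

Handing the handle back to the monitored object is presumably the purpose of `setScheduledFutureHandle` in the interface: without the handle, a periodic task has no way to cancel its own schedule.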

That's about it. And here is the method in the StateController that causes the transition between the different states, which basically implements the FSM.
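The transition step can be pictured roughly as below. This is only a sketch and not the StateController method itself: it swaps the state flow XML for a hypothetical in-memory map, and the `State` and `Outcome` values are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class TransitionTableSketch {
    // Hypothetical states and outcomes, for illustration only.
    enum State { STATE_A, STATE_B, STATE_ERROR }
    enum Outcome { SUCCESS, FAIL, TIMEOUT }

    // Maps "state/outcome" to the state that follows it.
    private final Map<String, State> transitions = new HashMap<>();

    // Registers one transition and returns this table so calls can be chained.
    TransitionTableSketch on(State from, Outcome outcome, State to) {
        transitions.put(from + "/" + outcome, to);
        return this;
    }

    // Returns the next state, or null when no transition is defined.
    State next(State current, Outcome outcome) {
        return transitions.get(current + "/" + outcome);
    }

    static TransitionTableSketch example() {
        return new TransitionTableSketch()
                .on(State.STATE_A, Outcome.SUCCESS, State.STATE_B)
                .on(State.STATE_A, Outcome.FAIL, State.STATE_ERROR)
                .on(State.STATE_A, Outcome.TIMEOUT, State.STATE_ERROR);
    }

    public static void main(String[] args) {
        TransitionTableSketch fsm = example();
        System.out.println(fsm.next(State.STATE_A, Outcome.SUCCESS));
        System.out.println(fsm.next(State.STATE_B, Outcome.SUCCESS));
    }
}
```

In the design described above, the looked-up next state would then be turned into the matching Processor subclass and submitted to the pools again.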
