/**
 * ProcessManager.java
 * Created Jan 9, 2007
 * Copyright (C) Tandberg Television 2007
 */
package com.tandbergtv.workflow.driver.internal;

import static com.tandbergtv.workflow.core.ProcessStatus.ACTIVE;
import static com.tandbergtv.workflow.core.ProcessStatus.BRANCHED;
import static com.tandbergtv.workflow.core.ProcessStatus.PAUSED;
import static com.tandbergtv.workflow.core.ProcessStatus.QUEUED;
import static com.tandbergtv.workflow.core.event.ColleaguePriority.LOW;
import static com.tandbergtv.workflow.core.service.ServiceEvents.STARTED;
import static com.tandbergtv.workflow.core.service.ServiceEvents.STARTING;
import static com.tandbergtv.workflow.core.service.ServiceEvents.STOPPED;
import static com.tandbergtv.workflow.core.service.ServiceEvents.STOPPING;
import static com.tandbergtv.workflow.driver.event.WorkflowProcessEventType.CACHE_INITED;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ResourceBundle;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.jbpm.graph.def.ProcessDefinition;
import org.jbpm.graph.exe.Token;
import org.jbpm.taskmgmt.def.Task;

import com.tandbergtv.workflow.core.CustomToken;
import com.tandbergtv.workflow.core.Datatype;
import com.tandbergtv.workflow.core.ProcessPriority;
import com.tandbergtv.workflow.core.ResourceGroupAware;
import com.tandbergtv.workflow.core.TaskVariable;
import com.tandbergtv.workflow.core.WorkflowProcess;
import com.tandbergtv.workflow.core.event.ColleaguePriority;
import com.tandbergtv.workflow.core.event.DefaultMediator;
import com.tandbergtv.workflow.core.event.IMediator;
import com.tandbergtv.workflow.core.event.WorkflowEvent;
import com.tandbergtv.workflow.core.service.ServiceEvent;
import com.tandbergtv.workflow.core.service.ServiceRegistry;
import com.tandbergtv.workflow.core.service.cache.ICacheService;
import com.tandbergtv.workflow.core.service.thread.ISchedulerService;
import com.tandbergtv.workflow.core.service.thread.Scheduler;
import com.tandbergtv.workflow.driver.DriverException;
import com.tandbergtv.workflow.driver.DriverRuntimeException;
import com.tandbergtv.workflow.driver.event.WorkflowProcessEvent;
import com.tandbergtv.workflow.driver.event.WorkflowProcessEventType;
import com.tandbergtv.workflow.driver.monitor.IProcessMonitor;
import com.tandbergtv.workflow.driver.plugin.ICreateDelegate;
import com.tandbergtv.workflow.driver.service.IPersistenceService;
import com.tandbergtv.workflow.driver.service.IProcessManagerService;
import com.tandbergtv.workflow.driver.service.IProcessSearchService;

/**
 * Default implementation of the process manager
 * 
 * @author Sahil Verma
 */
public class ProcessManager implements IProcessManagerService, IProcessMonitor {

	/* Pool that runs the per-token operations submitted through schedule(Callable) */
	private final ISchedulerService<CustomToken> scheduler;
	
	/* Single-threaded scheduler used for driver-wide work: pause(), resume(), crash recovery, EMA */
	private final ISchedulerService<Void> globalScheduler;
	
	private static final String SERVICE_NAME = "Process Manager";
	
	/* Accounting */
	private final AtomicInteger created;
	
	/* Value of 'created' at the previous moving average calculation */
	private final AtomicInteger created0;
	
	private final AtomicInteger restarted;
	
	private final AtomicInteger paused;
	
	private final AtomicInteger cancelled;
	
	/*
	 * Written through setMaxCreateRate() and read through the getters, possibly from
	 * different threads - volatile guarantees visibility and atomic access to the double
	 */
	private volatile double maxCreateRate;
	
	/*
	 * Written by the periodic task scheduled in start() and read through the getters,
	 * possibly from a different thread - hence volatile
	 */
	private volatile double createRate;
	
	/* Previous moving average; only touched by the periodic task scheduled in start() */
	private double createRate0;
	
	/* Used for calculating the exponential moving average of processes created over time */
	private final double factor;
	
	/* Set by start(), cleared by stop() and checked by receive() */
	private volatile boolean initialized;
	
	private final Properties properties;
	
	/*
	 * Periodicity of moving average calculation - make it too large and it will be inaccurate,
	 * make it too small and there will be overhead, especially under high load
	 */
	private static final Long EMA_CALCULATION_PERIOD = 500L;
	
	/* Key under which the halted flag is kept in the driver properties */
	private static final String DRIVER_HALTED_PROPERTY = "isDriverHalted";
	
	private static final Logger logger = Logger.getLogger(ProcessManager.class);
	
	/**
	 * Creates a ProcessManager.
	 * <p>
	 * The create-rate settings and the token thread pool sizes are read from the
	 * <code>service</code> resource bundle located in this class' package. The manager
	 * registers itself with the {@link DefaultMediator} as the very last step, so that the
	 * mediator never holds a reference to a partially constructed instance and nothing is
	 * left registered if the configuration cannot be read.
	 * 
	 * @param properties driver properties, used to read and persist the halted flag
	 */
	public ProcessManager(Properties properties) {
		super();
		this.properties = properties;
		this.created = new AtomicInteger(0);
		this.restarted = new AtomicInteger(0);
		this.paused = new AtomicInteger(0);
		this.cancelled = new AtomicInteger(0);
		this.created0 = new AtomicInteger(0);
		
		/* Read the whole configuration first - a bad bundle must not leave schedulers behind */
		ResourceBundle bundle = 
			ResourceBundle.getBundle(this.getClass().getPackage().getName() + ".service");
		this.maxCreateRate = Double.parseDouble(bundle.getString("max.create.rate"));
		this.factor = Double.parseDouble(bundle.getString("history.factor"));
		int core = Integer.parseInt(bundle.getString("scheduler.pool.size"));
		int max = Integer.parseInt(bundle.getString("scheduler.pool.max"));
		
		/* NOTE Several things rely upon the global scheduler being single threaded */
		this.globalScheduler = new Scheduler<Void>("process-runtime", 1, 1);
		this.scheduler = new Scheduler<CustomToken>("token-thread", core, max);
		
		/* Publish 'this' only once every field has been assigned */
		DefaultMediator.getInstance().register(this);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.monitor.IProcessManagerMBean#getCancelled()
	 * 
	 * Current value of the counter that cancel(CustomToken) bumps for every root token.
	 */
	public int getCancelled() {
		return cancelled.get();
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.monitor.IProcessManagerMBean#getCreated()
	 * 
	 * Current value of the counter that postCreate(WorkflowProcess) bumps for every new process.
	 */
	public int getCreated() {
		return created.get();
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.monitor.IProcessManagerMBean#getPaused()
	 * 
	 * Current value of the pause counter.
	 */
	public int getPaused() {
		return paused.get();
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.monitor.IProcessManagerMBean#getRestarted()
	 * 
	 * Current value of the counter that restart(Serializable) bumps after each restart.
	 */
	public int getRestarted() {
		return restarted.get();
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.monitor.IProcessMonitor#getCreateRate()
	 * 
	 * Latest moving average of process creations per second, as computed by the
	 * periodic task that start() schedules.
	 */
	public double getCreateRate() {
		return createRate;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.monitor.IProcessMonitor#setMaxCreateRate(double)
	 * 
	 * Overrides the maximum create rate that was read from the resource bundle.
	 */
	public void setMaxCreateRate(double rate) {
		maxCreateRate = rate;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.monitor.IProcessMonitor#getMaxCreateRate()
	 * 
	 * Maximum create rate currently configured.
	 */
	public double getMaxCreateRate() {
		return maxCreateRate;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.monitor.IProcessMonitor#getCreateRatePerMinute()
	 * 
	 * The create rate multiplied by 60 and truncated to an int.
	 */
	public int getCreateRatePerMinute() {
		double perMinute = this.createRate * 60;
		
		return (int)perMinute;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.monitor.IProcessMonitor#getMaxCreateRatePerMinute()
	 * 
	 * The maximum create rate multiplied by 60 and truncated to an int.
	 */
	public int getMaxCreateRatePerMinute() {
		double perMinute = this.maxCreateRate * 60;
		
		return (int)perMinute;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#create(org.jbpm.graph.def.ProcessDefinition, java.util.Map)
	 * 
	 * Convenience overload: creates the process with ProcessPriority.NORMAL.
	 */
	public CustomToken create(ProcessDefinition template, Map<String, Object> parameters) throws DriverException {
		final ProcessPriority defaultPriority = ProcessPriority.NORMAL;
		
		return create(template, defaultPriority, parameters);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.IProcessManagerService#create(org.jbpm.graph.def.ProcessDefinition, com.tandbergtv.workflow.core.ProcessPriority, java.util.Map)
	 * 
	 * Validates and converts the start variables, lets the create delegate have its say,
	 * instantiates the process with the given priority, seeds its context with the
	 * variables and finally hands it to postCreate(). Returns the new root token.
	 */
	public CustomToken create(ProcessDefinition template, ProcessPriority priority, Map<String, Object> parameters)
		throws DriverException {
		Map<String, Object> variables = getInitialVariables(template, parameters);
		
		preCreate(template, variables);

		/* Validation passed - build the process and populate its context */
		WorkflowProcess created = new WorkflowProcess(template, priority);
		
		for (Map.Entry<String, Object> entry : variables.entrySet())
			created.getContextInstance().setVariable(entry.getKey(), entry.getValue());
		
		postCreate(created);
		
		return created.getRootToken();
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#create(com.tandbergtv.workflow.core.CustomToken, org.jbpm.graph.def.ProcessDefinition, java.util.Collection)
	 * 
	 * Same sequence as the other create() overloads, except that the process is obtained
	 * from token.createSubProcessInstance(template) rather than constructed directly.
	 * Returns the root token of that process.
	 */
	public CustomToken create(CustomToken token, ProcessDefinition template, Map<String, Object> parameters)
		throws DriverException {
		Map<String, Object> variables = getInitialVariables(template, parameters);
		
		preCreate(template, variables);

		/* Validation passed - ask the parent token for the sub-process and populate its context */
		WorkflowProcess child = token.createSubProcessInstance(template);
		
		for (Map.Entry<String, Object> entry : variables.entrySet())
			child.getContextInstance().setVariable(entry.getKey(), entry.getValue());
		
		postCreate(child);
		
		return child.getRootToken();
	}
	
	/**
	 * Builds the initial variable map for a new process from the caller-supplied parameters.
	 * <p>
	 * Only the variables declared on the template's start task are considered. A value that
	 * is missing, <code>null</code> or a blank string is rejected if the variable is required
	 * and replaced by the data type's default otherwise. Every remaining non-null value is
	 * converted to the declared data type.
	 * 
	 * @param template the process definition whose start task declares the variables
	 * @param parameters the caller-supplied values keyed by variable name; <code>null</code>
	 *        is treated like an empty map
	 * @return a new map containing the converted values
	 * @throws DriverException if a required variable is missing or a value cannot be converted
	 */
	protected Map<String, Object> getInitialVariables(ProcessDefinition template, Map<String, Object> parameters)
		throws DriverException {
		Map<String, Object> copy = new HashMap<String, Object>();
		DatatypeConverter converter = DatatypeConverter.getInstance();
		
		/* Make sure the input variables are sane */
		for (TaskVariable variable : getStartTaskVariables(template)) {
			Datatype datatype = variable.getDatatype();
			String key = variable.getVariableName();
			/* No parameter map at all simply means that nothing was specified */
			Object value = (parameters != null) ? parameters.get(key) : null;
			boolean blank = (value instanceof String) && ((String)value).trim().length() == 0;

			/*
			 * Value wasn't specified - if the variable is required, we abort. Otherwise
			 * we must define optional variables because the process might use them later on
			 */
			if (value == null || blank) {
				if (variable.isRequired())
					throw new DriverException("Required variable (" + key + ") is not defined");
				value = getDefaultValue(datatype);
			}

			/* Data types without a default leave the variable undefined */
			if (value != null) {
				try {
					copy.put(key, converter.convert(value, datatype));
				} catch (TypeConversionException e) {
					throw new DriverException("Input value data type mismatch", e);
				}
			}
		}
		
		return copy;
	}
	
	/**
	 * Hook that runs before a process is instantiated: loads the delegate configured under
	 * <code>driver.create.delegate</code> and passes it the template and the variables.
	 * 
	 * @param template the process definition about to be instantiated
	 * @param parameters the converted initial variables
	 * @throws DriverException declared for the delegate's <code>create</code> call
	 */
	protected void preCreate(ProcessDefinition template, Map<String, Object> parameters)
		throws DriverException {
		/* FIXME Instantiate the extension that attaches to the pre-create extension point */
		ICreateDelegate delegate = (ICreateDelegate)getCallback("driver.create.delegate");

		if (delegate == null)
			return;
		
		delegate.create(template, parameters);
	}
	
	/**
	 * Hook that runs once a process has been instantiated: passes it to the persistence
	 * service and bumps the creation counter.
	 * 
	 * @param process the freshly created process
	 */
	protected void postCreate(WorkflowProcess process) {
		IPersistenceService persistence = ServiceRegistry.getDefault().lookup(IPersistenceService.class);
		
		/* Generate the sequence id */
		persistence.create(process);
		created.incrementAndGet();
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#start(com.tandbergtv.workflow.core.CustomToken)
	 * 
	 * Schedules the token for starting. If the driver is halted when the task runs, the
	 * token is suspended (and counted as paused) instead. Failures are logged, not rethrown.
	 */
	public Future<CustomToken> start(final CustomToken token) throws DriverException {
		Callable<CustomToken> task = new Callable<CustomToken>() {
			public CustomToken call() throws Exception {
				try {
					if (!getIsHalted()) {
						token.start();
					} else {
						token.suspend();
						paused.incrementAndGet();
					}
				} catch (Throwable failure) {
					logger.error(token, failure);
				}
				
				return token;
			}
		};
		
		return schedule(task);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#pause(com.tandbergtv.workflow.core.CustomToken)
	 * 
	 * Schedules the suspension of the token; only root tokens count towards the pause counter.
	 */
	public Future<CustomToken> pause(final CustomToken token) {
		Callable<CustomToken> task = new Callable<CustomToken>() {
			public CustomToken call() throws Exception {
				token.suspend();
				
				boolean root = token.isRoot();
				
				if (root)
					paused.incrementAndGet();
				
				return token;
			}
		};
		
		return schedule(task);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#pause(java.util.Collection)
	 * 
	 * Schedules a pause for each token, returning the futures in iteration order.
	 */
	public Collection<Future<CustomToken>> pause(Collection<CustomToken> tokens) {
		Collection<Future<CustomToken>> pending = new ArrayList<Future<CustomToken>>();
		
		for (CustomToken each : tokens) {
			Future<CustomToken> future = pause(each);
			
			pending.add(future);
		}
		
		return pending;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#resume(com.tandbergtv.workflow.core.CustomToken)
	 * 
	 * Schedules the token for resumption. Failures are logged, not rethrown.
	 */
	public Future<CustomToken> resume(final CustomToken token) throws DriverException {
		Callable<CustomToken> task = new Callable<CustomToken>() {
			public CustomToken call() throws Exception {
				try {
					token.resume();
				} catch (Throwable failure) {
					logger.error(token, failure);
				}
				
				return token;
			}
		};
		
		return schedule(task);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.IProcessManagerService#restart(java.io.Serializable)
	 * 
	 * Schedules a restart: the process with the given id is loaded, the values of its
	 * start task variables are copied, and a new process with the same template and
	 * priority is created and started. The future yields the new root token.
	 */
	public Future<CustomToken> restart(final Serializable id) throws DriverException {
		Callable<CustomToken> task = new Callable<CustomToken>() {
			public CustomToken call() throws Exception {
				IPersistenceService persistence = ServiceRegistry.getDefault().lookup(IPersistenceService.class);
				WorkflowProcess original = persistence.get(id);
				ProcessDefinition template = original.getProcessDefinition();
				
				/* Copy over the variable values of the original process... */
				Map<String, Object> values = new HashMap<String, Object>();
				
				for (TaskVariable variable : getStartTaskVariables(template)) {
					String name = variable.getVariableName();
					
					values.put(name, original.getContextInstance().getVariable(name));
				}
				
				/* ...and create a new one */
				CustomToken fresh = create(template, original.getPriority(), values);
				
				start(fresh);
				restarted.incrementAndGet();
				
				return fresh;
			}
		};
		
		return schedule(task);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#dequeue(com.tandbergtv.workflow.core.CustomToken)
	 * 
	 * Schedules a dequeue of the token. Failures are logged, not rethrown.
	 */
	public Future<CustomToken> dequeue(final CustomToken token) {
		Callable<CustomToken> task = new Callable<CustomToken>() {
			public CustomToken call() throws Exception {
				try {
					token.dequeue();
				} catch (Throwable failure) {
					logger.error(token, failure);
				}
				
				return token;
			}
		};
		
		return schedule(task);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.IProcessManagerService#cancel(org.jbpm.graph.exe.Token)
	 * 
	 * Schedules the cancellation of the token; only root tokens count towards the
	 * cancellation counter.
	 */
	public Future<CustomToken> cancel(final CustomToken token) {
		Callable<CustomToken> task = new Callable<CustomToken>() {
			public CustomToken call() throws Exception {
				token.cancel();
				
				boolean root = token.isRoot();
				
				if (root)
					cancelled.incrementAndGet();
				
				return token;
			}
		};
		
		return schedule(task);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#cancel(java.util.Collection)
	 * 
	 * Schedules a cancellation for each token, returning the futures in iteration order.
	 */
	public Collection<Future<CustomToken>> cancel(Collection<CustomToken> tokens) {
		Collection<Future<CustomToken>> pending = new ArrayList<Future<CustomToken>>();
		
		for (CustomToken each : tokens) {
			Future<CustomToken> future = cancel(each);
			
			pending.add(future);
		}
		
		return pending;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.IProcessManagerService#delete(com.tandbergtv.workflow.core.WorkflowProcess)
	 * 
	 * Schedules the deletion of the process; the future yields the process' root token.
	 */
	public Future<CustomToken> delete(final WorkflowProcess process) throws DriverException {
		Callable<CustomToken> task = new Callable<CustomToken>() {
			public CustomToken call() throws Exception {
				process.delete();
				
				CustomToken root = process.getRootToken();
				
				return root;
			}
		};
		
		return schedule(task);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#pause()
	 * 
	 * Halts the driver: on the global scheduler, the halted flag is set and persisted, and
	 * a pause is scheduled for the root token of every ACTIVE, BRANCHED or QUEUED process.
	 */
	public Future<Void> pause() throws DriverException {
		Callable<Void> haltAll = new Callable<Void>() {
			public Void call() throws DriverException {
				logger.info("Suspending...");
				setIsHalted(true);
				
				IProcessSearchService search = ServiceRegistry.getDefault().lookup(IProcessSearchService.class);
				Collection<WorkflowProcess> running = search.findAllByStatus(ACTIVE, BRANCHED, QUEUED);
				
				for (WorkflowProcess each : running)
					pause(each.getRootToken());
				
				return (Void)null;
			}
		};
		
		return this.globalScheduler.schedule(haltAll);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.service.IProcessManagerService#resume()
	 * 
	 * Un-halts the driver: on the global scheduler, the halted flag is cleared and persisted,
	 * and a resume is scheduled for the root token of every PAUSED process, ordered by
	 * priority first and start time second.
	 */
	public Future<Void> resume() throws DriverException {
		/*
		 * We want to resume processes in order of their priority
		 * and in the order by which they were created.
		 */
		final Comparator<WorkflowProcess> resumeOrder = new Comparator<WorkflowProcess>() {
			public int compare(WorkflowProcess p1, WorkflowProcess p2) {
				ProcessPriority first = p1.getPriority();
				ProcessPriority second = p2.getPriority();
				
				/* Reversed comparison: the greater priority sorts first */
				if (first != second)
					return second.compareTo(first);
				
				/* Same priority: the earlier start time sorts first */
				long t1 = p1.getStart().getTime();
				long t2 = p2.getStart().getTime();
				
				if (t1 < t2)
					return -1;
				
				return (t1 == t2) ? 0 : 1;
			}
		};
		
		Callable<Void> resumeAll = new Callable<Void>() {
			public Void call() throws DriverException {
				logger.info("Resuming...");
				setIsHalted(false);
				
				IProcessSearchService search = ServiceRegistry.getDefault().lookup(IProcessSearchService.class);
				List<WorkflowProcess> suspended = search.findAllByStatus(PAUSED);
				
				Collections.sort(suspended, resumeOrder);
				
				for (WorkflowProcess each : suspended)
					resume(each.getRootToken());
				
				return (Void)null;
			}
		};
		
		return this.globalScheduler.schedule(resumeAll);
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.driver.IProcessManagerService#setPriority(com.tandbergtv.workflow.core.WorkflowProcess, com.tandbergtv.workflow.core.ProcessPriority)
	 * 
	 * Changes the priority of the process and announces it with an asynchronous
	 * PRIORITY_CHANGED event for the root token and for each of its active child tokens.
	 */
	public void setPriority(WorkflowProcess process, ProcessPriority priority) {
		final WorkflowProcessEventType changed = WorkflowProcessEventType.PRIORITY_CHANGED;
		
		process.setPriority(priority);
		
		CustomToken root = process.getRootToken();
		
		DefaultMediator.getInstance().sendAsync(new WorkflowProcessEvent(this, process, root, changed));
		
		/* Process priority 'trickles down' to each child token */
		for (Token child : root.getActiveChildTokens())
			DefaultMediator.getInstance().sendAsync(new WorkflowProcessEvent(this, process, child, changed));
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.core.event.IColleague#getColleagueName()
	 * 
	 * The colleague name is the same constant that is used as the service name.
	 */
	public String getColleagueName() {
		return ProcessManager.SERVICE_NAME;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.core.event.IColleague#getColleaguePriority()
	 * 
	 * This colleague always reports the LOW priority.
	 */
	public ColleaguePriority getColleaguePriority() {
		return ColleaguePriority.LOW;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.core.event.IColleague#receive(com.tandbergtv.workflow.core.event.WorkflowEvent)
	 * 
	 * Starts this service as soon as the service named "ResourceManager" reports STARTED.
	 * Events of any other kind are ignored, as is everything once this service is initialized.
	 */
	public void receive(WorkflowEvent event) {
		if (!(event instanceof ServiceEvent) || initialized)
			return;
		
		ServiceEvent e = ServiceEvent.class.cast(event);
		
		/*
		 * Compare the name by value - reference equality only holds when both strings happen
		 * to be the same interned instance, which is not guaranteed for a name built at runtime
		 */
		if ("ResourceManager".equals(e.getService().getServiceName()) && e.getEvent() == STARTED)
			start();
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.core.service.ServiceLifecycle#start()
	 * 
	 * Announces STARTING, starts both schedulers and then queues, in this order, on the
	 * global scheduler: crash recovery, the STARTED announcement and the periodic
	 * calculation of the create rate. Finally marks the service as initialized.
	 */
	public void start() {
		logger.info("Starting process manager service...");
		final IMediator mediator = DefaultMediator.getInstance();
		
		mediator.sendAsync(new ServiceEvent(this, STARTING));
		
		this.globalScheduler.start();
		this.scheduler.start();
		
		/*
		 * Crash recovery must happen before the service started notification. This ensures that
		 * all resource requests are completed before they get honored.
		 */
		globalScheduler.schedule(new CrashRecoveryThread());

		globalScheduler.schedule(new Callable<Void>() {
			/* (non-Javadoc)
			 * @see java.util.concurrent.Callable#call()
			 */
			public Void call() throws Exception {
				mediator.sendAsync(new ServiceEvent(ProcessManager.this, STARTED));
				logger.info("Process manager service started");		
				
				return (Void)null;
			}
		});
		
		/* Calculate the exponential moving average periodically */
		globalScheduler.schedule(new Runnable() {
			/* (non-Javadoc)
			 * @see java.lang.Runnable#run()
			 */
			public void run() {
				/*
				 * Read the counter exactly once. Reading it a second time when saving the
				 * history would silently drop every process created in between the two reads.
				 */
				int total = created.get();
				
				/* Processes created since the last run, scaled to a per-second figure */
				double deltaN = (double)total - created0.doubleValue();
				double delta = deltaN * 1000 / EMA_CALCULATION_PERIOD.doubleValue();
				double rate = factor * delta + (1.0 - factor) * createRate0; 
				
				/* Clamp to lower bound */
				createRate = (rate > 1E-4) ? rate : 0.0;
				
				/* History */
				createRate0 = createRate;
				created0.set(total);
				
				logger.debug("Average " + createRate);
			}
		}, 0, EMA_CALCULATION_PERIOD);
		
		this.initialized = true;
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.core.service.ServiceLifecycle#stop()
	 * 
	 * Announces STOPPING, stops the token scheduler and then the global scheduler,
	 * announces STOPPED and clears the initialized flag.
	 */
	public void stop() {
		ServiceEvent stopping = new ServiceEvent(this, STOPPING);
		
		DefaultMediator.getInstance().sendAsync(stopping);
		
		scheduler.stop();
		globalScheduler.stop();
		
		ServiceEvent stopped = new ServiceEvent(this, STOPPED);
		
		DefaultMediator.getInstance().sendAsync(stopped);
		initialized = false;
	}
	
	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.core.service.Service#getServiceName()
	 * 
	 * The service name is the same constant that is used as the colleague name.
	 */
	public String getServiceName() {
		return ProcessManager.SERVICE_NAME;
	}
	
	/**
	 * Submits a task to the token scheduler.
	 * 
	 * @param callable the task to run
	 * @return the future of the task, or <code>null</code> if the scheduler rejected it
	 *         (the rejection is logged as a warning)
	 */
	private Future<CustomToken> schedule(Callable<CustomToken> callable) {
		/* Add printfs for current queue size and current threadpool usage stats? */
		try {
			return this.scheduler.schedule(callable);
		} catch (RejectedExecutionException rejected) {
			logger.warn("Could not add scheduled task to " + this.scheduler.getServiceName(), rejected);
		}
		
		return null;
	}
	
	/**
	 * Tells whether the halted flag is set in the driver properties.
	 * 
	 * @return <code>true</code> only if the property is present and equals "true",
	 *         ignoring case
	 */
	private boolean getIsHalted() {
		return Boolean.parseBoolean(properties.getProperty(DRIVER_HALTED_PROPERTY));
	}
	
	/**
	 * Updates the halted flag in the driver properties and, when the
	 * <code>driver.properties.file</code> property names a file, writes all properties
	 * to that file. I/O failures are logged as warnings and otherwise ignored.
	 * 
	 * @param isHalted the new value of the flag
	 */
	private void setIsHalted(boolean isHalted) {
		properties.setProperty(DRIVER_HALTED_PROPERTY, String.valueOf(isHalted));
		
		String path = properties.getProperty("driver.properties.file");
		
		/* Nowhere to persist to - the in-memory flag is all we can do */
		if (path == null)
			return;
		
		OutputStream stream = null;
		
		try {
			stream = new FileOutputStream(new File(path));
			properties.store(stream, null);
		} catch (Exception e) {
			logger.warn("Failed to save the halted state", e);
		} finally {
			if (stream != null) {
				try {
					stream.close();
				} catch (Exception ex) {
					logger.warn("Failed to close the properties file", ex);
				}
			}
		}
	}
	
	/**
	 * Supplies the value given to an optional variable that was left unspecified.
	 * 
	 * @param type the declared data type of the variable
	 * @return an empty string for STRING, FILE and URL, <code>Boolean.FALSE</code> for
	 *         BOOLEAN, zero for INT and <code>null</code> for every other type
	 */
	private Object getDefaultValue(Datatype type) {
		switch (type) {
			case STRING:
			case FILE:
			case URL:
				return "";
			case BOOLEAN:
				return Boolean.FALSE;
			case INT:
				return Integer.valueOf(0);
			default:
				return null;
		}
	}
	
	/**
	 * Returns a list of variables defined in the start node of the specified template
	 * 
	 * @param template the process definition to inspect
	 * @return The variable list defined in the start node; an empty list (never
	 *         <code>null</code>) if the template has no start task, the start task has no
	 *         task controller, or the controller declares no variables
	 */
	@SuppressWarnings("unchecked")
	private List<TaskVariable> getStartTaskVariables(ProcessDefinition template) {
		Task t = template.getTaskMgmtDefinition().getStartTask();
		
		/* A start task is not obliged to have a controller - treat that as 'no variables' */
		if (t == null || t.getTaskController() == null)
			return new ArrayList<TaskVariable>();
		
		List<TaskVariable> variables = t.getTaskController().getVariableAccesses();
		
		/* Callers iterate over the result, so never hand out null */
		if (variables == null)
			return new ArrayList<TaskVariable>();
		
		return variables;
	}
	
	/**
	 * Instantiates the class whose name is stored under the given key in the
	 * <code>service</code> resource bundle of this class' package.
	 * 
	 * @param id the resource bundle key holding the class name
	 * @return a new instance created through the no-argument constructor
	 * @throws DriverRuntimeException if the class cannot be loaded or instantiated
	 */
	private Object getCallback(String id) {
		/* 
		 * Ideally we should IExtension e = getExtension(id) and then instantiate the extension,
		 * but for now that would require a lot of plumbing. So the id is simply a key in the
		 * resource bundle that names the class to instantiate
		 */
		String bundleName = this.getClass().getPackage().getName() + ".service";
		String className = ResourceBundle.getBundle(bundleName).getString(id);
		
		try {
			return this.getClass().getClassLoader().loadClass(className).newInstance();
		} catch (Exception e) {
			throw new DriverRuntimeException("Failed to load delegate", e);
		}
	}
	
	/**
	 * Runs the crash recovery procedure.
	 * <p>
	 * All ACTIVE, BRANCHED and QUEUED processes are looked up. In a first pass, processes
	 * that have queued tokens are put into the process cache and a CACHE_INITED event is
	 * sent for each queued token. In a second pass the root token of every process is
	 * recovered - either inline or on the token scheduler, depending on its current node.
	 */
	private final class CrashRecoveryThread implements Callable<Void> {
		/* (non-Javadoc)
		 * @see java.util.concurrent.Callable#call()
		 */
		public Void call() throws Exception {
			IMediator mediator = DefaultMediator.getInstance();
			Collection<WorkflowProcess> processes = search();
			
			logger.info("Performing crash recovery for " + processes.size() + " process(es)");

			/* Stage 1 - anything that is queued is cached */
			for (WorkflowProcess process : processes) {
				for (CustomToken token : process.findAllTokens()) {
					if (token.getStatus() == QUEUED) {
						cache(process);
						/* This causes a message queue to get created - bizzare dependency */
						mediator.sendAsync(new WorkflowProcessEvent(this, process, token, CACHE_INITED));
					}
				}
			}

			/* Stage 2 - crash recovery */
			for (WorkflowProcess process : processes) {
				final CustomToken token = process.getRootToken();

				if (token.getStatus() == BRANCHED)
					cache(process);
				
				/*
				 * For tokens that are going to end up making a resource request, we know that
				 * this thread is going to fall off very soon. It's probably OK to recover those tokens
				 * in the current thread.
				 * 
				 * instanceof is used on purpose: a token without a current node must not abort
				 * the recovery of all remaining processes with a NullPointerException. Such a
				 * token takes the asynchronous path below, where any failure is logged.
				 */
				Object node = token.getCurrentNode();
				
				if (node instanceof ResourceGroupAware
					&& ((ResourceGroupAware)node).getResourceGroupID() != null) {
					recover(token);
					continue;
				}
				
				/*
				 * This token potentially has a long running execution that may end up getting 
				 * stuck for a long time. We want to continue the crash recovery process, therefore we
				 * run this in a separate thread.
				 */
				schedule(new Callable<CustomToken>() {
					/* (non-Javadoc)
					 * @see java.util.concurrent.Callable#call()
					 */
					public CustomToken call() {
						recover(token);

						return token;
					}
				});
			}
			
			return (Void)null;
		}
		
		/**
		 * Recovers a single token; a failure is logged as a warning and never propagated.
		 */
		private void recover(CustomToken token) {
			try {
				token.recover();
			} catch (Throwable t) {
				logger.warn(token + ", failed to recover", t);
			}
		}
		
		/**
		 * Finds the processes that are subject to crash recovery.
		 */
		private Collection<WorkflowProcess> search() {
			ServiceRegistry registry = ServiceRegistry.getDefault();
			IProcessSearchService service = registry.lookup(IProcessSearchService.class);
			
			return service.findAllByStatus(ACTIVE, BRANCHED, QUEUED);
		}
		
		/**
		 * Adds the process to the "Process Cache" service, keyed by its id.
		 */
		@SuppressWarnings("unchecked")
		private void cache(WorkflowProcess process) {
			ServiceRegistry registry = ServiceRegistry.getDefault();
			ICacheService<WorkflowProcess> cache = 
				(ICacheService<WorkflowProcess>) registry.lookup("Process Cache");

			cache.add(process.getId(), process);
		}
	}
}
