/**
 * MessageQueueService.java
 * Created Mar 1, 2007
 * Copyright (C) Tandberg Television 2007
 */
package com.tandbergtv.workflow.driver.internal;

import static com.tandbergtv.workflow.core.service.ServiceEvents.STARTED;
import static com.tandbergtv.workflow.core.service.ServiceEvents.STARTING;
import static com.tandbergtv.workflow.core.service.ServiceEvents.STOPPED;
import static com.tandbergtv.workflow.core.service.ServiceEvents.STOPPING;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;
import org.jbpm.graph.exe.Token;

import com.tandbergtv.workflow.core.event.ColleaguePriority;
import com.tandbergtv.workflow.core.event.DefaultMediator;
import com.tandbergtv.workflow.core.event.IColleague;
import com.tandbergtv.workflow.core.event.IMediator;
import com.tandbergtv.workflow.core.event.WorkflowEvent;
import com.tandbergtv.workflow.core.service.ServiceEvent;
import com.tandbergtv.workflow.driver.event.WorkflowProcessEvent;
import com.tandbergtv.workflow.driver.event.message.MessageRejectedEvent;
import com.tandbergtv.workflow.driver.service.IMessageQueueService;
import com.tandbergtv.workflow.message.IMessageUID;
import com.tandbergtv.workflow.message.WorkflowMessage;

/**
 * Default implementation of the per-token message queue service.
 * <p>
 * One {@link BlockingQueue} is kept per {@link Token} in an internal map. Queues are created and
 * removed in response to {@link WorkflowProcessEvent}s delivered to {@link #receive(WorkflowEvent)},
 * so that the lifecycle of a queue follows the lifecycle of its token.
 * <p>
 * Every access to the map of queues is guarded by a lock. The lookup of a queue and the subsequent
 * operation on it are not performed atomically, so a queue obtained from {@link #getQueue(Token)}
 * may already have been removed from the service by the time it is used.
 * 
 * @author Sahil Verma
 */
public class MessageQueueService implements IMessageQueueService, IColleague {
	
	/* Per-token queues; only touched while holding the lock below */
	private final Map<Token, BlockingQueue<WorkflowMessage>> queues;
	
	/*
	 * This lock must be taken around all accesses to the collection of queues.
	 * NOTE(review): the lock is static, so it is shared by every instance of this service.
	 */
	private static final Lock lock = new ReentrantLock();
	
	private static final String SERVICE_NAME = "Message Queue";
	
	private static final Logger logger = Logger.getLogger(MessageQueueService.class);
	
	/**
	 * Creates a MessageQueueService with no queues
	 */
	public MessageQueueService() {
		this.queues = new HashMap<Token, BlockingQueue<WorkflowMessage>>();
	}
	
	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.core.service.Service#getServiceName()
	 */
	public String getServiceName() {
		return SERVICE_NAME;
	}

	/**
	 * Registers this service with the mediator so that it starts receiving events. A STARTING 
	 * event is sent before the registration and a STARTED event after it.
	 * 
	 * @see com.tandbergtv.workflow.core.service.ServiceLifecycle#start()
	 */
	public void start() {
		IMediator mediator = DefaultMediator.getInstance();
		mediator.sendAsync(new ServiceEvent(this, STARTING));
		
		mediator.register(this);
		
		mediator.sendAsync(new ServiceEvent(this, STARTED));
	}

	/**
	 * Drops all per-token queues and unregisters this service from the mediator. A STOPPING 
	 * event is sent before the cleanup and a STOPPED event after it.
	 * 
	 * @see com.tandbergtv.workflow.core.service.ServiceLifecycle#stop()
	 */
	public void stop() {
		IMediator mediator = DefaultMediator.getInstance();
		mediator.sendAsync(new ServiceEvent(this, STOPPING));
		
		lock.lock();
		
		try {
			/* Forget every queue; the queues themselves are not flushed */
			this.queues.clear();
		} finally {
			lock.unlock();
		}
		
		mediator.unregister(this);
		mediator.sendAsync(new ServiceEvent(this, STOPPED));
	}

	/**
	 * Appends a message to the queue of the specified token. If the token has no queue, the 
	 * message is dropped and a warning is logged.
	 * 
	 * @param token The token the message is intended for
	 * @param message The message to enqueue, must not be null
	 * @see com.tandbergtv.workflow.message.queue.IMessageQueueService#addMessage(org.jbpm.graph.exe.Token, com.tandbergtv.workflow.message.WorkflowMessage)
	 */
	public void addMessage(Token token, WorkflowMessage message) {
		IMessageUID uid = message.getMessageUID();
		BlockingQueue<WorkflowMessage> queue = getQueue(token);
		
		if (queue == null) {
			logger.warn(token +  ", message queue unavailable, rejecting " + uid);
			return;
		}
		
		/* If there's more than one message, there are going to be problems for sure. */
		int pending = queue.size();
		
		if (pending > 0)
			logger.warn(token +  ", queue size is " + pending);
		
		logger.debug(token + ", adding message " + uid);
		queue.add(message);
	}
	
	/**
	 * Looks up the queue of the specified token.
	 * 
	 * @param token The token whose queue is requested
	 * @return The queue, or null if the token currently has no queue
	 * @see com.tandbergtv.workflow.message.queue.IMessageQueueService#getQueue(org.jbpm.graph.exe.Token)
	 */
	public BlockingQueue<WorkflowMessage> getQueue(Token token) {
		lock.lock();
		
		try {
			return this.queues.get(token);
		} finally {
			lock.unlock();
		}
	}
	
	/**
	 * Creates a queue for the specified token. Any queue previously held for the same token is 
	 * replaced by the new, empty one.
	 * 
	 * @param token
	 * @return The Queue
	 */
	private BlockingQueue<WorkflowMessage> createQueue(Token token) {
		BlockingQueue<WorkflowMessage> queue = new LinkedBlockingQueue<WorkflowMessage>();
		
		lock.lock();
		
		try {
			this.queues.put(token, queue);
		} finally {
			lock.unlock();
		}
		
		logger.debug(token + ", created message queue");
		
		return queue;
	}
	
	/**
	 * Flushes and removes the queue for the specified token. Does nothing if the token has no 
	 * queue.
	 * 
	 * @param t
	 */
	private void removeQueue(Token t) {
		BlockingQueue<WorkflowMessage> queue = null;
		lock.lock();
		
		try {
			queue = this.queues.remove(t);
		} finally {
			lock.unlock();
		}
		
		/* Flush outside the lock, the queue is no longer reachable through the map */
		if (queue != null) {
			queue.clear();
			logger.debug(t + ", removed message queue");
		}
	}
	
	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.core.event.IColleague#getColleagueName()
	 */
	public String getColleagueName() {
		return this.getClass().getName();
	}

	/* (non-Javadoc)
	 * @see com.tandbergtv.workflow.core.event.IColleague#getColleaguePriority()
	 */
	public ColleaguePriority getColleaguePriority() {
		return ColleaguePriority.NORMAL;
	}

	/**
	 * Handles events delivered by the mediator. Process events create or remove the queue of 
	 * the affected token; a rejected-message event removes the rejected message from the queue 
	 * of its token, if that queue still exists. All other events are ignored.
	 * 
	 * @param event The event to handle
	 * @see com.tandbergtv.workflow.core.event.IColleague#receive(com.tandbergtv.workflow.core.event.WorkflowEvent)
	 */
	public void receive(WorkflowEvent event) {
		if (event instanceof WorkflowProcessEvent) {
			WorkflowProcessEvent e = (WorkflowProcessEvent)event;
			Token token = e.getToken();
			
			/* Basic idea - lifecycle of the per-token queue follows lifecycle of the token */
			switch (e.getType()) {
				case CREATED:
				case RESUMED:
				case JOINED:
				case CACHE_INITED:
					createQueue(token);
					break;
				case BRANCHED:
				case PAUSED:
				case FAILED:
				case CANCELLED:
				case STOPPED:
					removeQueue(token);
					break;
				default:
					break;
			}
		} else if (event instanceof MessageRejectedEvent) {
			MessageRejectedEvent e = (MessageRejectedEvent)event;
			BlockingQueue<WorkflowMessage> queue = getQueue(e.getToken());
			
			/* The queue may be gone already (token paused, failed, stopped...), nothing to clean up */
			if (queue == null) {
				logger.debug(e.getToken() + ", no message queue, ignoring rejected message");
				return;
			}
			
			boolean removed = queue.remove(e.getMessage());
			
			if (removed)
				logger.warn(e.getToken() + ", removed message [" + e.getMessage().getMessageUID() + "] from queue");
		}
	}
}
