package com.tandbergtv.ruleengine;

import static javax.jms.Session.AUTO_ACKNOWLEDGE;

import java.util.HashMap;
import java.util.Map;

import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.apache.log4j.Logger;

import com.tandbergtv.workflow.adaptor.AdaptorException;
import com.tandbergtv.workflow.adaptor.conf.IAdaptorConfiguration;
import com.tandbergtv.workflow.comm.IDevice;
import com.tandbergtv.workflow.comm.ISource;
import com.tandbergtv.workflow.comm.routing.RoutingServiceFactory;
import com.tandbergtv.workflow.message.IMessage;
import com.tandbergtv.workflow.message.MessageUIDImpl;
import com.tandbergtv.workflow.message.WPCLCommand;
import com.tandbergtv.workflow.message.WorkflowMessage;
import com.tandbergtv.workflow.message.WorkflowMessage.MessageType;
import com.tandbergtv.workflow.message.util.WPCLConverter;

/**
 * It binds to a JMS queue at startup and converts queue messages to WPCL messages and delivers to
 * workflow.
 * 
 * @author Kinjal Mehta
 * 
 */
public class NormalizationRulesResponseHandler {

	private IAdaptorConfiguration conf = null;
	private static QueueConnection queueConnection = null;
	private static QueueSession queueSession = null;
	private static Queue queue = null;
	private static final String RESPONSE_QUEUE_NAME = "responseQueue";
	private Logger logger = Logger.getLogger(NormalizationRulesResponseHandler.class);
	private static final String CORRELATION_ID_PARAMETER = "correlationId";
	private static final String STATUS_MESSAGE = "statusMessage";
	private static final String RESPONSE_UID = "re0103";
	private RuleEngineAdaptor adaptor = null;

	public NormalizationRulesResponseHandler(RuleEngineAdaptor adaptor, IAdaptorConfiguration arg0) {
		conf = arg0;
		this.adaptor = adaptor;
		initializeQueue();
	}

	public NormalizationRulesResponseHandler() {
	}

	/**
	 * reads the queue name from Adaptor configuration and binds for any messages.
	 */
	private void initializeQueue() {
		try {
			Context context = new InitialContext();
			queue = (Queue) context.lookup((String) conf.getParameterValue(RESPONSE_QUEUE_NAME));
			QueueConnectionFactory factory = (QueueConnectionFactory) context
					.lookup("ConnectionFactory");
			queueConnection = factory.createQueueConnection();
			queueSession = queueConnection.createQueueSession(false, AUTO_ACKNOWLEDGE);
			queueConnection.start();
			MessageConsumer consumer = queueSession.createConsumer((Destination) queue);
			consumer.setMessageListener(new QueueListener());
			logger.debug("Normalization progress queue listener initialized");
		} catch (Exception e) {
			logger.error("Error creating normalization progress queue listener", e);
		}

	}

	/**
	 * Queue message listener which basically converts queue messages to WPCL message. It looks at
	 * the success value and if true reads the percent value and sends notification message. If success
	 * value is false, it sends a NACK message.
	 * 
	 */
	private class QueueListener implements MessageListener {

		public void onMessage(Message message) {
			IMessage wfsMessage = null;
			String correlationID = "";
			try {
				logger.debug("Handling message with correlation id : "
						+ message.getJMSCorrelationID());
				String status = message.getStringProperty("status");
				boolean success = message.getBooleanProperty("success");
				correlationID = message.getJMSCorrelationID();

				if (success) {
					int percent = message.getIntProperty("percent");
					if (percent == 100)
						wfsMessage = prepareTaskCompleteMessage(correlationID,status);
					else
						wfsMessage = prepareTaskUpdateMessage(percent, correlationID, status);
				} else {
					wfsMessage = prepareNackMessage(correlationID, status);
				}

			} catch (Exception e) {
				logger.warn("Error reading progress - " + e.getMessage(), e);
				try {
					wfsMessage = prepareNackMessage(correlationID, e.getMessage());
				} catch (Exception ex) {
					logger.warn("Error creating Nack message : " + ex.getMessage(), ex);
				}
			}
			IMessage response = null;
			try {
				response = RoutingServiceFactory.newInstance().createRoutingService().send(
						wfsMessage);
				logger.debug("Response receieved:" + response.getPayload().getContent());
			} catch (Exception e) {
				logger.error("Error sending message - " + e.getMessage(), e);
			}
		}
	}

	/**
	 * helper method for creating task complete message
	 * @param id correlation id of JMS message
	 * @return 
	 * @throws AdaptorException
	 */
	private IMessage prepareTaskCompleteMessage(String id, String status) throws AdaptorException {
		WorkflowMessage response = new WorkflowMessage(new MessageUIDImpl(RESPONSE_UID));
		response.setType(MessageType.notification);
		response.putValue(CORRELATION_ID_PARAMETER, id);
		response.putValue(STATUS_MESSAGE,status);
		response.setCommand(new WPCLCommand("task-complete"));
		
		/* set source url */
		IDevice rulesDevice = adaptor.getRulesDevice();
		if (rulesDevice == null)
			logger.warn("Cannot get rules source URL");
		response.setSource((ISource) rulesDevice);
		try {
			return new WPCLConverter().convert(response);
		} catch (Exception ex) {
			throw new AdaptorException(ex);
		}
	}
	/**
	 * helper method for creating Task update message
	 * @param percent - progress value for rule set execution
	 * @param id - correlation ID of JMS message 
	 * @return
	 * @throws AdaptorException
	 */
	private IMessage prepareTaskUpdateMessage(int percent, String id, String status) throws AdaptorException {
		WorkflowMessage response = new WorkflowMessage(new MessageUIDImpl(RESPONSE_UID));
		/* set source url */
		IDevice rulesDevice = adaptor.getRulesDevice();
		if (rulesDevice == null)
			logger.warn("Cannot get rules source URL");
		response.setSource((ISource) rulesDevice);
		response.setType(MessageType.notification);
		response.putValue(CORRELATION_ID_PARAMETER, id);
		response.putValue(STATUS_MESSAGE, status);
		Map<String, String> paramMap = new HashMap<String, String>();
		paramMap.put("percent", new Integer(percent).toString());
		response.setCommand(new WPCLCommand("task-update", paramMap));
		try {
			return new WPCLConverter().convert(response);
		} catch (Exception ex) {
			throw new AdaptorException(ex);
		}

	}
	/**
	 * helper method for creating NACK message
	 * @param correlationID - correlation ID of JMS message 
	 * @param errorMessage - reason for failure
	 * @return
	 * @throws AdaptorException
	 */
	private IMessage prepareNackMessage(String correlationID, String errorMessage)
			throws AdaptorException {
		WorkflowMessage response = new WorkflowMessage(new MessageUIDImpl(RESPONSE_UID));
		response.setType(MessageType.nack);
		response.putValue(CORRELATION_ID_PARAMETER, correlationID);
		/* set source url */
		IDevice rulesDevice = adaptor.getRulesDevice();
		if (rulesDevice == null)
			logger.warn("Cannot get rules source URL");
		response.setSource((ISource) rulesDevice);
		response.getPayload().putValue("error-message", errorMessage);
		response.setCommand(new WPCLCommand("task-complete"));
		try {
			IMessage res = new WPCLConverter().convert(response);
			logger.debug("Nack message = " + res.getPayload().getContent());
			return res;
		} catch (Exception ex) {
			throw new AdaptorException(ex);
		}
	}

	public void destroy() {
		logger.info("Destroying Normalization Response handler");
		try {
			if (queueConnection != null) {
				logger.info("Closing queue connection");
				queueConnection.close();
			}

			if (queueSession != null) {
				logger.info("Closing queue session");
				queueSession.close();
			}
		} catch (Exception e) {
			logger.warn("Error closing queue session : " + e.getMessage(), e);
		}
	}
}
