package com.tandbergtv.ruleengine;

import static javax.jms.Session.AUTO_ACKNOWLEDGE;

import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.apache.log4j.Logger;

import com.tandbergtv.workflow.adaptor.AdaptorException;
import com.tandbergtv.workflow.adaptor.AdaptorResponseImpl;
import com.tandbergtv.workflow.adaptor.IAdaptorResponse;
import com.tandbergtv.workflow.adaptor.conf.IAdaptorConfiguration;
import com.tandbergtv.workflow.message.IMessage;
import com.tandbergtv.workflow.message.WorkflowMessage;
import com.tandbergtv.workflow.message.WorkflowMessage.MessageType;
import com.tandbergtv.workflow.message.util.WPCLConverter;

/**
 * responsible for executing normalization ruleset by sending a message on JMS queue
 * It reads the queue name from adaptor configuration
 * @author Kinjal Mehta
 * 
 */
public class RunNormalizationRuleHandler {

	private static final String RULE_SET_ID_PARAMETER = "ruleSetId";
	private static final String CORRELATION_ID_PARAMETER = "correlationId";
	private IAdaptorConfiguration conf = null;
	private static final String REQUEST_QUEUE_NAME = "requestQueue";
	private static final String RESPONSE_QUEUE_NAME = "responseQueue";

	Logger logger = Logger.getLogger(RunNormalizationRuleHandler.class);

	public RunNormalizationRuleHandler(IAdaptorConfiguration arg0) {
		conf = arg0;
	}

	public IAdaptorResponse handleMessage(WorkflowMessage message) throws AdaptorException {
		String ruleSetId = message.getValue(RULE_SET_ID_PARAMETER);
		QueueConnection queueConnection = null;
		QueueSession queueSession = null;
		IMessage response = null;
		try {
			Context context = new InitialContext();
			QueueConnectionFactory factory = (QueueConnectionFactory) context
					.lookup("ConnectionFactory");
			queueConnection = factory.createQueueConnection();
			queueSession = queueConnection.createQueueSession(false, AUTO_ACKNOWLEDGE);
			logger.debug("Queue name read = " + conf.getParameterValue(REQUEST_QUEUE_NAME));
			logger.debug("Queue name read = " + conf.getParameterValue(RESPONSE_QUEUE_NAME));
			Queue queue = (Queue) context.lookup((String) conf
					.getParameterValue(REQUEST_QUEUE_NAME));
			QueueSender queueSender = queueSession.createSender(queue);

			Queue responseQueue = (Queue) context.lookup((String) conf
					.getParameterValue(RESPONSE_QUEUE_NAME));
			ObjectMessage queueMessage = queueSession.createObjectMessage();
			queueMessage.setStringProperty("ruleSetId", ruleSetId);
			queueMessage.setJMSReplyTo(responseQueue);
			queueSender.send(queue, queueMessage);
			response = prepareAckMessage(message, queueMessage.getJMSMessageID());
		} catch (Exception ex) {
			logger.error("Error sending message to queue", ex);
			response = prepareNackMessage(message, ex.getMessage());
		} finally {
			try {
				if (queueSession != null)
					queueSession.close();
			} catch (Exception e) {
			}
		}
		return new AdaptorResponseImpl(response);
	}

	private IMessage prepareNackMessage(WorkflowMessage message, String errorMessage)
			throws AdaptorException {
		WorkflowMessage response = new WorkflowMessage(message.getMessageUID());
		response.setType(MessageType.nack);
		response.setKey(message.getKey());
		response.getPayload().putValue("error-message", errorMessage);

		try {
			return new WPCLConverter().convert(response);
		} catch (Exception ex) {
			throw new AdaptorException(ex);
		}
	}

	private IMessage prepareAckMessage(WorkflowMessage message, String id) throws AdaptorException {
		WorkflowMessage response = new WorkflowMessage(message.getMessageUID());
		response.setType(MessageType.ack);
		response.setKey(message.getKey());
		response.putValue(CORRELATION_ID_PARAMETER, id);

		try {
			return new WPCLConverter().convert(response);
		} catch (Exception ex) {
			throw new AdaptorException(ex);
		}
	}

}
