/**
 * ScheduleNotifierService.java
 * Created Jul 2, 2008
 * Copyright (c) TANDBERG Television 2007-2008
 */
package com.tandbergtv.watchpoint.pmm.schedule.notify;

import static com.tandbergtv.watchpoint.pmm.schedule.search.ScheduleSearchKey.ACTIVE;
import static com.tandbergtv.watchpoint.pmm.schedule.search.ScheduleSearchKey.PITCH_DATE;
import static com.tandbergtv.workflow.driver.search.SearchType.DATE;
import static com.tandbergtv.workflow.driver.search.SearchType.NUMERIC;
import static java.io.File.separator;
import static javax.xml.xpath.XPathConstants.NODE;

import java.io.File;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;

import org.apache.log4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Node;

import com.tandbergtv.cms.portal.util.transaction.Transactional;
import com.tandbergtv.watchpoint.pmm.core.ProgressEvent;
import com.tandbergtv.watchpoint.pmm.entities.AssetList;
import com.tandbergtv.watchpoint.pmm.entities.DistributionSchedule;
import com.tandbergtv.watchpoint.pmm.entities.IAssetList;
import com.tandbergtv.watchpoint.pmm.entities.Planner;
import com.tandbergtv.watchpoint.pmm.entities.Schedule;
import com.tandbergtv.watchpoint.pmm.entities.Title;
import com.tandbergtv.watchpoint.pmm.entities.TitleListType;
import com.tandbergtv.watchpoint.pmm.entities.event.AssetListEvent;
import com.tandbergtv.watchpoint.pmm.entities.event.TitleStatusUpdatedEvent;
import com.tandbergtv.watchpoint.pmm.entities.event.TitleUpdatedEvent;
import com.tandbergtv.watchpoint.pmm.schedule.ISchedulePersistenceService;
import com.tandbergtv.watchpoint.pmm.schedule.search.IScheduleSearchService;
import com.tandbergtv.watchpoint.pmm.title.ITitlePersistenceService;
import com.tandbergtv.watchpoint.search.Entity;
import com.tandbergtv.workflow.core.event.ColleaguePriority;
import com.tandbergtv.workflow.core.event.DefaultMediator;
import com.tandbergtv.workflow.core.event.IColleague;
import com.tandbergtv.workflow.core.event.WorkflowEvent;
import com.tandbergtv.workflow.core.service.ServiceRegistry;
import com.tandbergtv.workflow.core.service.thread.ISchedulerService;
import com.tandbergtv.workflow.core.service.thread.Scheduler;
import com.tandbergtv.workflow.driver.search.RangeParameter;
import com.tandbergtv.workflow.driver.search.SearchParameterBase;
import com.tandbergtv.workflow.driver.search.SortParameter;
import com.tandbergtv.workflow.driver.search.ValueParameter;
import com.tandbergtv.workflow.util.SearchCriteria;
import com.tandbergtv.workflow.util.SortingOrder;

/**
 * Implementation of the {@link IScheduleNotifier} service which writes out warning messages to the
 * log file
 * 
 * @author Sahil Verma
 */
public class ScheduleNotifierService implements IScheduleNotifier, IColleague {

	/* Scheduler that periodically regenerates all notifications, created in start() */
	private ISchedulerService<Void> scheduler;
	
	/* FIXME Locking. And this is a VERY poor choice of data structure */
	/* Current notifications. Assigned exactly once, in the constructor. */
	private final Collection<Notification> notifications;
	
	private static final long ONE_MINUTE_MILLIS = 60 * 1000L;
	
	private static final long ONE_HOUR_MILLIS = 60 * ONE_MINUTE_MILLIS;
	
	private static final long ONE_DAY_MILLIS = 24 * ONE_HOUR_MILLIS;
	
	private static final String SERVICE_NAME = "Schedule Notifier";
	
	private static final Logger logger = Logger.getLogger(ScheduleNotifierService.class);
	
	/* executor on which event processing threads are scheduled */
	private ScheduledExecutorService executor;
	
	/* Delay in msecs before the event is processed */
	private long eventProcessingDelay;
	
	/* Number of days in the past for which notifications will be generated */
	private int pastDays;
	
	/* Number of days in the future for which notifications will be generated */
	private int futureDays;
	
	/*
	 * If the value is not specified or the element does not exist, then process the event 
	 * immediately.
	 */
	private static final long DEFAULT_PROCESS_DELAY = 0;
	
	/*
	 * If 'past' is not set in progress.xml, notifications will be generated for
	 * schedules in the past 30 days
	 */
	private static final int DEFAULT_PAST_NUMBER_OF_DAYS = 30; 
	
	/*
	 * If 'future' is not set in progress.xml, notifications will be generated for
	 * schedules in the coming 30 days
	 */
	private static final int DEFAULT_FUTURE_NUMBER_OF_DAYS = 30;
	
	/*
	 * Constants used to locate progress.xml, which holds the event processing delay and the 
	 * past/future window sizes. PRODUCT_DIR is the name of a system property, the others are 
	 * path segments below the directory that property points to.
	 */
	private static final String PRODUCT_DIR = "com.tandbergtv.cms.product.dir";
	private static final String PMM_CONFIG_DIR = "pmm";
	private static final String CONFIG_DIR = "conf";
	private static final String CONFIG_FILE = "progress.xml";
	
	/**
	 * Creates a ScheduleNotifierService with an empty, synchronized set of notifications and 
	 * initializes the event processing delay and the past/future window sizes.
	 */
	public ScheduleNotifierService() {
		HashSet<Notification> backing = new HashSet<Notification>();
		this.notifications = Collections.synchronizedSet(backing);

		/* Each of these reads its own value from the configuration, or applies a default */
		setEventProcessingDelay();
		setNumberOfPastDays();
		setNumberOfFutureDays();
	}

	/**
	 * Evaluates an XPath expression against the configuration document and returns the text of
	 * the node it selects.
	 * 
	 * @param nodePath XPath expression identifying the node
	 * @return the node's text content, or null if the expression selects no node
	 * @throws Exception if the configuration cannot be loaded or the expression cannot be evaluated
	 */
	private String getValue(String nodePath) throws Exception {
		Document config = getConfiguration();
		Node match = (Node) XPathFactory.newInstance().newXPath().evaluate(nodePath, config, NODE);

		return (match == null) ? null : match.getTextContent();
	}

	/**
	 * Initializes the number of future days for which notifications are generated from the
	 * <code>//future</code> node of the configuration. The default is used when the node does
	 * not exist, or when its value cannot be read or parsed.
	 */
	private void setNumberOfFutureDays() {
		try {
			String value = getValue("//future");

			if (value == null) {
				/* Not configured at all - expected, so no error is reported */
				futureDays = DEFAULT_FUTURE_NUMBER_OF_DAYS;
				logger.debug("Using default value to generate notifications for coming " + futureDays 
						+ " days.");
				return;
			}

			/* Element text may carry surrounding whitespace */
			futureDays = Integer.parseInt(value.trim());
			logger.debug("Notifications will be generated for schedules " + futureDays 
					+ " days from today.");
		}
		catch(Exception e) {
			/* Assign first so that the message below reports the value actually in use */
			futureDays = DEFAULT_FUTURE_NUMBER_OF_DAYS;
			logger.error("Error while getting the value for number of future days to calcuate " 
					+ "notifications: ", e);
			logger.debug("Using default value to generate notifications for coming " + futureDays 
					+ " days.");
		}
	}

	/**
	 * Initializes the number of past days for which notifications are generated from the
	 * <code>//past</code> node of the configuration. The default is used when the node does
	 * not exist, or when its value cannot be read or parsed.
	 */
	private void setNumberOfPastDays() {
		try {
			String value = getValue("//past");

			if (value == null) {
				/* Not configured at all - expected, so no error is reported */
				pastDays = DEFAULT_PAST_NUMBER_OF_DAYS;
				logger.debug("Using default value to generate notifications for past " + pastDays 
						+ " days.");
				return;
			}

			/* Element text may carry surrounding whitespace */
			pastDays = Integer.parseInt(value.trim());
			logger.debug("Notifications will be generated for schedules "
					+ pastDays + " days in the past from today.");
		}
		catch(Exception e) {
			/* Assign first so that the message below reports the value actually in use */
			pastDays = DEFAULT_PAST_NUMBER_OF_DAYS;
			logger.error("Error while getting the value of number of past days to calcuate " +
					"notifications: ", e);
			logger.debug("Using default value to generate notifications for past " + pastDays 
					+ " days.");
		}
	}

	/**
	 * Initializes the delay (in msecs) applied before a received event is processed from the
	 * <code>//eventProcessDelay</code> node of the configuration. The default is used when the
	 * node does not exist, or when its value cannot be read or parsed.
	 */
	private void setEventProcessingDelay() {
		try {
			String value = getValue("//eventProcessDelay");

			if (value == null) {
				/* Not configured at all - expected, so no error is reported */
				eventProcessingDelay = DEFAULT_PROCESS_DELAY;
				logger.debug("Set the event processing delay to the default value: " + 
						eventProcessingDelay + " msec");
				return;
			}

			/* Element text may carry surrounding whitespace */
			eventProcessingDelay = Long.parseLong(value.trim());
			logger.debug("Set the event processing delay to: " + eventProcessingDelay + " msec");
		}
		catch(Exception e) {
			/* Assign first so that the message below reports the value actually in use */
			eventProcessingDelay = DEFAULT_PROCESS_DELAY;
			logger.error("Error while getting processing delay ", e);
			logger.debug("Set the event processing delay to the default value: " + 
					eventProcessingDelay + " msec");
		}
	}

	/**
	 * Parses the configuration file located at
	 * <code>&lt;product dir&gt;/conf/pmm/progress.xml</code>, where the product directory is 
	 * taken from the system property named by PRODUCT_DIR.
	 * 
	 * @return the parsed configuration document
	 * @throws IllegalStateException if the product directory system property is not set
	 * @throws Exception if the file cannot be read or parsed
	 */
	private Document getConfiguration() throws Exception {
		String productDir = System.getProperty(PRODUCT_DIR);

		/* Without this check the path would silently start with the literal text "null" */
		if (productDir == null)
			throw new IllegalStateException("System property " + PRODUCT_DIR + " is not set");

		String dir = productDir + separator + CONFIG_DIR;
		String path = dir + separator + PMM_CONFIG_DIR + separator + CONFIG_FILE;
		
		File file = new File(path);
		
		return DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(file);
	}
	
	/**
	 * Finds the current notification for the specified schedule.
	 * 
	 * @param schedule the schedule whose notification is wanted
	 * @return the notification whose schedule equals the specified one, or null if there is none
	 * @see com.tandbergtv.watchpoint.pmm.schedule.notify.IScheduleNotifier#getNotification(com.tandbergtv.watchpoint.pmm.entities.Schedule)
	 */
	public Notification getNotification(Schedule schedule) {
		/*
		 * A synchronized wrapper only guards individual calls. Iteration must hold the set's own
		 * lock, otherwise a concurrent add/remove/clear can break the iterator.
		 */
		synchronized (this.notifications) {
			for (Notification notification : this.notifications) {
				if (notification.getSchedule().equals(schedule))
					return notification;
			}
		}
		
		return null;
	}
	
	/**
	 * Returns the notifications that currently exist.
	 * 
	 * @return a snapshot of the current notifications; later changes made by this service are
	 *         not reflected in it, and it is safe to iterate without locking
	 */
	@Override
	public Collection<Notification> getCurrentNotifications() {
		/* Copy under the set's lock rather than exposing the live, concurrently modified set */
		synchronized (this.notifications) {
			return new ArrayList<Notification>(this.notifications);
		}
	}
	
	/* (non-Javadoc)
	 * Returns the name under which this service acts as a colleague, which is the same 
	 * constant that getServiceName() returns.
	 * 
	 * @see com.tandbergtv.workflow.core.event.IColleague#getColleagueName()
	 */
	public String getColleagueName() {
		return SERVICE_NAME;
	}

	/* (non-Javadoc)
	 * This colleague always reports LOW priority.
	 * 
	 * @see com.tandbergtv.workflow.core.event.IColleague#getColleaguePriority()
	 */
	public ColleaguePriority getColleaguePriority() {
		return ColleaguePriority.LOW;
	}

	/**
	 * Accepts an event and schedules it to be processed on the event executor once the
	 * configured processing delay has elapsed. Events that arrive while the executor is not 
	 * available (before it is created, or after it has been shut down) are dropped.
	 * 
	 * @param event the event to process
	 * @see com.tandbergtv.workflow.core.event.IColleague#receive(com.tandbergtv.workflow.core.event.WorkflowEvent)
	 */
	public void receive(final WorkflowEvent event) {
		logger.debug("Received : " + event);

		ScheduledExecutorService service = this.executor;

		if (service == null || service.isShutdown()) {
			logger.debug("Ignoring event, notifier is not running : " + event);
			return;
		}

		service.schedule(new Runnable() {
			public void run() {
				try {
					processEvent(event);
				} catch (Exception e) {
					/*
					 * Nobody inspects the future returned by schedule(), so an exception that
					 * escapes here would be lost without a trace.
					 */
					logger.error("Failure during processing of " + event, e);
				}
			}			
		}, eventProcessingDelay, TimeUnit.MILLISECONDS);
	}
	
	/**
	 * Re-calculates the notifications of the schedules affected by the event. Title events 
	 * affect the planners the title belongs to; asset list and progress events affect the 
	 * schedules resolved from the event. Any other event is ignored.
	 *    
	 * @param event the event to process
	 */
	@Transactional
	private void processEvent(WorkflowEvent event) {
		boolean titleEvent = event instanceof TitleStatusUpdatedEvent 
				|| event instanceof TitleUpdatedEvent;
		boolean listOrProgressEvent = event instanceof AssetListEvent 
				|| event instanceof ProgressEvent;

		// nothing else is of interest
		if (!titleEvent && !listOrProgressEvent)
			return;
		
		logger.debug("Processing : " + event);

		if (!titleEvent) {
			for (Schedule affected : getSchedules(event))
				reCalculateNotification(affected);

			return;
		}

		Collection<IAssetList> titleLists;

		if (event instanceof TitleStatusUpdatedEvent)
			titleLists = ((TitleStatusUpdatedEvent) event).getTitle().getTitlelists();
		else
			titleLists = ((TitleUpdatedEvent) event).getTitle().getTitlelists();

		if (titleLists == null)
			return;

		/* A title may belong to several planners, recalculate for each of them */
		for (IAssetList titleList : titleLists) {
			if (titleList.getType() != TitleListType.PLANNER)
				continue;

			ISchedulePersistenceService persistence = ServiceRegistry.getDefault().lookup(
					ISchedulePersistenceService.class);

			reCalculateNotification(persistence.get(titleList.getId()));
		}
	}	

	/* 
	 * Recalculate. Several cases - title got added and is late, a late title is removed, 
	 * progress for a late title arrives, pitch date of a list with late title(s) changes and 
	 * now falls within the 'alert' window etc etc
	 * 
	 * The existing notification of the schedule is always removed. A new one is added only if 
	 * the schedule is active, falls within the window and a notification is generated for it. 
	 * A null schedule is ignored.
	 */	
	private void reCalculateNotification(Schedule schedule) {
		// Callers pass whatever the persistence lookup returned, so there may be no schedule
		if (schedule == null) {
			logger.debug("No schedule available, skipping recalculation.");
			return;
		}

		// Since the model is generating events, even deleting a schedule sends an event which 
		// we want to ignore.
		if(!schedule.getIsActive()) {
			removeNotification(schedule);
			logger.debug("Schedule[" + schedule.getId() + "] is inactive hence skipping recalculation.");
			return;
		}

		// Build the replacement before touching the current state
		Notification notification = null;
		if (fallsWithinWindow(schedule)) {
			logger.debug(schedule + ", recalculating notification");
			notification = generate(schedule);
		}
		
		removeNotification(schedule);
		// Next scheduler run will take care of logging the warning
		if (notification != null)
			addNotification(notification);
	}
	
	/* (non-Javadoc)
	 * Returns the name of this service, which is the same constant that getColleagueName() 
	 * returns.
	 * 
	 * @see com.tandbergtv.workflow.core.service.Service#getServiceName()
	 */
	public String getServiceName() {
		return SERVICE_NAME;
	}

	/**
	 * Starts the service: creates the single-threaded executor on which received events are
	 * processed, starts the scheduler, registers this service with the mediator, and schedules 
	 * the regeneration of all notifications to run immediately and then once a day.
	 * 
	 * @see com.tandbergtv.workflow.core.service.ServiceLifecycle#start()
	 */
	public void start() {
		/*
		 * The executor has to exist before this service is registered with the mediator, 
		 * because receive() schedules work on it as soon as the first event is delivered.
		 */
		this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
			public Thread newThread(Runnable r) {
				Thread t = Executors.defaultThreadFactory().newThread(r);
				
				t.setName("schedule-notifier");
				t.setDaemon(true);
				return t;
			}
		});

		scheduler = new Scheduler<Void>("Notifications", 1, 1);
		scheduler.start();
		
		DefaultMediator.getInstance().register(this);
		
		this.scheduler.schedule(new Runnable() {
			/* (non-Javadoc)
			 * @see java.lang.Runnable#run()
			 */
			public void run() {
				try {
					generate();
				} catch (Exception e) {
					logger.error("Failure during generating notifications", e);
				}
			}
		}, 0, ONE_DAY_MILLIS); /* Hmmmm so ugly. API should take TimeUnit parameter. */
	}

	/**
	 * Stops the scheduler and shuts down the event executor. Safe to call even if start() was
	 * never called or did not get as far as creating them.
	 * 
	 * @see com.tandbergtv.workflow.core.service.ServiceLifecycle#stop()
	 */
	public void stop() {
		try {
			if (scheduler != null)
				scheduler.stop();
		} finally {
			/* Shut the executor down even if stopping the scheduler failed */
			if (executor != null)
				executor.shutdown();
		}
	}
	
	/**
	 * Resolves the schedules that the event refers to. For an asset list event this is the 
	 * event's list, if it is a schedule. For a progress event it is the schedule named by the 
	 * event or, when none is named, the pitch schedules and planners of the event's title.
	 * 
	 * @param event the event to resolve
	 * @return the schedules, never null; empty if the event is not associated with any schedule
	 */
	private List<Schedule> getSchedules(WorkflowEvent event) {
		List<Long> scheduleIds = new ArrayList<Long>();
		List<Schedule> lists = new ArrayList<Schedule>();

		if (event instanceof AssetListEvent) {
			AssetListEvent e = AssetListEvent.class.cast(event);
			AssetList list = e.getList();
			if (list instanceof Schedule) {
				scheduleIds.add(list.getId());
			}
		} else if (event instanceof ProgressEvent) {
			/*
			 * For progress events if the schedule id is not specified, use the schedules 
			 * associated with this title.
			 */
			ProgressEvent e = ProgressEvent.class.cast(event);
			if (e.getScheduleId() != null) {
				scheduleIds.add(e.getScheduleId());
			} else {
				ITitlePersistenceService titleService = ServiceRegistry.getDefault().
					lookup(ITitlePersistenceService.class);
				Title t = titleService.get(e.getTitleId());

				/* The title may be unavailable or have no lists, same as for title events */
				if (t != null && t.getTitlelists() != null) {
					for(IAssetList list : t.getTitlelists()) {
						/* read only pitch schedules and planners */
						if (list.getType() == TitleListType.PITCH
								|| list.getType() == TitleListType.PLANNER) {
							scheduleIds.add(list.getId());
						}
					}
				}
			}
		}

		/* fetch the complete schedule object */
		if (!scheduleIds.isEmpty()) {
			ISchedulePersistenceService service = ServiceRegistry.getDefault()
					.lookup(ISchedulePersistenceService.class);

			for(Long id : scheduleIds) {
				/*
				 * When creating a schedule,AssetListEvent is sent before even
				 * persisting the schedule. Ideally the model should not be sending events.
				 */
				if(id == null)
					continue;

				Schedule schedule = service.get(id);

				/* Never hand a null entry to the caller */
				if (schedule != null)
					lists.add(schedule);
				else
					logger.debug("Schedule[" + id + "] not found, ignoring");
			}
		}
		return lists;
	}
	
	/**
	 * Rebuilds the whole set of notifications: searches for the schedules to consider, discards 
	 * every existing notification and then adds and logs a notification for each schedule that 
	 * produces one.
	 */
	@Transactional
	private void generate() {
		Collection<Schedule> candidates = getSchedules();

		/*
		 * Start from scratch. The window may have moved on (e.g. into a new month) and stale
		 * notifications from before the window start must not linger.
		 */
		removeNotifications();

		Iterator<Schedule> remaining = candidates.iterator();

		while (remaining.hasNext()) {
			Notification generated = generate(remaining.next());

			if (generated == null)
				continue;

			notify(generated);
		}
	}
	
	/**
	 * Generates the notification for a single schedule using the generator that the factory
	 * supplies for it.
	 * 
	 * @param schedule the schedule to generate a notification for
	 * @return whatever the generator returns, or null if the schedule is neither a planner nor 
	 *         a distribution schedule
	 */
	private Notification generate(Schedule schedule) {
		/* Notifications are supported for these two kinds of schedule only */
		boolean supported = schedule instanceof Planner || schedule instanceof DistributionSchedule;

		if (!supported)
			return null;
		
		INotificationGenerator generator = 
			NotificationGeneratorFactory.newInstance().newGenerator(schedule);

		return generator.getNotification(schedule);
	}
	
	/**
	 * Stores the specified notification and writes each of its messages to the log as a 
	 * warning
	 * 
	 * @param notification the notification to store and log
	 */
	private void notify(Notification notification) {
		this.notifications.add(notification);
		
		for (String line : notification.getMessages()) {
			logger.warn(line);
		}
	}
	
	/**
	 * Adds the specified notification to the collection of current notifications. Nothing is 
	 * written to the log here.
	 * 
	 * @param notification the notification to add
	 */
	private void addNotification(Notification notification) {
		this.notifications.add(notification);
	}
	
	/**
	 * Discards all current notifications
	 */
	private void removeNotifications() {
		this.notifications.clear();
	}
	
	/**
	 * Removes the notification for the specified schedule, if there is one. At most one 
	 * notification is removed.
	 * 
	 * @param schedule the schedule whose notification is to be removed
	 */
	private void removeNotification(Schedule schedule) {
		/*
		 * A synchronized wrapper only guards individual calls. Iteration must hold the set's own
		 * lock, otherwise a concurrent add/remove/clear can break the iterator.
		 */
		synchronized (this.notifications) {
			Iterator<Notification> i = this.notifications.iterator();
			
			while (i.hasNext()) {
				Notification notification = i.next();
				
				if (schedule.equals(notification.getSchedule())) {
					i.remove();
					break;
				}
			}
		}
	}
	
	/**
	 * Tells whether the schedule's date lies strictly between the start and the end of the 
	 * notification window.
	 * 
	 * @param schedule the schedule to check
	 * @return true if the date is after the window start and before the window end
	 */
	private boolean fallsWithinWindow(Schedule schedule) {
		Date when = schedule.getDate();

		if (!when.after(getWindowStart()))
			return false;

		return when.before(getWindowEnd());
	}
	
	/**
	 * Searches for the distribution schedules and the planners matching the search criteria.
	 * 
	 * @return the distribution schedules followed by the planners, in one collection
	 */
	private Collection<Schedule> getSchedules() {
		IScheduleSearchService searcher = 
			ServiceRegistry.getDefault().lookup(IScheduleSearchService.class);

		/*
		 * Two separate searches, one per concrete class: hibernate 3.3.1.GA has a bug related 
		 * to setting the result start and end limit when searching for entities given the base 
		 * class.
		 */
		Collection<Schedule> found = searcher.search(getSearchCriteria(DistributionSchedule.class));

		found.addAll(searcher.search(getSearchCriteria(Planner.class)));
		
		return found;
	}
	
	/**
	 * Builds the search criteria for schedules of the specified class: an ACTIVE value of 1, a 
	 * pitch date within the notification window, sorted by pitch date in descending order.
	 * 
	 * @param clazz the entity class to search for
	 * @return the search criteria
	 */
	private SearchCriteria getSearchCriteria(Class<?> clazz) {
		Entity scheduleEntity = new Entity("schedule", clazz, "s");

		scheduleEntity.addParameter(new ValueParameter(ACTIVE.toString(), NUMERIC, 1));
		scheduleEntity.addParameter(getDateRangeCriterion());
		scheduleEntity.addParameter(new SortParameter(PITCH_DATE.toString(), SortingOrder.DESCENDING));

		SearchCriteria result = new SearchCriteria();

		result.addParameter(scheduleEntity);
		
		return result;
	}
	
	/**
	 * Builds the pitch date range parameter spanning the notification window, with both bounds 
	 * formatted as <code>yyyy-MM-dd</code>.
	 * 
	 * @return the range parameter
	 */
	private SearchParameterBase getDateRangeCriterion() {
		DateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");

		/* Lower bound goes in through the constructor, upper bound through the setter */
		String lower = dayFormat.format(getWindowStart());
		RangeParameter window = new RangeParameter(PITCH_DATE.toString(), DATE, lower);
		String upper = dayFormat.format(getWindowEnd());

		window.setTo(upper);
		
		return window;
	}
	
	/**
	 * Computes the start of the notification window
	 * 
	 * @return the current time moved back by the configured number of past days
	 */
	private Date getWindowStart() {
		Calendar start = new GregorianCalendar();

		start.setTimeInMillis(System.currentTimeMillis());
		start.add(Calendar.DAY_OF_MONTH, -pastDays);

		return start.getTime();
	}
	
	/**
	 * Computes the end of the notification window
	 * 
	 * @return the current time moved forward by the configured number of future days
	 */
	private Date getWindowEnd() {
		Calendar end = new GregorianCalendar();

		end.setTimeInMillis(System.currentTimeMillis());
		end.add(Calendar.DAY_OF_MONTH, futureDays);

		return end.getTime();
	}
}
