/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.resourcemanager.internal.event;

import com.ericsson.cms.neptune.cluster.service.IDistributedSchedulerService;
import com.tandbergtv.workflow.core.event.WorkflowEvent;
import com.tandbergtv.workflow.core.service.cache.ICacheService;
import com.tandbergtv.workflow.core.service.cache.PartitionEvent;
import com.tandbergtv.workflow.core.service.thread.IRecoverableSchedulerService;
import com.tandbergtv.workflow.core.service.thread.ISchedulerService;
import com.tandbergtv.workflow.core.service.thread.ITaskCompletionListener;
import com.tandbergtv.workflow.core.service.thread.Scheduler;
import com.tandbergtv.workflow.driver.event.NodeTimeoutEvent;
import com.tandbergtv.workflow.driver.event.WorkflowProcessEvent;
import com.tandbergtv.workflow.driver.event.message.TaskCompleteEvent;
import com.tandbergtv.workflow.driver.event.message.TaskStatusEvent;
import com.tandbergtv.workflow.driver.event.message.TaskUpdateEvent;
import com.tandbergtv.workflow.pluginmanager.event.ResourceGroupDescriptorChangeEvent;
import com.tandbergtv.workflow.pluginmanager.event.ResourceTypeDescriptorChangeEvent;
import com.tandbergtv.workflow.resourcemanager.entities.ResourceConsumerKey;
import com.tandbergtv.workflow.resourcemanager.entities.ResourceType;
import com.tandbergtv.workflow.resourcemanager.event.AcquireResourceEvent;
import com.tandbergtv.workflow.resourcemanager.event.IResourceConsumerAware;
import com.tandbergtv.workflow.resourcemanager.event.ReleaseResourceEvent;
import com.tandbergtv.workflow.resourcemanager.event.ResourceDeletedEvent;
import com.tandbergtv.workflow.resourcemanager.event.ResourceGroupCachedEvent;
import com.tandbergtv.workflow.resourcemanager.event.ResourceGroupSubscribeEvent;
import com.tandbergtv.workflow.resourcemanager.event.ResourceGroupUnsubscribeEvent;
import com.tandbergtv.workflow.resourcemanager.event.ResourceOnlineEvent;
import com.tandbergtv.workflow.resourcemanager.event.ResourceTypeCachedEvent;
import com.tandbergtv.workflow.resourcemanager.internal.ILocatorService;
import com.tandbergtv.workflow.resourcemanager.internal.ResourceManagementService;
import com.tandbergtv.workflow.resourcemanager.internal.event.DistributedHandleEventCallable;
import com.tandbergtv.workflow.resourcemanager.internal.event.HandleEventCallable;
import com.tandbergtv.workflow.resourcemanager.internal.event.HandleEventCallableQueue;
import com.tandbergtv.workflow.resourcemanager.internal.event.IEventHandler;
import com.tandbergtv.workflow.resourcemanager.internal.event.ILocalEventScheduler;
import com.tandbergtv.workflow.resourcemanager.internal.event.LocalHandleEventCallable;
import com.tandbergtv.workflow.resourcemanager.internal.event.ResourceManagementEventKey;
import com.tandbergtv.workflow.resourcemanager.internal.event.proxy.NodeTimeoutEventProxy;
import com.tandbergtv.workflow.resourcemanager.internal.event.proxy.ReleaseResourceEventProxy;
import com.tandbergtv.workflow.resourcemanager.internal.event.proxy.TaskCompleteEventProxy;
import com.tandbergtv.workflow.resourcemanager.internal.event.proxy.TaskUpdateEventProxy;
import com.tandbergtv.workflow.resourcemanager.internal.event.proxy.WorkflowProcessEventProxy;
import com.tandbergtv.workflow.resourcemanager.mgmt.heartbeat.ResourceHeartBeatEvent;
import com.tandbergtv.workflow.resourcemanager.mgmt.initialization.ResourceInitializationEvent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

public class DistributedEventHandler
extends ResourceManagementService
implements IEventHandler {
    private static final Logger LOGGER = Logger.getLogger(DistributedEventHandler.class);
    private static final String RESOURCE_EVENT_SCHEDULER = "resource-events";
    private static final String QUEUED_TASKS_DRAIN_SCHEDULER = "resource-events-queueDrain";
    private static final String COMPLETED_TASKS_SCHEDULER = "resource-events-completionMonitor";
    private static final String CONSUMER_EVENT_QUEUE_PREFIX = "resource-consumer-";
    private static final String PLUGIN_UPDATE_EVENT_QUEUE = "resource-pluginUpdate";
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ISchedulerService<Void> eventProcessor;
    private ISchedulerService<Void> callableQueueProcessor;
    private ISchedulerService<Void> completedCallablesMonitor;
    private BlockingQueue<HandleEventCallable> completionQueue;
    private ConcurrentMap<String, HandleEventCallableQueue> callableQueues;
    private final ITaskCompletionListener completionListener = new TaskCompletionListener();

    @Override
    public void handleEvent(WorkflowEvent event) {
        long start = System.currentTimeMillis();
        if (!this.mustHandleEvent(event)) {
            return;
        }
        event = this.updateEvent(event);
        this.eventProcessor.schedule((Callable)new ProcessEventCallable(event));
        this.log(event, "received", start);
    }

    @Override
    public void start() {
        this.running.set(true);
        this.callableQueues = new ConcurrentHashMap<String, HandleEventCallableQueue>();
        this.completionQueue = new LinkedBlockingQueue<HandleEventCallable>();
        this.getLocalEventScheduler().addTaskCompletionListener(this.completionListener);
        this.getRecoverableEventScheduler().addTaskCompletionListener(this.completionListener);
        this.completedCallablesMonitor = new Scheduler(COMPLETED_TASKS_SCHEDULER, 2, 5);
        this.completedCallablesMonitor.start();
        this.callableQueueProcessor = new Scheduler(QUEUED_TASKS_DRAIN_SCHEDULER, 1, 5);
        this.callableQueueProcessor.start();
        this.eventProcessor = new Scheduler(RESOURCE_EVENT_SCHEDULER, 1, 1);
        this.eventProcessor.start();
        this.completedCallablesMonitor.schedule((Callable)new CompletedCallablesMonitorCallable());
    }

    @Override
    public void stop() {
        this.running.set(false);
        this.eventProcessor.stop();
        this.callableQueueProcessor.stop();
        this.completedCallablesMonitor.stop();
        this.getLocalEventScheduler().removeTaskCompletionListener(this.completionListener);
        this.getRecoverableEventScheduler().removeTaskCompletionListener(this.completionListener);
        this.completionQueue.clear();
        this.callableQueues.clear();
    }

    @Override
    public String getServiceName() {
        return "ResourceManagementDistributedEventHandler";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processEvent(WorkflowEvent event) {
        long start = System.currentTimeMillis();
        ResourceManagementEventKey key = this.getEventKey(event);
        boolean isLocalEvent = key.getResourceTypeId() == null;
        HandleEventCallable callable = isLocalEvent ? new LocalHandleEventCallable(key, event) : new DistributedHandleEventCallable(key, event);
        String queueId = callable.getEventKey().getQueueId();
        if (queueId != null) {
            boolean submitted = false;
            while (!submitted) {
                HandleEventCallableQueue queue = new HandleEventCallableQueue(queueId);
                HandleEventCallableQueue oldQueue = this.callableQueues.putIfAbsent(queueId, queue);
                if (oldQueue != null) {
                    queue = oldQueue;
                }
                queue.getLock().lock();
                try {
                    if (queue.isDisposed()) continue;
                    queue.submit(callable);
                    submitted = true;
                }
                finally {
                    queue.getLock().unlock();
                }
            }
            this.scheduleProcessCallableQueue(queueId);
        } else {
            this.scheduleHandleEventCallable(callable);
        }
        this.log(event, "prepared", start);
    }

    private void scheduleProcessCallableQueue(String queueId) {
        this.callableQueueProcessor.schedule((Callable)new ProcessCallableQueueCallable(queueId));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processCallableQueue(String queueId) {
        long start = System.currentTimeMillis();
        WorkflowEvent event = null;
        boolean processed = false;
        while (!processed) {
            HandleEventCallableQueue callableQueue = (HandleEventCallableQueue)this.callableQueues.get(queueId);
            if (callableQueue == null) {
                processed = true;
                continue;
            }
            callableQueue.getLock().lock();
            try {
                if (callableQueue.isDisposed()) continue;
                HandleEventCallable callable = callableQueue.getTaskForProcessing();
                if (callable != null) {
                    this.scheduleHandleEventCallable(callable);
                    event = callable.getEvent();
                }
                processed = true;
            }
            finally {
                callableQueue.getLock().unlock();
            }
        }
        if (event != null) {
            this.log(event, "drained", start);
        }
    }

    private void scheduleHandleEventCallable(HandleEventCallable callable) {
        long start = System.currentTimeMillis();
        if (callable instanceof LocalHandleEventCallable) {
            this.getLocalEventScheduler().schedule(callable);
        } else {
            this.getDistributedEventScheduler().schedule((Callable)callable);
        }
        this.log(callable.getEvent(), "scheduled", start);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleCallableCompletedNotification(HandleEventCallable callable) {
        HandleEventCallableQueue callableQueue;
        long start = System.currentTimeMillis();
        String queueId = callable.getEventKey().getQueueId();
        if (queueId != null && (callableQueue = (HandleEventCallableQueue)this.callableQueues.get(queueId)) != null) {
            callableQueue.getLock().lock();
            try {
                callableQueue.notifyCallableCompleted(callable);
                if (callableQueue.isBusy()) {
                    this.scheduleProcessCallableQueue(queueId);
                } else {
                    callableQueue.dispose();
                    this.callableQueues.remove(queueId);
                }
            }
            finally {
                callableQueue.getLock().unlock();
            }
        }
        this.log(callable.getEvent(), "completed", start);
    }

    private void log(WorkflowEvent event, String category, long start) {
        long duration = System.currentTimeMillis() - start;
        LOGGER.debug((Object)("[RM-Events] " + event + " - " + category + " - " + duration + " ms."));
    }

    private boolean mustHandleEvent(WorkflowEvent event) {
        boolean handle = false;
        if (event instanceof AcquireResourceEvent || event instanceof ReleaseResourceEvent || event instanceof TaskStatusEvent) {
            handle = true;
        } else if (event instanceof WorkflowProcessEvent) {
            switch (((WorkflowProcessEvent)event).getType()) {
                case PAUSED: 
                case CANCELLED: 
                case PRIORITY_CHANGED: 
                case FAILED: 
                case CRASHED: 
                case TIMEOUT: {
                    handle = true;
                }
            }
        } else if (event instanceof PartitionEvent) {
            handle = event.getSource().equals(this.getResourceTypeCache());
        } else if (event instanceof ResourceTypeDescriptorChangeEvent || event instanceof ResourceGroupDescriptorChangeEvent) {
            handle = true;
        } else if (event instanceof ResourceHeartBeatEvent || event instanceof ResourceInitializationEvent) {
            handle = true;
        } else if (event instanceof ResourceDeletedEvent || event instanceof ResourceTypeCachedEvent || event instanceof ResourceGroupCachedEvent) {
            handle = true;
        } else if (event instanceof ResourceOnlineEvent) {
            handle = true;
        } else if (event instanceof ResourceGroupSubscribeEvent || event instanceof ResourceGroupUnsubscribeEvent) {
            handle = true;
        }
        return handle;
    }

    private WorkflowEvent updateEvent(WorkflowEvent event) {
        if (event instanceof NodeTimeoutEvent) {
            event = new NodeTimeoutEventProxy((NodeTimeoutEvent)event);
        } else if (event instanceof WorkflowProcessEvent) {
            event = new WorkflowProcessEventProxy((WorkflowProcessEvent)event);
        } else if (event instanceof TaskUpdateEvent) {
            event = new TaskUpdateEventProxy((TaskUpdateEvent)event);
        } else if (event instanceof TaskCompleteEvent) {
            this.logEventDebugInfo((WorkflowEvent)event);
            event = new TaskCompleteEventProxy((TaskCompleteEvent)event);
        } else if (event instanceof ReleaseResourceEvent) {
            event = new ReleaseResourceEventProxy((ReleaseResourceEvent)event);
        }
        return event;
    }

    private void logEventDebugInfo(WorkflowEvent event) {
        if (event instanceof TaskCompleteEvent) {
            TaskCompleteEvent e = (TaskCompleteEvent)event;
            long cost = System.currentTimeMillis() - event.getCreateTime();
            if (cost > 200L) {
                LOGGER.debug((Object)("PERF:Received a TaskCompleteEvent, external resource allocation for " + e.getToken() + "now, resource: " + e.getMessage().getSource() + ", delay time:" + cost + " ms."));
            } else {
                LOGGER.debug((Object)("Received a TaskCompleteEvent, token " + e.getToken() + " now, resource: " + e.getMessage().getSource()));
            }
        }
    }

    private ResourceManagementEventKey getEventKey(WorkflowEvent event) {
        Long resourceTypeId = null;
        Long resourceGroupId = null;
        String queueId = null;
        if (event instanceof IResourceConsumerAware) {
            ResourceConsumerKey key = ((IResourceConsumerAware)event).getResourceConsumerKey();
            resourceGroupId = key.getResourceGroupId();
            queueId = CONSUMER_EVENT_QUEUE_PREFIX + key.getId();
        } else if (event instanceof WorkflowProcessEvent) {
            resourceGroupId = ((WorkflowProcessEvent)event).getResourceGroupId();
            long tokenId = ((WorkflowProcessEvent)event).getToken().getId();
            queueId = CONSUMER_EVENT_QUEUE_PREFIX + tokenId;
        } else if (event instanceof TaskStatusEvent) {
            resourceGroupId = ((TaskStatusEvent)event).getResourceGroupId();
            queueId = CONSUMER_EVENT_QUEUE_PREFIX + ((TaskStatusEvent)event).getToken().getId();
        } else if (event instanceof ResourceTypeCachedEvent) {
            resourceTypeId = ((ResourceTypeCachedEvent)event).getResourceTypeId();
        } else if (event instanceof ResourceGroupCachedEvent) {
            resourceTypeId = ((ResourceGroupCachedEvent)event).getResourceTypeId();
        } else if (event instanceof ResourceGroupSubscribeEvent) {
            resourceGroupId = ((ResourceGroupSubscribeEvent)event).getResourceGroupId();
        } else if (event instanceof ResourceGroupUnsubscribeEvent) {
            resourceGroupId = ((ResourceGroupUnsubscribeEvent)event).getResourceGroupId();
        } else if (event instanceof ResourceTypeDescriptorChangeEvent || event instanceof ResourceGroupDescriptorChangeEvent) {
            queueId = PLUGIN_UPDATE_EVENT_QUEUE;
        }
        if (resourceGroupId != null) {
            resourceTypeId = this.getLocatorService().getResourceTypeIdForGroup(resourceGroupId);
        }
        return new ResourceManagementEventKey(resourceTypeId, queueId);
    }

    private ILocalEventScheduler getLocalEventScheduler() {
        return (ILocalEventScheduler)this.getService("resource-localEvents");
    }

    private IDistributedSchedulerService<Void> getDistributedEventScheduler() {
        return (IDistributedSchedulerService)this.getService("WFS:ResourceManagementEventScheduler");
    }

    private IRecoverableSchedulerService<Void> getRecoverableEventScheduler() {
        return (IRecoverableSchedulerService)this.getService("WFS:ResourceManagementEventScheduler");
    }

    private ICacheService<ResourceType> getResourceTypeCache() {
        return (ICacheService)this.getService("WFS:ResourceTypeCache");
    }

    private ILocatorService getLocatorService() {
        return this.getService(ILocatorService.class);
    }

    private final class TaskCompletionListener
    implements ITaskCompletionListener {
        private TaskCompletionListener() {
        }

        public <T> void onTaskCompleted(Callable<T> task, Future<T> future) {
            long start = System.currentTimeMillis();
            if (task instanceof HandleEventCallable) {
                HandleEventCallable callable = (HandleEventCallable)task;
                if (callable.getEventKey().getQueueId() != null) {
                    DistributedEventHandler.this.completionQueue.offer(callable);
                }
                DistributedEventHandler.this.log(callable.getEvent(), "notifiedComplete", start);
            }
        }

        public void onTaskCompleted(Runnable task, Future<?> future) {
        }
    }

    private final class HandleCallableCompletedNotificationCallable
    implements Callable<Void> {
        private final HandleEventCallable completedCallable;

        public HandleCallableCompletedNotificationCallable(HandleEventCallable callable) {
            this.completedCallable = callable;
        }

        @Override
        public Void call() throws Exception {
            try {
                DistributedEventHandler.this.handleCallableCompletedNotification(this.completedCallable);
            }
            catch (Exception e) {
                String msg = "Error handling completion notification for event callable " + this.completedCallable;
                LOGGER.debug((Object)msg, (Throwable)e);
            }
            return null;
        }
    }

    private final class CompletedCallablesMonitorCallable
    implements Callable<Void> {
        private CompletedCallablesMonitorCallable() {
        }

        @Override
        public Void call() throws Exception {
            while (DistributedEventHandler.this.running.get()) {
                HandleEventCallable completedCallable = null;
                try {
                    completedCallable = (HandleEventCallable)DistributedEventHandler.this.completionQueue.take();
                    DistributedEventHandler.this.completedCallablesMonitor.schedule((Callable)new HandleCallableCompletedNotificationCallable(completedCallable));
                }
                catch (Exception e) {
                    String msg = "Error handling completion notification for event callable " + completedCallable;
                    LOGGER.debug((Object)msg, (Throwable)e);
                }
            }
            return null;
        }
    }

    private final class ProcessCallableQueueCallable
    implements Callable<Void> {
        private final String queueId;

        public ProcessCallableQueueCallable(String queueId) {
            this.queueId = queueId;
        }

        @Override
        public Void call() throws Exception {
            try {
                DistributedEventHandler.this.processCallableQueue(this.queueId);
            }
            catch (Exception e) {
                LOGGER.error((Object)("Error processing resource event queue[" + this.queueId + "]"), (Throwable)e);
            }
            return null;
        }
    }

    private final class ProcessEventCallable
    implements Callable<Void> {
        private WorkflowEvent event;

        public ProcessEventCallable(WorkflowEvent event) {
            this.event = event;
        }

        @Override
        public Void call() throws Exception {
            try {
                DistributedEventHandler.this.processEvent(this.event);
            }
            catch (Throwable e) {
                LOGGER.error((Object)("Error while handling event " + this.event.getClass().getSimpleName() + " in the Resource Manager."), e);
            }
            return null;
        }
    }
}

