/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.driver.message.queue;

import com.ericsson.cms.neptune.rabbitmq.workflow.WorkflowQueueType;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.rabbitmq.client.Channel;
import com.tandbergtv.workflow.comm.routing.impl.WorkflowMessageConvertor;
import com.tandbergtv.workflow.core.service.Service;
import com.tandbergtv.workflow.core.service.ServiceRegistry;
import com.tandbergtv.workflow.core.service.thread.Scheduler;
import com.tandbergtv.workflow.dao.hibernate.HibernateUtil;
import com.tandbergtv.workflow.driver.internal.PersistenceServiceFactory;
import com.tandbergtv.workflow.driver.internal.ServiceLookup;
import com.tandbergtv.workflow.driver.internal.WorkflowProcessDTOService;
import com.tandbergtv.workflow.driver.message.queue.FileMessageConsumerFactory;
import com.tandbergtv.workflow.driver.message.queue.IMessageProcessor;
import com.tandbergtv.workflow.driver.message.queue.IQueueInfo;
import com.tandbergtv.workflow.driver.message.queue.MessageProcessorImpl;
import com.tandbergtv.workflow.driver.message.queue.MessageWriter;
import com.tandbergtv.workflow.driver.message.queue.RabbitMQQueueMonitorProvider;
import com.tandbergtv.workflow.driver.message.queue.RejectExecutionMessageExecption;
import com.tandbergtv.workflow.message.WorkflowMessage;
import com.tandbergtv.workflow.message.WorkflowMessageBuilder;
import com.tandbergtv.workflow.message.command.internal.CommandExecutor;
import com.tandbergtv.workflow.message.command.internal.CreateIDCommandExecutorStrategy;
import com.tandbergtv.workflow.message.command.internal.CreateIdFeatureDBImpl;
import com.tandbergtv.workflow.message.consumer.IWorkOrderMessageConsumer;
import com.tandbergtv.workflow.process.ratelimiter.IActiveWorkOrderMonitor;
import com.tandbergtv.workflow.process.ratelimiter.IRateLimiter;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.hibernate.SessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
 * Consumes workflow messages from the RabbitMQ work-order queue and the
 * by-pass queue and hands them to an {@link IMessageProcessor}.
 *
 * <p>Both listeners are declared with manual acknowledgement and
 * {@code autoStartup="false"}; they only run between {@link #startConsume()}
 * and {@link #stopConsume()}. For every delivery this class decides whether to
 * ack (processed or rejected) or nack-with-requeue (no execution token, or a
 * processing failure that has not yet exceeded the redelivery limit).
 *
 * <p>A scheduler started in {@link #afterPropertiesSet()} periodically reports
 * the combined size of both queues to the {@link IQueueInfo} monitor.
 */
public class WorkOrderMessageConsumer
        implements InitializingBean, IWorkOrderMessageConsumer, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkOrderMessageConsumer.class);
    private static final String DEFAULT_MESSAGE_CONSUMER = "Work-Order-Message-Consumer";
    private static final long DEFAULT_WAITING_MILLISECONDS = 2000L;
    private static final String MANUAL_ACKNOWLEDGE = "MANUAL";
    private static final String BY_PASS_QUEUE = "cms.queue.bypass.wo";
    private static final String COMPLETION_QUEUE = "cms.queue.workorder";
    private static final String REDELIVERY_TAG = "redelivery";
    /** Message header carrying the broker delivery tag used for ack/nack. */
    private static final String DELIVERY_TAG_HEADER = "amqp_deliveryTag";
    /** Message header carrying the broker's "redelivered" flag. */
    private static final String REDELIVERED_HEADER = "amqp_redelivered";
    /** Number of failed attempts tolerated per create id before the message is rejected. */
    private static final int MAX_REDELIVERY_COUNT = 20;
    /** Multiplier applied to the waiting interval after an unexpected handling failure. */
    private static final long ERROR_BACKOFF_MULTIPLIER = 5L;
    public static final String BY_PASS_WORKFLOW_MESSAGE_CONSUMER = "Work-Order-Message-Consumer-byPass";
    public static final String NORMAL_WORKFLOW_MESSAGE_CONSUMER = "Work-Order-Message-Consumer-normal";

    private long waitingIntervalMillis = DEFAULT_WAITING_MILLISECONDS;
    private IMessageProcessor processor;
    private IRateLimiter rateLimiter;
    private IActiveWorkOrderMonitor activeWorkOrderMonitor;
    private IQueueInfo queueInfo;
    private MessageWriter messageWriter;
    private Scheduler<Void> scheduler;
    /** Failed-attempt counters keyed by message create id; entries expire 15 minutes after being written. */
    private final Cache<String, AtomicInteger> redeliverCounters =
            CacheBuilder.newBuilder().maximumSize(10000L).expireAfterWrite(15L, TimeUnit.MINUTES).build();
    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    /**
     * @return the name under which this consumer is registered in the {@link ServiceRegistry}
     */
    public String getServiceName() {
        return WorkOrderMessageConsumer.class.getSimpleName();
    }

    /** No-op; consumption is controlled through {@link #startConsume()}. */
    public void start() {
    }

    /** No-op; consumption is controlled through {@link #stopConsume()}. */
    public void stop() {
    }

    /**
     * Starts the normal and the by-pass listener containers.
     *
     * @throws IllegalStateException if either listener container is not registered
     */
    public void startConsume() {
        // Resolve both containers before touching either, so a missing registration
        // does not leave only one of the two listeners running.
        MessageListenerContainer normalConsumer = this.requireListenerContainer(NORMAL_WORKFLOW_MESSAGE_CONSUMER);
        MessageListenerContainer byPassConsumer = this.requireListenerContainer(BY_PASS_WORKFLOW_MESSAGE_CONSUMER);
        normalConsumer.start();
        byPassConsumer.start();
    }

    /**
     * Stops the normal and the by-pass listener containers.
     *
     * @throws IllegalStateException if either listener container is not registered
     */
    public void stopConsume() {
        MessageListenerContainer normalConsumer = this.requireListenerContainer(NORMAL_WORKFLOW_MESSAGE_CONSUMER);
        MessageListenerContainer byPassConsumer = this.requireListenerContainer(BY_PASS_WORKFLOW_MESSAGE_CONSUMER);
        normalConsumer.stop();
        byPassConsumer.stop();
    }

    /**
     * Looks up a listener container by id and fails with a descriptive error
     * instead of a bare {@code NullPointerException} when it is not registered.
     */
    private MessageListenerContainer requireListenerContainer(String listenerId) {
        MessageListenerContainer container = this.rabbitListenerEndpointRegistry.getListenerContainer(listenerId);
        if (container == null) {
            throw new IllegalStateException("No RabbitMQ listener container registered with id '" + listenerId + "'");
        }
        return container;
    }

    /**
     * Wires the collaborators, starts the queue-size polling scheduler and
     * registers this consumer in the {@link ServiceRegistry}.
     */
    public void afterPropertiesSet() {
        this.processor = this.createProcessor();
        this.queueInfo = this.createWorkflowMessageQueueInfo();
        this.rateLimiter = ServiceLookup.lookupRateLimiter();
        this.activeWorkOrderMonitor = ServiceLookup.lookupActiveWOMonitor();
        this.messageWriter = new MessageWriter(FileMessageConsumerFactory.getOffloadQueueDir());
        this.scheduler = new Scheduler<>(DEFAULT_MESSAGE_CONSUMER, 1, 1);
        this.scheduler.start();
        this.scheduler.schedule(this::fetchQueueSize, 1000L, 15000L);
        ServiceRegistry.getDefault().register(this.getServiceName(), (Service)this);
    }

    /**
     * Reads the message counts of the completion and by-pass queues and pushes
     * their sum to the queue monitor. Any failure is logged and otherwise
     * ignored, so one failed poll does not propagate out of the scheduled task.
     */
    protected void fetchQueueSize() {
        try {
            int queueSize = this.getQueueSize(WorkflowQueueType.COMPLETION.getValue())
                    + this.getQueueSize(WorkflowQueueType.BY_PASS.getValue());
            if (queueSize >= 0) {
                this.queueInfo.updateQueueSize(queueSize);
            }
        }
        catch (Exception e) {
            LOGGER.info("Failed to get queue size from rabbitMQ:" + ExceptionUtils.getRootCauseMessage(e));
        }
    }

    /**
     * Returns the message count the broker reports for the given queue.
     *
     * @throws IllegalStateException if the queue properties or the message
     *         count are unavailable (for example when the queue does not exist)
     */
    private int getQueueSize(String queueName) {
        Properties queueProperties = this.rabbitAdmin.getQueueProperties(queueName);
        if (queueProperties == null) {
            throw new IllegalStateException("Properties of queue '" + queueName + "' are unavailable; the queue may not exist");
        }
        Object messageCount = queueProperties.get(RabbitAdmin.QUEUE_MESSAGE_COUNT);
        if (!(messageCount instanceof Number)) {
            throw new IllegalStateException("Message count of queue '" + queueName + "' is unavailable: " + messageCount);
        }
        return ((Number)messageCount).intValue();
    }

    /**
     * Stops the queue monitor and the polling scheduler and discards the
     * redelivery counters.
     */
    public void destroy() {
        try {
            this.queueInfo.stop();
        }
        finally {
            // Always release the counters and the scheduler, even if the monitor fails to stop.
            // invalidateAll() discards the entries; cleanUp() would only run pending maintenance.
            this.redeliverCounters.invalidateAll();
            this.scheduler.stop();
        }
        LOGGER.debug("Message Consumer is stopped");
    }

    /** Listener entry point for the by-pass queue. */
    @RabbitListener(id=BY_PASS_WORKFLOW_MESSAGE_CONSUMER, queues={BY_PASS_QUEUE}, ackMode=MANUAL_ACKNOWLEDGE, autoStartup="false")
    public void onByPassMessage(Message<WorkflowMessage> message, Channel channel) {
        this.handleMessage(message, channel);
    }

    /** Listener entry point for the normal work-order queue. */
    @RabbitListener(id=NORMAL_WORKFLOW_MESSAGE_CONSUMER, queues={COMPLETION_QUEUE}, ackMode=MANUAL_ACKNOWLEDGE, autoStartup="false")
    public void onNormalMessage(Message<WorkflowMessage> message, Channel channel) {
        this.handleMessage(message, channel);
    }

    /**
     * Extracts the payload, delivery tag and redelivery flags from the
     * incoming message and forwards them to {@link #consumeMessage}.
     *
     * @throws IllegalStateException if the delivery tag header is missing
     */
    private void handleMessage(Message<WorkflowMessage> message, Channel channel) {
        WorkflowMessage workflowMessage = message.getPayload();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("receive workflow message, handling {}", this.msgInfo(workflowMessage));
        }
        MessageHeaders headers = message.getHeaders();
        Long deliveryTag = headers.get(DELIVERY_TAG_HEADER, Long.class);
        if (deliveryTag == null) {
            // Without a delivery tag the message can be neither acked nor nacked.
            throw new IllegalStateException("Missing '" + DELIVERY_TAG_HEADER + "' header on received workflow message");
        }
        // A message counts as redelivered when either the broker flag or the custom header says so.
        boolean isRedelivered = Boolean.TRUE.equals(headers.get(REDELIVERED_HEADER, Boolean.class))
                || Boolean.TRUE.equals(headers.get(REDELIVERY_TAG, Boolean.class));
        this.consumeMessage(channel, deliveryTag, isRedelivered, workflowMessage);
    }

    /**
     * Validates the message, asks the rate limiter for an execution token and
     * either executes the message or nacks it back to the queue.
     *
     * <p>Validation rejections are acked (and written to the reject folder when
     * the exception carries the message). Any other failure nacks the message
     * and backs off before the next delivery is taken.
     *
     * @param channel       channel the message was delivered on
     * @param deliverTag    broker delivery tag of the message
     * @param isRedelivered whether the message is flagged as a redelivery
     * @param message       the workflow message payload
     */
    protected void consumeMessage(Channel channel, long deliverTag, Boolean isRedelivered, WorkflowMessage message) {
        try {
            this.validateMessage(message);
            if (this.requireExecutionTokenByRateLimter(message)) {
                this.executeMessage(channel, deliverTag, isRedelivered, message);
            } else {
                LOGGER.debug("Failed to get execution token for new received message, the {} is nacked to RabbitMQ.", this.msgInfo(message));
                this.nackMessage(channel, deliverTag);
                this.sleepForNextConsume();
            }
        }
        catch (RejectExecutionMessageExecption e) {
            LOGGER.error("Invalid message from rabbitMQ.", (Throwable)e);
            this.rejectInvalidMessage(channel, deliverTag, e.getWFMessage());
        }
        catch (Exception e) {
            LOGGER.error("Failed to handle message from rabbitMQ.", (Throwable)e);
            this.nackMessage(channel, deliverTag);
            try {
                Thread.sleep(this.waitingIntervalMillis * ERROR_BACKOFF_MULTIPLIER);
            }
            catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Builds a short description of the message (create id, source component, priority, command) for logging. */
    private String msgInfo(WorkflowMessage message) {
        String priorityInfo = message.getSourceComponent() != null
                ? ", sourceComponent:" + message.getSourceComponent() + ", priority:" + message.getSourceComponent().getQueuePriority()
                : "";
        return "Message[" + message.getCreateId() + priorityInfo + ", cmd =" + message.getCommandName() + "]";
    }

    /** Pauses the calling thread for the waiting interval before the next message is consumed. */
    private void sleepForNextConsume() {
        try {
            Thread.sleep(this.waitingIntervalMillis);
        }
        catch (InterruptedException e) {
            LOGGER.error("sleepForNextConsume interrupted", (Throwable)e);
            // Restore the flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Rejects messages whose command is not supported, and "recover" commands
     * whose create id is already tracked by the active work-order monitor.
     *
     * @throws RejectExecutionMessageExecption if the message must not be executed
     */
    private void validateMessage(WorkflowMessage message) {
        String commandName = message.getCommandName();
        if (this.isInvalidCommand(commandName)) {
            String error = "The Message[" + message.getCreateId() + "] with command type :" + message.getCommandName() + " is not supported, reject the handling";
            throw new RejectExecutionMessageExecption(error);
        }
        if (this.isDuplicatedRecoverCommand(message, commandName)) {
            String error = "The Message[" + message.getCreateId() + "] is duplicated in current handling queue, reject the handling";
            throw new RejectExecutionMessageExecption(error, message);
        }
    }

    private boolean isDuplicatedRecoverCommand(WorkflowMessage message, String commandName) {
        return "recover".equals(commandName) && this.activeWorkOrderMonitor.isInMonitor(message.getCreateId());
    }

    private boolean isInvalidCommand(String commandName) {
        return !WorkflowMessageBuilder.SUPPORT_COMMANDS.contains(commandName);
    }

    private boolean requireExecutionTokenByRateLimter(WorkflowMessage message) {
        return this.rateLimiter.requireExecution(message);
    }

    /**
     * Handles a message whose processing did not succeed: removes it from the
     * active work-order monitor, counts the failed attempt and either requeues
     * the message or, once the attempt limit is exceeded, rejects it.
     *
     * @throws IllegalStateException if the message could not be reset
     */
    protected void resetReceivedMessage(Channel channel, long deliveryTag, WorkflowMessage message) {
        try {
            String createId = message.getCreateId();
            this.decountActiveMonitor(createId);
            AtomicInteger counter = this.redeliverCounters.get(createId, () -> new AtomicInteger(0));
            if (counter.getAndIncrement() >= MAX_REDELIVERY_COUNT) {
                counter.set(0);
                this.rejectOverRedeliverMessage(channel, deliveryTag, message);
            } else {
                this.nackMessage(channel, deliveryTag);
            }
        }
        catch (Exception exception) {
            LOGGER.debug("Cannot reset the received message ", (Throwable)exception);
            throw new IllegalStateException("Cannot reset the received message ", exception);
        }
    }

    /**
     * Runs the message through the processor and acknowledges it according to
     * the outcome: ack on success, ack plus optional reject-folder copy on a
     * {@link RejectExecutionMessageExecption}, and
     * {@link #resetReceivedMessage} on failure.
     */
    protected void executeMessage(Channel channel, long deliveryTag, Boolean isRedelivered, WorkflowMessage message) {
        LOGGER.debug("Got execution token from RateLimter, handling  {}", this.msgInfo(message));
        try {
            boolean success = this.processor.processMessage(message, isRedelivered);
            if (success) {
                LOGGER.debug("Created new Work Order by {}", this.msgInfo(message));
                this.ackMessage(channel, deliveryTag);
                this.queueInfo.handledOneMessage(message.getDelayTime());
            } else {
                this.resetReceivedMessage(channel, deliveryTag, message);
            }
        }
        catch (RejectExecutionMessageExecption e) {
            LOGGER.error(this.msgInfo(message) + " is rejected." + e.getMessage());
            this.rejectReceivedMessage(channel, deliveryTag, message, e.isMoveMessageToRejectFolder());
        }
        catch (Exception exception) {
            LOGGER.error("Something wrong when handling message [" + message.getCreateId() + "] to work order. ", (Throwable)exception);
            this.resetReceivedMessage(channel, deliveryTag, message);
        }
    }

    /** Removes the message from the active monitor, optionally stores it as rejected, and acks it. */
    private void rejectReceivedMessage(Channel channel, long deliveryTag, WorkflowMessage message, boolean writeRejectedMessage) {
        this.decountActiveMonitor(message.getCreateId());
        if (writeRejectedMessage) {
            this.writeRejectedMessage(message);
        }
        this.ackMessage(channel, deliveryTag);
    }

    /** Best-effort removal of the create id from the active work-order monitor; failures are only logged. */
    private void decountActiveMonitor(String createId) {
        try {
            this.activeWorkOrderMonitor.removeActiveWO(createId);
        }
        catch (Exception e) {
            LOGGER.warn("fail to remove active wo from monitor:" + e.getMessage());
        }
    }

    private void rejectOverRedeliverMessage(Channel channel, long deliverTag, WorkflowMessage message) {
        LOGGER.error(this.msgInfo(message) + " is redelivered more than " + MAX_REDELIVERY_COUNT + " times to execute and get a error ,reject the handling");
        this.rejectInvalidMessage(channel, deliverTag, message);
    }

    /** Stores the message as rejected when one is available, then acks the delivery so it is not redelivered. */
    private void rejectInvalidMessage(Channel channel, long deliverTag, WorkflowMessage message) {
        if (message != null) {
            this.writeRejectedMessage(message);
        }
        this.ackMessage(channel, deliverTag);
    }

    /**
     * Acknowledges a single delivery.
     *
     * @return {@code true} if the ack was sent, {@code false} if sending it failed
     */
    protected boolean ackMessage(Channel channel, long deliverTag) {
        try {
            channel.basicAck(deliverTag, false);
            return true;
        }
        catch (Exception exception) {
            LOGGER.warn("Failed to send ack to rabbitMQ: ", (Throwable)exception);
            return false;
        }
    }

    /**
     * Negatively acknowledges a single delivery and asks the broker to requeue it.
     *
     * @return {@code true} if the nack was sent, {@code false} if sending it failed
     */
    protected boolean nackMessage(Channel channel, long deliverTag) {
        try {
            channel.basicNack(deliverTag, false, true);
            return true;
        }
        catch (Exception exception) {
            LOGGER.warn("Failed to send nack to rabbitMQ: ", (Throwable)exception);
            return false;
        }
    }

    /** Best-effort: serializes the message and hands it to the message writer as a rejected message. */
    private void writeRejectedMessage(WorkflowMessage message) {
        try {
            String messageId = message.getCreateId();
            String messageXML = new WorkflowMessageConvertor().asString(message);
            String path = this.messageWriter.writeRejectedMessage(messageId, messageXML);
            LOGGER.info("Rejected " + this.msgInfo(message) + " is moved to : " + path);
        }
        catch (Exception e) {
            // Keep the stack trace: the rejected message is acked by the callers and would otherwise be lost silently.
            LOGGER.warn("Failed to write the rejected mesage " + e.getMessage(), (Throwable)e);
        }
    }

    /** Builds the message processor from the registered command executor and a DB-backed create-id strategy. */
    private IMessageProcessor createProcessor() {
        CommandExecutor cmdExecutor = (CommandExecutor)ServiceRegistry.getDefault().lookup(CommandExecutor.class);
        SessionFactory sessionFactory = HibernateUtil.getSessionFactory();
        WorkflowProcessDTOService processDtoService = PersistenceServiceFactory.createProcessDtoService(sessionFactory);
        CreateIDCommandExecutorStrategy strategy = new CreateIDCommandExecutorStrategy(new CreateIdFeatureDBImpl(processDtoService));
        return new MessageProcessorImpl(cmdExecutor, strategy);
    }

    /** Creates and starts the queue monitor that receives queue-size and handled-message updates. */
    private IQueueInfo createWorkflowMessageQueueInfo() {
        IQueueInfo workflowMessageQueueInfo = (IQueueInfo)new RabbitMQQueueMonitorProvider().createMonitor();
        workflowMessageQueueInfo.start();
        return workflowMessageQueueInfo;
    }
}

