/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.driver.message.queue;

import com.ericsson.cms.neptune.rabbitmq.workflow.WorkflowQueueType;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.tandbergtv.workflow.comm.routing.impl.WorkflowMessageConvertor;
import com.tandbergtv.workflow.core.service.Service;
import com.tandbergtv.workflow.core.service.thread.Scheduler;
import com.tandbergtv.workflow.driver.message.queue.FileMessageConsumerFactory;
import com.tandbergtv.workflow.driver.message.queue.IMessageProcessor;
import com.tandbergtv.workflow.driver.message.queue.IQueueInfo;
import com.tandbergtv.workflow.driver.message.queue.MessageReceiver;
import com.tandbergtv.workflow.driver.message.queue.MessageWriter;
import com.tandbergtv.workflow.driver.message.queue.RejectExecutionMessageExecption;
import com.tandbergtv.workflow.message.WorkflowMessage;
import com.tandbergtv.workflow.message.WorkflowMessageBuilder;
import com.tandbergtv.workflow.message.producer.MessageConvertor;
import com.tandbergtv.workflow.process.ratelimiter.IActiveWorkOrderMonitor;
import com.tandbergtv.workflow.process.ratelimiter.IRateLimiter;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.log4j.Logger;
import org.springframework.util.CollectionUtils;

/**
 * Service that polls RabbitMQ for work-order messages and hands them to an
 * {@link IMessageProcessor}.
 *
 * <p>{@link #start()} creates a {@link Scheduler} and schedules three tasks on it: a polling loop
 * for the {@code BY_PASS} queue, a polling loop for the {@code COMPLETION} queue and a task that
 * reports the combined queue size to the {@link IQueueInfo}. Every fetched message is validated,
 * checked against the {@link IRateLimiter} and then either processed and acked, nacked for
 * redelivery, or rejected (optionally written out through the {@link MessageWriter}) and acked.
 *
 * <p>The polling loops run until {@link #stop()} is called or the polling thread is interrupted.
 * Collaborators (processor, receiver, rate limiter, monitor, convertor, queue info) are injected
 * through the setters and must be set before {@link #start()} is called.
 */
public class WorkOrderMessageConsumer implements Service {
    private static final Logger LOGGER = Logger.getLogger(WorkOrderMessageConsumer.class);
    private static final String DEFAULT_MESSAGE_CONSUMER = "Work-Order-Message-Consumer";
    public static final String DEFAULT_ENCODER = "UTF-8";
    public static final long DEFAULT_WAITING_MILLISECONDS = 2000L;

    /** Number of failed handling attempts per message id tolerated before the message is rejected. */
    private static final int MAX_REDELIVER_COUNT = 20;
    /** Multiplier applied to the waiting interval after an unexpected handling failure. */
    private static final long FAILURE_BACKOFF_FACTOR = 5L;
    /** Number of empty polls between two "no message" debug log entries. */
    private static final long IDLE_POLLS_PER_LOG = 1000L;

    private long waitingIntervalMillis = DEFAULT_WAITING_MILLISECONDS;
    private long noDataSleepInterval = 100L;
    /** Written by {@link #stop()} and read by the polling loops on other threads, hence volatile. */
    private volatile boolean isStopped = false;
    private IMessageProcessor processor;
    private MessageReceiver messageReceiver;
    private IRateLimiter rateLimiter;
    private IActiveWorkOrderMonitor activeWorkOrderMonitor;
    private MessageConvertor messageConvertor;
    private Scheduler<Void> scheduler;
    private IQueueInfo queueInfo;
    private MessageWriter messageWriter;
    /** Failed-attempt counters keyed by message create id; entries expire 15 minutes after being written. */
    private final Cache<String, AtomicInteger> redeliverCounters =
            CacheBuilder.newBuilder().maximumSize(10000L).expireAfterWrite(15L, TimeUnit.MINUTES).build();

    /**
     * @return the fixed name of this service
     */
    public String getServiceName() {
        return DEFAULT_MESSAGE_CONSUMER;
    }

    /**
     * Starts the receiver, creates the scheduler and schedules the two polling loops and the
     * queue-size reporter, then starts the queue info.
     */
    public void start() {
        this.messageWriter = new MessageWriter(FileMessageConsumerFactory.getOffloadQueueDir());
        this.messageReceiver.start();
        this.scheduler = new Scheduler<>(DEFAULT_MESSAGE_CONSUMER, 3, 3);
        this.scheduler.start();
        LOGGER.debug("Start to pull message from RabbitMQ by service: " + this.getServiceName());
        // NOTE(review): the numeric arguments are presumably delay / period in milliseconds -
        // confirm against Scheduler.
        this.scheduler.schedule(() -> this.handleByPassMessages());
        this.scheduler.schedule(() -> this.handleNormalMessages(), 5000L);
        this.scheduler.schedule(() -> this.fetchQueueSize(), 1000L, 15000L);
        this.queueInfo.start();
    }

    /**
     * Runs the polling loop for the {@code COMPLETION} queue.
     *
     * @return always {@code null}
     */
    protected Void handleNormalMessages() {
        this.pullMessage(false);
        return null;
    }

    /**
     * Runs the polling loop for the {@code BY_PASS} queue.
     *
     * @return always {@code null}
     */
    protected Void handleByPassMessages() {
        this.pullMessage(true);
        return null;
    }

    /**
     * Reports the combined size of the {@code COMPLETION} and {@code BY_PASS} queues to the queue
     * info. Negative totals are not reported; failures are logged and otherwise ignored.
     */
    protected void fetchQueueSize() {
        try {
            int queueSize = this.messageReceiver.getQueueSize(WorkflowQueueType.COMPLETION)
                    + this.messageReceiver.getQueueSize(WorkflowQueueType.BY_PASS);
            if (queueSize >= 0) {
                this.queueInfo.updateQueueSize(queueSize);
            }
        } catch (Exception e) {
            LOGGER.info("Failed to get queue size from rabbitMQ:" + ExceptionUtils.getRootCauseMessage(e));
        }
    }

    /**
     * Requests the polling loops to end and stops the scheduler, the receiver and the queue info.
     * Safe to call when {@link #start()} has not created the scheduler yet.
     */
    public void stop() {
        this.isStopped = true;
        if (this.scheduler != null) {
            this.scheduler.stop();
        }
        this.messageReceiver.stop();
        this.queueInfo.stop();
        // NOTE(review): Cache.cleanUp() only runs pending maintenance; it does not discard
        // entries. If the counters are meant to be dropped here, invalidateAll() is the call.
        this.redeliverCounters.cleanUp();
        LOGGER.debug("Message Consumer is stopped");
    }

    /**
     * Polling loop: fetches one message at a time and dispatches it until the consumer is stopped
     * or the current thread is interrupted.
     *
     * <p>A message that gets an execution token is executed. Otherwise it is nacked and the loop
     * waits until the rate limiter no longer reports its threshold as reached. Invalid messages
     * are rejected; any other failure nacks the message and backs off before the next poll.
     *
     * @param filterByPassComponent {@code true} to poll the {@code BY_PASS} queue, {@code false}
     *                              to poll the {@code COMPLETION} queue
     */
    protected void pullMessage(boolean filterByPassComponent) {
        while (this.shouldKeepRunning()) {
            // 0 means "no delivery obtained yet in this iteration".
            long deliverTag = 0L;
            try {
                GetResponse response = this.fetchMessage(filterByPassComponent);
                if (response == null || response.getEnvelope() == null) {
                    continue;
                }
                deliverTag = response.getEnvelope().getDeliveryTag();
                WorkflowMessage message = this.extractValidMessage(response);
                if (this.requireExecutionTokenByRateLimter(message)) {
                    this.executeMessage(response.getEnvelope(), message);
                    continue;
                }
                LOGGER.debug("Cannot create new Work Order now, the execution threshold has been reached. Message[" + message.getCreateId() + "] is nacked");
                this.nackMessage(deliverTag);
                this.waitUntilExecutionTokenOpened();
            } catch (RejectExecutionMessageExecption e) {
                LOGGER.error("Invalid message from rabbitMQ.", e);
                this.rejectInvalidMessage(deliverTag, e.getWFMessage());
            } catch (Exception e) {
                LOGGER.error("Failed to handle message from rabbitMQ.", e);
                // Only nack a real delivery: tag 0 does not identify a delivered message.
                if (deliverTag > 0L) {
                    this.nackMessage(deliverTag);
                }
                this.pause(this.waitingIntervalMillis * FAILURE_BACKOFF_FACTOR);
            }
        }
    }

    /**
     * @return {@code true} while the consumer has not been stopped and the current thread has not
     *         been interrupted
     */
    private boolean shouldKeepRunning() {
        return !this.isStopped && !Thread.currentThread().isInterrupted();
    }

    /**
     * Sleeps for the given time. On interruption the thread's interrupt status is restored so the
     * calling loops can observe it and terminate.
     *
     * @param millis time to sleep in milliseconds
     * @return {@code true} if the full sleep completed, {@code false} if it was interrupted
     */
    private boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Blocks, re-checking every waiting interval, while the rate limiter reports its threshold as
     * reached. Returns early when the thread is interrupted.
     */
    private void waitUntilExecutionTokenOpened() {
        while (this.rateLimiter.reachThreshold()) {
            if (!this.pause(this.waitingIntervalMillis)) {
                return;
            }
        }
    }

    /**
     * Decodes the response body and converts it into a {@link WorkflowMessage}, then validates it.
     *
     * @param response the fetched RabbitMQ response
     * @return the parsed message
     * @throws RejectExecutionMessageExecption if the body cannot be parsed, the command is not in
     *         {@code WorkflowMessageBuilder.SUPPORT_COMMANDS}, or it is a "recover" command whose
     *         id is already in the active work order monitor
     */
    private WorkflowMessage extractValidMessage(GetResponse response) {
        WorkflowMessage message;
        try {
            String msgBody = new String(response.getBody(), DEFAULT_ENCODER);
            message = this.messageConvertor.asWorkflowMessage(msgBody);
        } catch (Exception e) {
            throw new RejectExecutionMessageExecption("Cannot parse the message :", e);
        }
        String commandName = message.getCommandName();
        if (this.isInvalidCommand(commandName)) {
            String error = "The message[" + message.getCreateId() + "] with command type :" + message.getCommandName() + " is not supported, reject the handling";
            throw new RejectExecutionMessageExecption(error);
        }
        if (this.isDuplicatedRecoverCommand(message, commandName)) {
            String error = "The message[" + message.getCreateId() + "] is duplicated in current handling queue, reject the handling";
            throw new RejectExecutionMessageExecption(error, message);
        }
        return message;
    }

    /**
     * @return {@code true} if the command is "recover" and the message id is already monitored
     */
    private boolean isDuplicatedRecoverCommand(WorkflowMessage message, String commandName) {
        return "recover".equals(commandName) && this.activeWorkOrderMonitor.isInMonitor(message.getCreateId());
    }

    /**
     * @return {@code true} if the command is not one of the supported commands
     */
    private boolean isInvalidCommand(String commandName) {
        return !WorkflowMessageBuilder.SUPPORT_COMMANDS.contains(commandName);
    }

    /**
     * @return the rate limiter's answer to whether the message may be executed now
     */
    private boolean requireExecutionTokenByRateLimter(WorkflowMessage message) {
        return this.rateLimiter.requireExecution(message);
    }

    /**
     * Gives back a message that could not be handled: removes it from the active work order
     * monitor and nacks it, or rejects it once its failure counter has reached the limit.
     *
     * @param envelope envelope of the delivery being given back
     * @param message  the message that failed
     * @throws IllegalStateException if the bookkeeping itself fails
     */
    protected void resetReceivedMessage(Envelope envelope, WorkflowMessage message) {
        try {
            String createId = message.getCreateId();
            this.decountActiveMonitor(createId);
            AtomicInteger counter = this.redeliverCounters.get(createId, () -> new AtomicInteger(0));
            if (counter.getAndIncrement() >= MAX_REDELIVER_COUNT) {
                counter.set(0);
                this.rejectOverRedeliverMessage(envelope.getDeliveryTag(), message);
            } else {
                this.nackMessage(envelope.getDeliveryTag());
            }
        } catch (Exception exception) {
            LOGGER.debug("Cannot reset the received message ", exception);
            throw new IllegalStateException("Cannot reset the received message ", exception);
        }
    }

    /**
     * Passes the message to the processor and settles the delivery according to the outcome:
     * ack on success, reset on failure or unexpected exception, reject on
     * {@link RejectExecutionMessageExecption}.
     *
     * @param envelope envelope of the delivery
     * @param message  the validated message to process
     */
    protected void executeMessage(Envelope envelope, WorkflowMessage message) {
        LOGGER.debug("Got execution token from RateLimter, handling  message, cmd =" + message.getCommandName() + " id = " + message.getCreateId());
        try {
            boolean success = this.processor.processMessage(message, envelope.isRedeliver());
            if (success) {
                this.ackMessage(envelope.getDeliveryTag());
                this.queueInfo.handledOneMessage(message.getDelayTime());
            } else {
                this.resetReceivedMessage(envelope, message);
            }
        } catch (RejectExecutionMessageExecption e) {
            LOGGER.error("Message [" + message.getCreateId() + "] is rejected." + e.getMessage());
            this.rejectReceivedMessage(envelope, message, e.isMoveMessageToRejectFolder());
        } catch (Exception exception) {
            LOGGER.error("Something wrong when handling message [" + message.getCreateId() + "] to work order. ", exception);
            this.resetReceivedMessage(envelope, message);
        }
    }

    /**
     * Drops a message rejected by the processor: removes it from the monitor, optionally writes
     * it out as a rejected message, and acks the delivery.
     */
    private void rejectReceivedMessage(Envelope envelope, WorkflowMessage message, boolean writeRejectedMessage) {
        this.decountActiveMonitor(message.getCreateId());
        if (writeRejectedMessage) {
            this.writeRejectedMessage(message);
        }
        this.ackMessage(envelope.getDeliveryTag());
    }

    /**
     * Removes the id from the active work order monitor; failures are logged and ignored.
     */
    private void decountActiveMonitor(String createId) {
        try {
            this.activeWorkOrderMonitor.removeActiveWO(createId);
        } catch (Exception e) {
            LOGGER.warn("fail to remove active wo from monitor:" + e.getMessage());
        }
    }

    /**
     * Logs and rejects a message whose failure counter reached the limit.
     */
    private void rejectOverRedeliverMessage(long deliverTag, WorkflowMessage message) {
        LOGGER.error("The message[" + message.getCreateId() + "] is redelivered more than 20 times to execute and get a error ,reject the handling");
        this.rejectInvalidMessage(deliverTag, message);
    }

    /**
     * Writes the message out as rejected (when one is available) and acks the delivery.
     *
     * @param deliverTag delivery tag to ack
     * @param message    the rejected message, may be {@code null}
     */
    private void rejectInvalidMessage(long deliverTag, WorkflowMessage message) {
        if (message != null) {
            this.writeRejectedMessage(message);
        }
        this.ackMessage(deliverTag);
    }

    /**
     * Acks a single delivery. A failure is logged at debug level only.
     *
     * @param deliverTag delivery tag to ack
     * @return always {@code true}, including when the ack failed
     */
    protected boolean ackMessage(long deliverTag) {
        try {
            this.messageReceiver.getChannel().basicAck(deliverTag, false);
        } catch (Exception exception) {
            LOGGER.debug("Failed to send ack to rabbitMQ: ", exception);
        }
        return true;
    }

    /**
     * Nacks a single delivery with requeue enabled. A failure is logged at debug level only.
     *
     * @param deliverTag delivery tag to nack
     * @return always {@code true}, including when the nack failed
     */
    protected boolean nackMessage(long deliverTag) {
        try {
            this.messageReceiver.getChannel().basicNack(deliverTag, false, true);
        } catch (Exception exception) {
            LOGGER.debug("Failed to send nack to rabbitMQ: ", exception);
        }
        return true;
    }

    /**
     * Serializes the message and hands it to the message writer as a rejected message; failures
     * are logged and ignored.
     */
    private void writeRejectedMessage(WorkflowMessage message) {
        try {
            String messageId = message.getCreateId();
            String messageXML = new WorkflowMessageConvertor().asString(message);
            String path = this.messageWriter.writeRejectedMessage(messageId, messageXML);
            LOGGER.info("Rejected Message [" + message.getCreateId() + "] is moved to : " + path);
        } catch (Exception e) {
            LOGGER.warn("Failed to write the rejected mesage " + e.getMessage());
        }
    }

    /**
     * Fetches the next message from the queue selected by the flag.
     *
     * @return the response, or {@code null} if polling ended without one
     */
    private GetResponse fetchMessage(boolean filterByPassComponent) {
        return filterByPassComponent ? this.fetchByPassMessage() : this.fetchMessage();
    }

    /**
     * Polls the {@code BY_PASS} queue until a message arrives, the consumer is stopped or the
     * thread is interrupted. The queue is only queried while the rate limiter reports at least
     * one by-pass component.
     *
     * @return the response, or {@code null} if polling ended without one
     */
    private GetResponse fetchByPassMessage() {
        GetResponse response = null;
        do {
            try {
                if (!CollectionUtils.isEmpty((Collection<?>) this.rateLimiter.getByPassComponents())) {
                    response = this.messageReceiver.nextMessage(WorkflowQueueType.BY_PASS);
                }
            } catch (Exception e) {
                LOGGER.error("Failed to receive message from RabbitMQ", e);
            }
            if (response != null) {
                LOGGER.info("Find a byPass message by Filter key:");
                break;
            }
            if (!this.pause(this.noDataSleepInterval)) {
                break;
            }
        } while (!this.isStopped);
        return response;
    }

    /**
     * Polls the {@code COMPLETION} queue until a message arrives, the consumer is stopped or the
     * thread is interrupted. Writes a debug entry after every 1000 consecutive empty polls.
     *
     * @return the response, or {@code null} if polling ended without one
     */
    private GetResponse fetchMessage() {
        GetResponse response = null;
        long retryCounter = 0L;
        do {
            try {
                response = this.messageReceiver.nextMessage(WorkflowQueueType.COMPLETION);
                if (response != null) {
                    break;
                }
            } catch (Exception e) {
                LOGGER.error("Failed to receive message from RabbitMQ", e);
            }
            if (++retryCounter % IDLE_POLLS_PER_LOG == 0L) {
                LOGGER.debug("Cannot find message from rabbit MQ around 100 seconds.");
                retryCounter = 0L;
            }
            if (!this.pause(this.noDataSleepInterval)) {
                break;
            }
        } while (!this.isStopped);
        return response;
    }

    /**
     * @return the interval, in milliseconds, used when waiting for the rate limiter and as the
     *         base of the failure back-off
     */
    public long getWaitingIntervalMilli() {
        return this.waitingIntervalMillis;
    }

    /**
     * @param waitingIntervalMilli the waiting interval in milliseconds
     */
    public void setWaitingIntervalMilli(long waitingIntervalMilli) {
        this.waitingIntervalMillis = waitingIntervalMilli;
    }

    /**
     * @return the processor that handles accepted messages
     */
    public IMessageProcessor getProcessor() {
        return this.processor;
    }

    /**
     * @param processor the processor that handles accepted messages
     */
    public void setProcessor(IMessageProcessor processor) {
        this.processor = processor;
    }

    /**
     * @return the receiver used to fetch, ack and nack messages
     */
    public MessageReceiver getMessageReceiver() {
        return this.messageReceiver;
    }

    /**
     * @param messageReceiver the receiver used to fetch, ack and nack messages
     */
    public void setMessageReceiver(MessageReceiver messageReceiver) {
        this.messageReceiver = messageReceiver;
    }

    /**
     * @return {@code true} once the consumer has been asked to stop
     */
    public boolean isStopped() {
        return this.isStopped;
    }

    /**
     * @param isStopped {@code true} to make the polling loops end
     */
    public void setStopped(boolean isStopped) {
        this.isStopped = isStopped;
    }

    /**
     * @return the convertor used to parse message bodies
     */
    public MessageConvertor getMessageConvertor() {
        return this.messageConvertor;
    }

    /**
     * @param messageConvertor the convertor used to parse message bodies
     */
    public void setMessageConvertor(MessageConvertor messageConvertor) {
        this.messageConvertor = messageConvertor;
    }

    /**
     * @return the rate limiter consulted before a message is executed
     */
    public IRateLimiter getRateLimiter() {
        return this.rateLimiter;
    }

    /**
     * @param rateLimiter the rate limiter consulted before a message is executed
     */
    public void setRateLimiter(IRateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    /**
     * @return the monitor tracking active work orders
     */
    public IActiveWorkOrderMonitor getActiveWorkOrderMonitor() {
        return this.activeWorkOrderMonitor;
    }

    /**
     * @param activeWorkOrderMonitor the monitor tracking active work orders
     */
    public void setActiveWorkOrderMonitor(IActiveWorkOrderMonitor activeWorkOrderMonitor) {
        this.activeWorkOrderMonitor = activeWorkOrderMonitor;
    }

    /**
     * @param queueInfo the sink for queue size and handled-message statistics
     */
    public void setQueueInfor(IQueueInfo queueInfo) {
        this.queueInfo = queueInfo;
    }
}

