/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.driver.message.queue;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.tandbergtv.workflow.comm.routing.impl.WorkflowMessageConvertor;
import com.tandbergtv.workflow.core.service.Service;
import com.tandbergtv.workflow.core.service.thread.Scheduler;
import com.tandbergtv.workflow.driver.message.queue.FileMessageConsumerFactory;
import com.tandbergtv.workflow.driver.message.queue.IMessageProcessor;
import com.tandbergtv.workflow.driver.message.queue.IQueueInfo;
import com.tandbergtv.workflow.driver.message.queue.MessageReceiver;
import com.tandbergtv.workflow.driver.message.queue.MessageWriter;
import com.tandbergtv.workflow.driver.message.queue.RejectExecutionMessageExecption;
import com.tandbergtv.workflow.message.WorkflowMessage;
import com.tandbergtv.workflow.message.WorkflowMessageBuilder;
import com.tandbergtv.workflow.message.producer.MessageConvertor;
import com.tandbergtv.workflow.process.ratelimiter.IActiveWorkOrderMonitor;
import com.tandbergtv.workflow.process.ratelimiter.IRateLimiter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.log4j.Logger;

public class WorkOrderMessageConsumer
implements Service {
    private static final Logger LOGGER = Logger.getLogger(WorkOrderMessageConsumer.class);
    private static final String DEFAULT_MESSAGE_CONSUMER = "Work-Order-Message-Consumer";
    public static final String DEFAULT_ENCODER = "UTF-8";
    public static final long DEFAULT_WAITING_MILLISECONDS = 2000L;
    private long waitingIntervalMillis = 2000L;
    private long noDataSleepInterval = 100L;
    private boolean isStopped = false;
    private IMessageProcessor processor;
    private MessageReceiver messageReceiver;
    private IRateLimiter rateLimiter;
    private IActiveWorkOrderMonitor activeWorkOrderMonitor;
    private MessageConvertor messageConvertor;
    private Scheduler<Void> scheduler;
    private IQueueInfo queueInfo;
    private MessageWriter messageWriter;
    private Cache<String, AtomicInteger> redeliverCounters = CacheBuilder.newBuilder().maximumSize(10000L).expireAfterWrite(15L, TimeUnit.MINUTES).build();
    private LinkedBlockingQueue<Long> nackedMessageQueue = new LinkedBlockingQueue(10000);
    private long lastNackSendTime = 0L;

    public String getServiceName() {
        return DEFAULT_MESSAGE_CONSUMER;
    }

    public void start() {
        this.messageWriter = new MessageWriter(FileMessageConsumerFactory.getOffloadQueueDir());
        this.messageReceiver.start();
        this.scheduler = new Scheduler(DEFAULT_MESSAGE_CONSUMER, 2, 2);
        this.scheduler.start();
        LOGGER.debug((Object)("Start to pull message from RabbitMQ by service: " + this.getServiceName()));
        this.scheduler.schedule(() -> {
            this.pullMessage();
            return null;
        });
        this.scheduler.schedule(() -> this.fetchQueueSize(), 1000L, 15000L);
        this.queueInfo.start();
    }

    private void batchHandleNackedMessage() {
        if (System.currentTimeMillis() - this.lastNackSendTime < 10000L || this.nackedMessageQueue.isEmpty()) {
            return;
        }
        this.lastNackSendTime = System.currentTimeMillis();
        try {
            LOGGER.debug((Object)("Got " + this.nackedMessageQueue.size() + " messages to nack."));
            ArrayList toHandle = Lists.newArrayList();
            this.nackedMessageQueue.drainTo(toHandle);
            if (!toHandle.isEmpty()) {
                Iterator iterator = toHandle.iterator();
                while (iterator.hasNext()) {
                    long deliverTag = (Long)iterator.next();
                    this.nackMessage(deliverTag);
                }
            }
        }
        catch (Exception e) {
            LOGGER.error((Object)"Failed to send nack  message to rabbitMQ.", (Throwable)e);
        }
    }

    protected void fetchQueueSize() {
        try {
            int queueSize = this.messageReceiver.getQueueSize();
            if (queueSize >= 0) {
                this.queueInfo.updateQueueSize(queueSize);
            }
        }
        catch (Exception e) {
            LOGGER.info((Object)("Failed to get queue size from rabbitMQ:" + ExceptionUtils.getRootCauseMessage((Throwable)e)));
        }
    }

    public void stop() {
        this.isStopped = true;
        this.scheduler.stop();
        this.messageReceiver.stop();
        this.queueInfo.stop();
        this.redeliverCounters.cleanUp();
        LOGGER.debug((Object)"Message Consumer is stopped");
    }

    protected void pullMessage() throws InterruptedException {
        int noExecutionTokenCounter = 0;
        while (!this.isStopped) {
            try {
                GetResponse response = this.fetchMessage();
                WorkflowMessage message = this.extractValidMessage(response);
                if (message == null) {
                    LOGGER.debug((Object)"No valid message found..");
                    continue;
                }
                if (this.requireExecutionTokenByRateLimter(message)) {
                    LOGGER.debug((Object)("Got execution token from RateLimter, handling  message, cmd =" + message.getCommandName() + " id = " + message.getCreateId()));
                    this.executeMessage(response.getEnvelope(), message);
                    noExecutionTokenCounter = 0;
                    continue;
                }
                LOGGER.debug((Object)("Cannot create new Work Order now, the execution threshold has been reached. Message[" + message.getCreateId() + "] is nacked"));
                this.addNackMessageToQueue(response.getEnvelope().getDeliveryTag());
                long sleepTime = this.waitingIntervalMillis;
                if (++noExecutionTokenCounter > 5) {
                    sleepTime = this.waitingIntervalMillis * 5L;
                    noExecutionTokenCounter = 0;
                }
                Thread.sleep(sleepTime);
            }
            catch (Exception e) {
                LOGGER.error((Object)"Failed to handle message from rabbitMQ.", (Throwable)e);
                Thread.sleep(this.waitingIntervalMillis * 5L);
            }
        }
    }

    private WorkflowMessage extractValidMessage(GetResponse response) {
        if (response == null) {
            return null;
        }
        WorkflowMessage message = this.extractMessage(response);
        long deliverTag = response.getEnvelope().getDeliveryTag();
        String commandName = message.getCommandName();
        if (this.isInvalidCommand(commandName)) {
            this.rejectUnsuportMessage(deliverTag, message);
            message = null;
        } else if (this.isDuplicatedRecoverCommand(message, commandName)) {
            this.rejectDuplicatedMessage(deliverTag, message);
            message = null;
        }
        return message;
    }

    private boolean isDuplicatedRecoverCommand(WorkflowMessage message, String commandName) {
        return "recover".equals(commandName) && this.activeWorkOrderMonitor.isInMonitor(message.getCreateId());
    }

    private boolean isInvalidCommand(String commandName) {
        return !WorkflowMessageBuilder.SUPPORT_COMMANDS.contains(commandName);
    }

    private boolean requireExecutionTokenByRateLimter(WorkflowMessage message) {
        return this.rateLimiter.requireExecution(message);
    }

    private WorkflowMessage extractMessage(GetResponse delivery) {
        try {
            String msgBody = new String(delivery.getBody(), DEFAULT_ENCODER);
            return this.messageConvertor.asWorkflowMessage(msgBody);
        }
        catch (Exception e) {
            LOGGER.error((Object)"Cannot parse the message from deliver:", (Throwable)e);
            return null;
        }
    }

    protected boolean resetReceivedMessage(Envelope envelope, WorkflowMessage message) {
        try {
            String createId = message.getCreateId();
            this.activeWorkOrderMonitor.removeActiveWO(createId);
            AtomicInteger counter = (AtomicInteger)this.redeliverCounters.get((Object)createId, () -> new AtomicInteger(0));
            if (counter.getAndIncrement() >= 20) {
                counter.set(0);
                this.rejectOverRedeliverMessage(envelope.getDeliveryTag(), message);
            } else {
                this.addNackMessageToQueue(envelope.getDeliveryTag());
            }
        }
        catch (Exception exception) {
            LOGGER.debug((Object)"Cannot reset the received message ", (Throwable)exception);
        }
        return true;
    }

    public boolean executeMessage(Envelope envelope, WorkflowMessage message) {
        try {
            if (this.processor.processMessage(message, envelope.isRedeliver()).booleanValue()) {
                this.queueInfo.handledOneMessage(message.getDelayTime());
                this.ackMessage(envelope.getDeliveryTag());
            } else {
                this.resetReceivedMessage(envelope, message);
            }
        }
        catch (RejectExecutionMessageExecption e) {
            LOGGER.error((Object)("Message [" + message.getCreateId() + "] is rejected." + e.getMessage()));
            this.rejectReceivedMessage(envelope.getDeliveryTag(), message, e.isMoveMessageToRejectFolder());
        }
        catch (Exception exception) {
            this.activeWorkOrderMonitor.removeActiveWO(message.getCreateId());
            LOGGER.error((Object)("Something wrong when handling message [" + message.getCreateId() + "] to work order. "), (Throwable)exception);
        }
        return true;
    }

    private void rejectReceivedMessage(long deliverTag, WorkflowMessage message, boolean writeRejectedMessage) {
        if (writeRejectedMessage) {
            this.writeRejectedMessage(message);
        }
        try {
            this.activeWorkOrderMonitor.removeActiveWO(message.getCreateId());
        }
        catch (Exception e) {
            LOGGER.warn((Object)("fail to remove active wo from monitor:" + e.getMessage()));
        }
        this.ackMessage(deliverTag);
    }

    private void rejectUnsuportMessage(long deliverTag, WorkflowMessage message) {
        LOGGER.error((Object)("The message[" + message.getCreateId() + "] with command type :" + message.getCommandName() + " is not supported, reject the handling"));
        this.rejectInvalidMessage(deliverTag, message);
    }

    private void rejectDuplicatedMessage(long deliverTag, WorkflowMessage message) {
        LOGGER.error((Object)("The message[" + message.getCreateId() + "] is duplicated in current handling queue, reject the handling"));
        this.rejectInvalidMessage(deliverTag, message);
    }

    private void rejectOverRedeliverMessage(long deliverTag, WorkflowMessage message) {
        LOGGER.error((Object)("The message[" + message.getCreateId() + "] is redelivered more than 20 times to execute and get a error ,reject the handling"));
        this.rejectInvalidMessage(deliverTag, message);
    }

    private void rejectInvalidMessage(long deliverTag, WorkflowMessage message) {
        this.writeRejectedMessage(message);
        this.ackMessage(deliverTag);
    }

    protected boolean ackMessage(long deliverTag) {
        try {
            this.messageReceiver.getChannel().basicAck(deliverTag, false);
        }
        catch (Exception exception) {
            LOGGER.debug((Object)"Failed to send ack to rabbitMQ: ", (Throwable)exception);
        }
        return true;
    }

    protected boolean addNackMessageToQueue(long deliverTag) {
        try {
            this.nackedMessageQueue.offer(deliverTag);
        }
        catch (Exception exception) {
            LOGGER.debug((Object)"Failed to put nack message  to pending queue: ", (Throwable)exception);
        }
        return true;
    }

    protected boolean nackMessage(long deliverTag) {
        try {
            this.messageReceiver.getChannel().basicNack(deliverTag, false, true);
        }
        catch (Exception exception) {
            LOGGER.debug((Object)"Failed to send nack to rabbitMQ: ", (Throwable)exception);
        }
        return true;
    }

    private void writeRejectedMessage(WorkflowMessage message) {
        try {
            String messageId = message.getCreateId();
            String messageXML = new WorkflowMessageConvertor().asString(message);
            String path = this.messageWriter.writeRejectedMessage(messageId, messageXML);
            LOGGER.info((Object)("Rejected Message [" + message.getCreateId() + "] is moved to : " + path));
        }
        catch (Exception e) {
            LOGGER.warn((Object)("Failed to write the rejected mesage " + e.getMessage()));
        }
    }

    private GetResponse fetchMessage() {
        GetResponse response = null;
        long retryCounter = 0L;
        do {
            try {
                response = this.messageReceiver.nextMessage();
                this.batchHandleNackedMessage();
                if (response != null) {
                    break;
                }
            }
            catch (Exception e) {
                LOGGER.error((Object)"Failed to receive message from RabbitMQ", (Throwable)e);
            }
            try {
                if (++retryCounter % 1000L == 0L) {
                    LOGGER.debug((Object)"Cannot find message from rabbit MQ around 100 seconds.");
                    retryCounter = 0L;
                }
                Thread.sleep(this.noDataSleepInterval);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        } while (!this.isStopped);
        return response;
    }

    public long getWaitingIntervalMilli() {
        return this.waitingIntervalMillis;
    }

    public void setWaitingIntervalMilli(long waitingIntervalMilli) {
        this.waitingIntervalMillis = waitingIntervalMilli;
    }

    public IMessageProcessor getProcessor() {
        return this.processor;
    }

    public void setProcessor(IMessageProcessor processor) {
        this.processor = processor;
    }

    public MessageReceiver getMessageReceiver() {
        return this.messageReceiver;
    }

    public void setMessageReceiver(MessageReceiver messageReceiver) {
        this.messageReceiver = messageReceiver;
    }

    public boolean isStopped() {
        return this.isStopped;
    }

    public void setStopped(boolean isStopped) {
        this.isStopped = isStopped;
    }

    public MessageConvertor getMessageConvertor() {
        return this.messageConvertor;
    }

    public void setMessageConvertor(MessageConvertor messageConvertor) {
        this.messageConvertor = messageConvertor;
    }

    public IRateLimiter getRateLimiter() {
        return this.rateLimiter;
    }

    public void setRateLimiter(IRateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    public IActiveWorkOrderMonitor getActiveWorkOrderMonitor() {
        return this.activeWorkOrderMonitor;
    }

    public void setActiveWorkOrderMonitor(IActiveWorkOrderMonitor activeWorkOrderMonitor) {
        this.activeWorkOrderMonitor = activeWorkOrderMonitor;
    }

    public void setQueueInfor(IQueueInfo queueInfo) {
        this.queueInfo = queueInfo;
    }
}

