/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.driver.message.queue;

import com.google.common.collect.Lists;
import com.rabbitmq.client.Channel;
import com.tandbergtv.workflow.driver.message.queue.IArchiveMessageProcessor;
import com.tandbergtv.workflow.message.ArchiveMessage;
import com.tandbergtv.workflow.message.ArchiveMessageChunk;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

public class ArchiveMessageConsumer {
    private static final Logger LOGGER = Logger.getLogger(ArchiveMessageConsumer.class);
    private static final String ARCHIVE_MESSAGE_CONSUMER = "Archived-Message-Consumer";
    private static final String ARCHIVED_QUEUE_NAME = "cms.queue.archive.wo";
    private IArchiveMessageProcessor archiveMessageProcessor;

    @RabbitListener(id="Archived-Message-Consumer", queues={"cms.queue.archive.wo"}, ackMode="MANUAL")
    public void consumeArchiveMessage(Message<List<ArchiveMessage>> message, Channel channel) {
        List archiveMessageList = (List)message.getPayload();
        MessageHeaders headers = message.getHeaders();
        Long deliveryTag = (Long)headers.get((Object)"amqp_deliveryTag", Long.class);
        String traceId = (String)headers.get((Object)"traceId", String.class);
        String response = "acked";
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug((Object)("Archive-Consuming-Start traceId: " + traceId + ", ts: " + System.currentTimeMillis()));
        }
        try {
            List<ArchiveMessageChunk> archiveMessages = this.splitList(archiveMessageList, 200);
            for (ArchiveMessageChunk archiveMessage : archiveMessages) {
                Boolean result = this.archiveMessageProcessor.processMessages(archiveMessage);
                if (result.booleanValue()) continue;
                this.nackMessage(channel, deliveryTag);
                response = "nacked";
                return;
            }
            this.ackMessages(channel, deliveryTag);
        }
        catch (Exception e) {
            LOGGER.error((Object)"consume archive message error.", (Throwable)e);
            this.rejectMessage(channel, deliveryTag);
            response = "rejected";
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug((Object)("Archive-Consuming-End traceId: " + traceId + ", ts: " + System.currentTimeMillis() + ", wo_count: " + archiveMessageList.size() + ", response: " + response));
        }
    }

    private List<ArchiveMessageChunk> splitList(List<ArchiveMessage> message, int len) {
        ArrayList result = Lists.newArrayList();
        int size = message.size();
        int count = (size + len - 1) / len;
        for (int i = 0; i < count; ++i) {
            List<ArchiveMessage> subList = message.subList(i * len, (i + 1) * len > size ? size : len * (i + 1));
            ArchiveMessageChunk messages = new ArchiveMessageChunk(len);
            subList.forEach(arg_0 -> ((ArchiveMessageChunk)messages).setMessage(arg_0));
            result.add(messages);
        }
        return result;
    }

    private void ackMessages(Channel channel, Long deliverTag) {
        try {
            channel.basicAck(deliverTag.longValue(), false);
        }
        catch (Exception exception) {
            LOGGER.error((Object)"Failed to send ack to rabbitMQ: ", (Throwable)exception);
        }
    }

    private void nackMessage(Channel channel, long deliverTag) {
        try {
            channel.basicNack(deliverTag, false, true);
        }
        catch (Exception exception) {
            LOGGER.error((Object)("Failed to send nack to rabbitMQ deliverTag[" + deliverTag + "]: "), (Throwable)exception);
        }
    }

    private void rejectMessage(Channel channel, long deliverTag) {
        try {
            channel.basicReject(deliverTag, false);
        }
        catch (Exception exception) {
            LOGGER.error((Object)("Exception to send Reject to rabbitMQ deliverTag[" + deliverTag + "]: "), (Throwable)exception);
        }
    }

    public void setArchiveMessageProcessor(IArchiveMessageProcessor archiveMessageProcessor) {
        this.archiveMessageProcessor = archiveMessageProcessor;
    }
}

