/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.driver.message.queue;

import com.ericsson.cms.neptune.cluster.service.IClusterService;
import com.tandbergtv.workflow.core.service.thread.ISchedulerService;
import com.tandbergtv.workflow.core.service.thread.Scheduler;
import com.tandbergtv.workflow.driver.message.queue.FileMessageReader;
import com.tandbergtv.workflow.driver.message.queue.IFileMessageConsumer;
import com.tandbergtv.workflow.driver.message.queue.IFileWatcher;
import com.tandbergtv.workflow.driver.message.queue.MessageWriter;
import com.tandbergtv.workflow.driver.message.queue.WorkflowMessageValidator;
import com.tandbergtv.workflow.message.WorkflowMessage;
import com.tandbergtv.workflow.message.producer.ComponentType;
import com.tandbergtv.workflow.message.producer.IMessageProducer;
import com.tandbergtv.workflow.message.producer.MessagePublishException;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.springframework.util.Assert;

public class FileMessageConsumer
implements IFileMessageConsumer {
    protected static final Logger LOGGER = Logger.getLogger(FileMessageConsumer.class);
    private IClusterService clusterService;
    private IMessageProducer messageProducer;
    private String dir;
    private static final String NAME = "com.tandbergtv.workflow.LegacyFileMessageConsumer";
    protected boolean stopped = false;
    protected ISchedulerService<Void> scheduler;
    private IFileWatcher fileWatcher;
    private long serviceStartDelayTime = 1000L;
    private FileMessageReader messageReader = new FileMessageReader();
    private MessageWriter messageWriter;

    public FileMessageConsumer(IMessageProducer messageProduce, String dir, IFileWatcher fileWatcher) {
        this.dir = dir;
        if (!new File(this.getWorkDir()).exists()) {
            boolean flag = new File(this.getWorkDir()).mkdirs();
            Assert.isTrue((boolean)flag, (String)("Failed to create file message queue work folder on:" + this.getWorkDir()));
        }
        this.setMessageProducer(messageProduce);
        this.scheduler = new Scheduler("LegacyFileMessageConsumer", 1, 1);
        this.fileWatcher = fileWatcher;
        fileWatcher.setFileMessageConsumer(this);
        this.messageWriter = new MessageWriter(dir);
    }

    public void setMessageProducer(IMessageProducer messageProducer) {
        this.messageProducer = messageProducer;
    }

    public void setClusterService(IClusterService clusterService) {
        this.clusterService = clusterService;
    }

    @Override
    public String getWorkDir() {
        return this.dir + File.separator + "mq";
    }

    public String getServiceName() {
        return NAME;
    }

    public void start() {
        LOGGER.debug((Object)"Starting File Message Consumer");
        this.stopped = false;
        this.scheduler.start();
        this.scheduler.schedule((Callable)new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                FileMessageConsumer.this.scanExistedFiles();
                FileMessageConsumer.this.fileWatcher.start();
                return null;
            }
        }, this.serviceStartDelayTime);
    }

    private void scanExistedFiles() {
        new FileMessageReader().scan(this);
    }

    public void stop() {
        try {
            this.fileWatcher.stop();
            this.scheduler.stop();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        LOGGER.debug((Object)"Stopped");
    }

    @Override
    public long handle(List<File> files) {
        if (files.isEmpty()) {
            return 0L;
        }
        TreeMap<String, WorkflowMessage> messages = new TreeMap<String, WorkflowMessage>();
        for (File file : files) {
            if (!this.isOwner(file.getName())) continue;
            try {
                WorkflowMessage document = this.messageReader.readMessage(file);
                if (document == null) continue;
                WorkflowMessageValidator.validate(document);
                messages.put(file.getAbsolutePath(), document);
            }
            catch (Exception e) {
                LOGGER.warn((Object)("Invalid format for the message file:" + file.getAbsolutePath() + ". Due to " + e.getMessage() + ". Reject the message."));
                this.messageWriter.moveToRejectedFolder(file);
            }
        }
        LOGGER.debug((Object)("Number of files to handle " + messages.size()));
        long result = 0L;
        try {
            result = this.sendMessages(messages);
        }
        catch (MessagePublishException e) {
            this.sleepForNextRetry(e);
        }
        return result;
    }

    private void sleepForNextRetry(MessagePublishException e) {
        LOGGER.error((Object)"Fail to connect to RabbitMQ, sleep the Message Consumer for 30 seconds, retry it later", (Throwable)e);
        try {
            Thread.sleep(30000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    protected long sendMessages(Map<String, WorkflowMessage> documents) throws MessagePublishException {
        long count = 0L;
        for (Map.Entry<String, WorkflowMessage> entry : documents.entrySet()) {
            this.messageProducer.publish(ComponentType.WORKFLOW_FILE_MESSAGE, entry.getValue());
            LOGGER.debug((Object)("Remove the message file :" + entry.getKey() + " , it has been commited into RabbitMQ"));
            FileUtils.deleteQuietly((File)new File(entry.getKey()));
            ++count;
        }
        return count;
    }

    public void setServiceStartDelayTime(long serviceStartDelayTime) {
        this.serviceStartDelayTime = serviceStartDelayTime;
    }

    @Override
    public boolean isOwner(String fileName) {
        return this.clusterService.isOwner((Object)fileName);
    }
}

