/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.message.producer;

import com.ericsson.cms.neptune.rabbitmq.sender.IMessageSender;
import com.ericsson.cms.neptune.rabbitmq.workflow.WorkflowExchangeType;
import com.google.common.collect.Maps;
import com.rabbitmq.client.AlreadyClosedException;
import com.tandbergtv.workflow.core.service.Service;
import com.tandbergtv.workflow.core.service.ServiceLookup;
import com.tandbergtv.workflow.message.WorkflowMessage;
import com.tandbergtv.workflow.message.createid.CreateIDContextFactory;
import com.tandbergtv.workflow.message.producer.ComponentType;
import com.tandbergtv.workflow.message.producer.IMessageProducer;
import com.tandbergtv.workflow.message.producer.MessageConvertor;
import com.tandbergtv.workflow.message.producer.MessagePublishException;
import com.tandbergtv.workflow.message.producer.MessageQueuePriorityMapper;
import com.tandbergtv.workflow.message.util.MarshalException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

/**
 * Publishes {@link WorkflowMessage}s to RabbitMQ through an {@link IMessageSender}.
 *
 * <p>Every message is sent to {@link #DEFAULT_WORKORDER_EXCHANGE}, routed by its source
 * component and given a priority computed by the {@link MessageQueuePriorityMapper}. When the
 * underlying connection turns out to be closed, the producer tries to recover it and re-sends
 * the message once.
 */
public class MessageProducer implements IMessageProducer, Service {
    private static final Logger LOGGER = Logger.getLogger(MessageProducer.class);
    public static final String MESSAGE_PRODUCER_SERVICE = "Workflow-Message-Producer";
    public static final WorkflowExchangeType DEFAULT_WORKORDER_EXCHANGE = WorkflowExchangeType.COMPLETION;

    private final IMessageSender messageSender;
    private final MessageConvertor messageConvertor;
    private final MessageQueuePriorityMapper priorityMapper;
    /** Set once the sender has been started, either directly or through connection recovery. */
    private volatile boolean inited = false;
    /** When {@code false}, a closed connection fails the publish instead of triggering recovery. */
    private boolean autoReConnect = true;
    /** Passed to {@code ServiceLookup.forceExecute} as its last argument; overridden from the MQ configuration. */
    private long retryInterval = 3000L;
    /** Passed to {@code ServiceLookup.forceExecute} as its third argument; see {@link #setConnectionTimeout(long)}. */
    private long connectionTimeout = 30000L;

    /**
     * Creates a producer on top of the given collaborators.
     *
     * <p>If the sender exposes an MQ configuration, its network recovery interval replaces the
     * default retry interval.
     *
     * @param messageSender    sender used to deliver the marshalled messages
     * @param messageConvertor converts a {@link WorkflowMessage} into its string form
     * @param priorityMapper   computes the RabbitMQ priority of each message
     */
    public MessageProducer(IMessageSender messageSender, MessageConvertor messageConvertor, MessageQueuePriorityMapper priorityMapper) {
        this.messageSender = messageSender;
        this.messageConvertor = messageConvertor;
        this.priorityMapper = priorityMapper;
        if (messageSender.getMQConfiguration() != null) {
            this.retryInterval = messageSender.getMQConfiguration().getNetworkRecoveryInterval();
        }
    }

    /**
     * Stamps the message with the publish time and source component and sends it to the default
     * exchange.
     *
     * <p>If the connection is already closed and auto-reconnect is enabled, the connection is
     * recovered and the message is sent a second time.
     *
     * @param sourceComponet component the message originates from; also sent as the
     *                       {@code sourceComponent} header
     * @param message        message to publish
     * @return the create id of the published message
     * @throws MessagePublishException if the message cannot be marshalled or sent, is not
     *                                 acknowledged, or the connection is closed while
     *                                 auto-reconnect is disabled
     * @throws IllegalStateException   if the closed connection cannot be recovered
     */
    @Override
    public String publish(ComponentType sourceComponet, WorkflowMessage message) throws MessagePublishException {
        String exchangeName = DEFAULT_WORKORDER_EXCHANGE.getValue();
        int priority = this.priorityMapper.calcuRabbitMQPriority(sourceComponet, message);
        Map<String, Object> headers = new HashMap<>();
        headers.put("sourceComponent", sourceComponet.toString());
        try {
            message.setPublishTime(System.currentTimeMillis());
            message.setSourceComponent(sourceComponet);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("The message is bypass ratelimiter:" + message);
            }
            return this.sendMessage(exchangeName, headers, priority, message);
        }
        catch (AlreadyClosedException e) {
            if (!this.autoReConnect) {
                throw new MessagePublishException("Failed to connect to RabbitMQ.", e);
            }
            String msg = "Cannot connect to RabbitMq due to error " + e.getMessage() + ", try to re-setup the connection.";
            // Log the exception itself so the stack trace of the closed connection is kept.
            LOGGER.error(msg, e);
            this.recoveryConnection();
            // Single re-send after recovery; a second failure propagates to the caller.
            return this.sendMessage(exchangeName, headers, priority, message);
        }
    }

    /**
     * Marshals the message and hands it to the sender, assigning a new create id first when the
     * message has none.
     *
     * @param exchangeType exchange to publish to
     * @param headers      headers to attach to the message
     * @param priority     RabbitMQ priority of the message
     * @param message      message to send
     * @return the create id of the message (existing or newly generated)
     * @throws MessagePublishException if marshalling or sending fails, or the send is not
     *                                 acknowledged
     */
    protected String sendMessage(String exchangeType, Map<String, Object> headers, int priority, WorkflowMessage message) throws MessagePublishException {
        String createId = message.getCreateId();
        if (StringUtils.isBlank(createId)) {
            createId = this.createUUID();
            this.addCreateId(createId, message);
        }
        String error = "Failed to publish message to RabbitMQ.";
        final Boolean isAcked;
        try {
            String messageBytes = this.messageConvertor.asString(message);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Message to be sent: " + messageBytes);
            }
            // The routing key is the source component's string form, or empty when none is set.
            String routingKey = message.getSourceComponent() != null ? message.getSourceComponent().toString() : "";
            isAcked = this.messageSender.send(exchangeType, routingKey, headers, priority, messageBytes);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Response to send action, isAck:  " + isAcked);
            }
        }
        catch (MarshalException | IOException e) {
            LOGGER.error(error, e);
            throw new MessagePublishException(error, e);
        }
        // A missing (null) acknowledgement is treated like a negative one rather than an NPE.
        if (isAcked == null || !isAcked) {
            throw new MessagePublishException(error);
        }
        return createId;
    }

    /** Stores the create id on the message's command as the {@code CreateID} parameter. */
    private void addCreateId(String createId, WorkflowMessage message) {
        message.getCommand().addParameter("CreateID", createId);
    }

    /** Generates a new create id using the Java UUID generator of {@link CreateIDContextFactory}. */
    private String createUUID() {
        return CreateIDContextFactory.createJavaUUIDGenerator().newUUID();
    }

    /**
     * @return the sender this producer delivers messages with
     */
    public IMessageSender getMessageSender() {
        return this.messageSender;
    }

    /**
     * @return {@code true} once {@link #start()} has brought the sender up, either directly or
     *         through connection recovery
     */
    public boolean isInitialized() {
        return this.inited;
    }

    /**
     * Starts the underlying sender. If starting fails, falls back to connection recovery.
     *
     * @throws IllegalStateException if the sender cannot be started and recovery fails as well
     */
    public void start() {
        try {
            this.messageSender.start();
            this.inited = true;
        }
        catch (Exception e) {
            LOGGER.warn("Failed to start the connection to RabbitMQ, try to reconnect  it  late.", e);
            this.recoveryConnection();
            // recoveryConnection() only returns normally once the connection is usable again,
            // so the producer counts as initialized from here on.
            this.inited = true;
        }
    }

    /**
     * Re-establishes the sender's connection via {@code ServiceLookup.forceExecute}, passing the
     * configured connection timeout and retry interval.
     *
     * @throws IllegalStateException if no successful result is reported
     */
    private void recoveryConnection() {
        // presumably retried every retryInterval ms until connectionTimeout elapses - confirm against ServiceLookup
        Boolean setup = (Boolean)ServiceLookup.forceExecute(() -> {
            this.messageSender.recoverConnection();
            return true;
        }, "Creating the connection to RabbitMQ ", this.connectionTimeout, this.retryInterval);
        if (setup == null || !setup) {
            // NOTE(review): the "10 retries" in this message is hard-coded, while the timeout and
            // interval passed above are configurable - confirm the wording still matches.
            throw new IllegalStateException("The Connection is closed, and cannot re-setup it after 10 retries");
        }
        LOGGER.info("Re-Connected to RabbitMQ after last failure, ready to send  message now");
    }

    /** Stops the underlying sender. */
    public void stop() {
        this.messageSender.stop();
    }

    /**
     * @param connectionTimeout timeout value handed to {@code ServiceLookup.forceExecute} during
     *                          connection recovery
     */
    public void setConnectionTimeout(long connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /**
     * @return the service name, {@link #MESSAGE_PRODUCER_SERVICE}
     */
    public String getServiceName() {
        return MESSAGE_PRODUCER_SERVICE;
    }
}

