/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.watchpoint.pmm.title.message;

import com.ericsson.cms.neptune.cluster.Cluster;
import com.ericsson.cms.neptune.cluster.service.IClusterService;
import com.ericsson.cms.sites.distribution.AggregateDistributionEvent;
import com.ericsson.neptune.clustermgmt.service.ClusterConstant;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import com.rabbitmq.client.Channel;
import com.tandbergtv.neptune.configuration.service.IConfigurationService;
import com.tandbergtv.neptune.util.InjectionUtil;
import com.tandbergtv.watchpoint.pmm.title.ITitleManager;
import com.tandbergtv.watchpoint.pmm.title.message.IAggregateDistributionStatusProducer;
import com.tandbergtv.workflow.core.service.Service;
import com.tandbergtv.workflow.core.service.ServiceRegistry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

public class AggregateDistributionStatusConsumer
implements InitializingBean,
DisposableBean,
Service,
MembershipListener {
    public static final Logger logger = LoggerFactory.getLogger(AggregateDistributionStatusConsumer.class);
    private static final String SITE_NEPTUNE_SETTING_GROUP_PATH = "com.ericsson.cms.sites.ui";
    private static final String AGGREGATE_DISTRIBUTION_STATUS_REFRESH_INTERVAL = "site.aggregate.distributionstatus.interval";
    private static final Integer DEFAULT_INTERVAL = 30;
    private static final String MANUAL_ACKNOWLEDGE = "MANUAL";
    public static final String AGGREGATE_DISTRIBUTION_STATUS_UPDATE_CONSUMER = "AGGREGATE_DISTRIBUTION_STATUS_UPDATE_CONSUMER";
    public static final String AGGREGATE_DISTRIBUTION_STATUS_EVENT_CONSUMER = "AGGREGATE_DISTRIBUTION_STATUS_EVENT_CONSUMER";
    public static final String SERVICE_NAME = "aggregate.distribution.status.consumer.service";
    private ITitleManager titleManager;
    private IAggregateDistributionStatusProducer aggregateDistributionStatusProducer;
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private IConfigurationService configurationService;
    protected Map<Long, AggregateDistributionEvent> masterTitleEventMap = new ConcurrentHashMap<Long, AggregateDistributionEvent>();
    private IClusterService hzcs;
    @Autowired
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    protected Map<Long, Long> deliveryTagMap = new ConcurrentHashMap<Long, Long>();

    public AggregateDistributionStatusConsumer() {
        this.scheduleNextCheck();
    }

    @RabbitListener(id="AGGREGATE_DISTRIBUTION_STATUS_EVENT_CONSUMER", queues={"cms.queue.aggregate.distribution.status"}, containerFactory="aggregateDistributionStatusRabbitListenerContainerFactory", ackMode="MANUAL", autoStartup="false")
    public void receiveDistributionStatus(Message<String> message, Channel channel) {
        if (this.isMaster()) {
            logger.info("recieved the distribution status event message " + message);
            String body = (String)message.getPayload();
            Long masterTitleId = this.getMasterTitleId(body);
            MessageHeaders headers = message.getHeaders();
            Long deliveryTag = (Long)headers.get((Object)"amqp_deliveryTag", Long.class);
            if (this.masterTitleEventMap.containsKey(masterTitleId)) {
                this.ackMessage(channel, this.masterTitleEventMap.get(masterTitleId).getDeliveryTag());
            }
            this.masterTitleEventMap.put(masterTitleId, new AggregateDistributionEvent(masterTitleId, this.getLastUpdateTime(body), deliveryTag));
            if (!this.deliveryTagMap.isEmpty()) {
                for (Map.Entry<Long, Long> entry : this.deliveryTagMap.entrySet()) {
                    this.ackMessage(channel, entry.getKey());
                    this.deliveryTagMap.remove(entry.getKey());
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @RabbitListener(id="AGGREGATE_DISTRIBUTION_STATUS_UPDATE_CONSUMER", queues={"cms.queue.aggregate.distribution.status.trigger"}, containerFactory="aggregateDistributionStatusRabbitListenerContainerFactory", autoStartup="false", ackMode="MANUAL")
    private void aggregateStatusUpdateTriggerConsumer(Message<String> message, Channel channel) {
        logger.info("receive aggregate distribution status message, handling ", message.getPayload());
        String body = (String)message.getPayload();
        MessageHeaders headers = message.getHeaders();
        Long deliveryTag = (Long)headers.get((Object)"amqp_deliveryTag", Long.class);
        String consumerTag = (String)headers.get((Object)"amqp_consumerTag", String.class);
        logger.debug("AggregateStatusUpdateTriggerConsumer handleDelivery for consumerTag: " + consumerTag);
        Long titleId = Long.valueOf(body);
        try {
            logger.info("going to update aggregate distribution status for master title : " + titleId);
            this.getTitleManager().updateDistributionStatus(titleId.longValue());
        }
        catch (Exception e) {
            logger.error("fail to update the distribution status for master title" + titleId, (Throwable)e);
        }
        finally {
            this.ackMessage(channel, deliveryTag);
        }
    }

    protected void sendDistributionStatusTriggerEvent() {
        this.scheduleNextCheck();
        long now = System.currentTimeMillis();
        for (Map.Entry<Long, AggregateDistributionEvent> entry : this.masterTitleEventMap.entrySet()) {
            if (!this.shouldDoUpdate(now, entry.getValue())) continue;
            this.getAggregateDistributionStatusProducer().sendAggregateDistributionStatusUpdateMessage(entry.getValue().getTitleId());
            logger.debug("send to task queue cms.queue.aggregate.distribution.status.trigger the message is " + entry.getValue().getTitleId().toString());
            this.deliveryTagMap.put(entry.getValue().getDeliveryTag(), 0L);
            this.masterTitleEventMap.remove(entry.getKey());
        }
    }

    private boolean shouldDoUpdate(Long now, AggregateDistributionEvent event) {
        return now - event.getEventTimeStamp() >= (long)(this.getInterval() * 1000);
    }

    private Long getMasterTitleId(String message) {
        return this.getMessagePart(message, 0);
    }

    private Long getLastUpdateTime(String message) {
        return this.getMessagePart(message, 1);
    }

    private Long getMessagePart(String message, int i) {
        return Long.valueOf(message.split(" ")[i]);
    }

    private boolean isMaster() {
        return this.getHzcs().isMaster();
    }

    private void scheduleNextCheck() {
        this.scheduler.schedule(this::sendDistributionStatusTriggerEvent, (long)this.getInterval().intValue(), TimeUnit.SECONDS);
    }

    private IConfigurationService getConfigurationService() {
        if (this.configurationService == null) {
            this.configurationService = (IConfigurationService)InjectionUtil.injectInstance((String)"cms/ConfigurationService/local", IConfigurationService.class);
        }
        return this.configurationService;
    }

    private Integer getInterval() {
        Map siteSettings = this.getConfigurationService().getProperties(SITE_NEPTUNE_SETTING_GROUP_PATH);
        if (!siteSettings.containsKey(AGGREGATE_DISTRIBUTION_STATUS_REFRESH_INTERVAL)) {
            return DEFAULT_INTERVAL;
        }
        return Integer.valueOf((String)siteSettings.get(AGGREGATE_DISTRIBUTION_STATUS_REFRESH_INTERVAL));
    }

    private IClusterService getHzcs() {
        if (this.hzcs == null) {
            this.hzcs = Cluster.getService((String)ClusterConstant.CLUSTER_CONFIG_PATH);
        }
        return this.hzcs;
    }

    protected void setConfigurationService(IConfigurationService configurationService) {
        this.configurationService = configurationService;
    }

    protected void setHzcs(IClusterService hzcs) {
        this.hzcs = hzcs;
    }

    protected boolean ackMessage(Channel channel, long deliverTag) {
        try {
            channel.basicAck(deliverTag, false);
        }
        catch (Exception exception) {
            logger.debug("Failed to send ack to rabbitMQ: ", (Throwable)exception);
        }
        return true;
    }

    private ITitleManager getTitleManager() {
        if (this.titleManager == null) {
            this.titleManager = (ITitleManager)ServiceRegistry.getDefault().lookup(ITitleManager.class);
        }
        return this.titleManager;
    }

    protected IAggregateDistributionStatusProducer getAggregateDistributionStatusProducer() {
        if (this.aggregateDistributionStatusProducer == null) {
            this.aggregateDistributionStatusProducer = (IAggregateDistributionStatusProducer)ServiceRegistry.getDefault().lookup("aggregate.distribution.status.service");
        }
        return this.aggregateDistributionStatusProducer;
    }

    public void destroy() throws Exception {
    }

    public void afterPropertiesSet() throws Exception {
        ServiceRegistry.getDefault().register(this.getServiceName(), (Service)this);
    }

    public void start() {
        this.getHzcs().getInstance().getCluster().addMembershipListener((MembershipListener)this);
        this.startConsumer();
    }

    private void startConsumer() {
        if (this.isMaster()) {
            MessageListenerContainer statusEventConsumer = this.rabbitListenerEndpointRegistry.getListenerContainer(AGGREGATE_DISTRIBUTION_STATUS_EVENT_CONSUMER);
            logger.info("start statusEventConsumer: " + statusEventConsumer);
            statusEventConsumer.start();
        }
        MessageListenerContainer statusUpdateConsumer = this.rabbitListenerEndpointRegistry.getListenerContainer(AGGREGATE_DISTRIBUTION_STATUS_UPDATE_CONSUMER);
        logger.info("start statusUpdateConsumer: " + statusUpdateConsumer);
        statusUpdateConsumer.start();
    }

    public void stop() {
        if (this.isMaster()) {
            MessageListenerContainer statusEventConsumer = this.rabbitListenerEndpointRegistry.getListenerContainer(AGGREGATE_DISTRIBUTION_STATUS_EVENT_CONSUMER);
            logger.info("stop statusEventConsumer: " + statusEventConsumer);
            statusEventConsumer.stop();
        }
        MessageListenerContainer statusUpdateConsumer = this.rabbitListenerEndpointRegistry.getListenerContainer(AGGREGATE_DISTRIBUTION_STATUS_UPDATE_CONSUMER);
        logger.info("stop statusUpdateConsumer: " + statusUpdateConsumer);
        statusUpdateConsumer.stop();
    }

    public String getServiceName() {
        return SERVICE_NAME;
    }

    public void memberAdded(MembershipEvent membershipEvent) {
        MessageListenerContainer statusEventConsumer = this.rabbitListenerEndpointRegistry.getListenerContainer(AGGREGATE_DISTRIBUTION_STATUS_EVENT_CONSUMER);
        if (!this.isMaster() && statusEventConsumer.isRunning()) {
            statusEventConsumer.stop();
            logger.info("Current node become slave node, stop statusEventConsumer: " + statusEventConsumer);
        }
        if (this.isMaster() && !statusEventConsumer.isRunning()) {
            statusEventConsumer.start();
            logger.info("Current node become master node, start statusEventConsumer: " + statusEventConsumer);
        }
    }

    public void memberRemoved(MembershipEvent membershipEvent) {
        if (this.isMaster()) {
            this.startConsumer();
        }
    }

    public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
    }
}

