/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.log;

import com.tandbergtv.workflow.core.service.Service;
import com.tandbergtv.workflow.core.service.thread.ISchedulerService;
import com.tandbergtv.workflow.core.service.thread.Scheduler;
import com.tandbergtv.workflow.log.Finder;
import com.tandbergtv.workflow.log.LogDeserializer;
import com.tandbergtv.workflow.log.LogSyncCallable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.AndFilterBuilder;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.hibernate.SessionFactory;
import org.jbpm.logging.log.ProcessLog;

public class LogSyncService
implements Service {
    private Finder finder;
    private ISchedulerService<Void> scheduler;
    private ISchedulerService<Long> syncScheduler;
    private TransportClient client;
    private long period;
    private long windowStart;
    private long windowEnd;
    private String windowString;
    private String scrollId;
    private static final int POOL_SIZE = 5;
    private static final long TIMEOUT = 60000L;
    private static final int MAX_RETRIES = 2;
    private static final int SEARCH_RESPONSE_SIZE = 1000;
    private static final Logger LOGGER = Logger.getLogger(LogSyncService.class);

    public LogSyncService(SessionFactory factory, long period, TransportClient client) {
        this.finder = new Finder(factory);
        this.scheduler = new Scheduler("logger-sync", 1, 1);
        this.syncScheduler = new Scheduler("sync-pool", 5, 5);
        this.period = period * 1000L;
        this.client = client;
    }

    public void start() {
        this.syncScheduler.start();
        this.scheduler.start();
        this.scheduler.schedule(new Runnable(){

            @Override
            public void run() {
                LogSyncService.this.write();
            }
        }, 0L, this.period);
    }

    public void stop() {
        this.syncScheduler.stop();
        this.scheduler.stop();
    }

    public String getServiceName() {
        return "log-sync";
    }

    private void write() {
        this.setupInverval();
        List<ProcessLog> list = null;
        for (int retries = 0; retries <= 2; ++retries) {
            try {
                if (list == null) {
                    list = this.setupScrollSearch();
                }
                if (list.isEmpty()) {
                    return;
                }
                HashSet<Future> futures = new HashSet<Future>();
                do {
                    for (LogSyncCallable sync : this.partition(list, this.windowStart, this.windowEnd)) {
                        futures.add(this.syncScheduler.schedule((Callable)sync));
                    }
                    for (Future future : futures) {
                        future.get();
                    }
                } while ((list = this.getNextScrollInterval()).size() > 0);
                LOGGER.info((Object)("Sync complete " + this.windowString));
                break;
            }
            catch (Throwable t) {
                String retryStatement = retries < 2 ? "Retrying attempt " + retries : "Maximum retries exceeded";
                LOGGER.error((Object)("Failure when attempting to sync logs to DB " + this.windowString + ". " + retryStatement), t);
                try {
                    TimeUnit.SECONDS.sleep(30L);
                }
                catch (InterruptedException e) {
                    // empty catch block
                }
                continue;
            }
        }
    }

    private Collection<LogSyncCallable> partition(List<ProcessLog> list, long windowStart, long windowEnd) {
        if (list.isEmpty()) {
            throw new IllegalArgumentException();
        }
        ArrayList<LogSyncCallable> callables = new ArrayList<LogSyncCallable>();
        int size = list.size();
        if (size <= 5) {
            return Collections.singleton(this.createCallable(list, 0, windowStart, windowEnd));
        }
        int count = (int)Math.ceil((double)size * 1.0 / 5.0);
        int start = 0;
        for (int i = 0; i < 5 && start < size; start += count, ++i) {
            int end = Math.min(start + count, size);
            LOGGER.debug((Object)("Thread " + i + " start " + start + ", end " + end));
            callables.add(this.createCallable(list.subList(start, end), i, windowStart, windowEnd));
        }
        return callables;
    }

    private LogSyncCallable createCallable(List<ProcessLog> list, int threadNum, long windowStart, long windowEnd) {
        return new LogSyncCallable(this.getSessionFactory(), list, threadNum);
    }

    private SessionFactory getSessionFactory() {
        return this.finder.getSessionFactory();
    }

    private void setupInverval() {
        long now = System.currentTimeMillis();
        this.windowEnd = now - now % this.period;
        this.windowStart = this.windowEnd - this.period;
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        this.windowString = "for window between " + sdf.format(this.windowStart) + " and " + sdf.format(this.windowEnd);
    }

    private List<ProcessLog> setupScrollSearch() {
        AndFilterBuilder filterBuilder = FilterBuilders.andFilter((FilterBuilder[])new FilterBuilder[0]).add((FilterBuilder)FilterBuilders.termFilter((String)"logType", (String)"resourceLog")).add((FilterBuilder)FilterBuilders.rangeFilter((String)"eventTimestamp").gte(this.windowStart).lte(this.windowEnd));
        SearchRequestBuilder search = this.client.prepareSearch(new String[0]).setTypes(new String[]{"log"});
        search.setPostFilter((FilterBuilder)filterBuilder);
        search.setSize(1000);
        search.setScroll(new TimeValue(60000L));
        SearchResponse response = (SearchResponse)search.execute().actionGet(60000L);
        LOGGER.debug((Object)("Found " + response.getHits().getTotalHits() + " logs " + this.windowString));
        this.scrollId = response.getScrollId();
        return LogDeserializer.convert(response.getHits().getHits());
    }

    private List<ProcessLog> getNextScrollInterval() {
        SearchResponse response = (SearchResponse)this.client.prepareSearchScroll(this.scrollId).setScroll(new TimeValue(60000L)).execute().actionGet(60000L);
        return LogDeserializer.convert(response.getHits().getHits());
    }
}

