/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.elasticsearch.geosync;

import com.tandbergtv.workflow.elasticsearch.geosync.TransportClientFactory;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;

public class ESGeoReplicationService {
    private static final String GEO_CONFIG_FILE = "conf/geosync.properties";
    private static final String INDEX_PREFIX = "history_logs_";
    private static final String INDEX_ENTRY_TYPE_NAME = "_doc";
    private static final String INDEX_ARTICLE_TYPE = "_doc";
    private static final String LAST_SYNC_INDEX = "lastsyncindex";
    private static final long TIMEOUT_MS = 60000L;
    private static final int SEARCH_RESPONSE_SIZE = 1000;
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    private static CommandLine cl;
    private static int syncInterval;
    private static int retentionPeriods;
    private static int flushInterval;
    private static long period;
    private static long now;
    private static long windowEnd;
    private static long windowStart;
    private static String scrollId;
    private static String indexSuffix;
    private static TransportClient primaryClient;
    private static TransportClient secondaryClient;
    private static BulkProcessor bulkCache;
    private static String hostname;
    private static long syncSearchTotalHits;
    private static long currentSearchHits;

    public static void main(String[] args) throws Exception {
        ESGeoReplicationService.initialize(args);
        ESGeoReplicationService.runBatch();
        ESGeoReplicationService.purgeOldIndex();
        Thread.sleep(TimeUnit.SECONDS.toMillis(flushInterval * 5));
        bulkCache.flush();
    }

    private static void initialize(String[] args) throws UnknownHostException {
        System.out.println("Parameters:");
        for (int i = 0; i < args.length; ++i) {
            System.out.print(" <" + args[i] + ">");
        }
        System.out.println();
        BasicParser parser = new BasicParser();
        try {
            cl = parser.parse(ESGeoReplicationService.getOptions(), args);
        }
        catch (ParseException e) {
            ESGeoReplicationService.printUsageAndExit();
        }
        ESGeoReplicationService.validateOptions();
        Properties geoProperties = ESGeoReplicationService.getProperties(GEO_CONFIG_FILE);
        try {
            ESGeoReplicationService.setPrimaryClient(geoProperties);
        }
        catch (UnknownHostException ex) {
            System.out.println(ex.getMessage());
            throw ex;
        }
        try {
            ESGeoReplicationService.setSecondaryClient(geoProperties);
        }
        catch (UnknownHostException ex) {
            System.out.println(ex.getMessage());
            throw ex;
        }
        try {
            hostname = InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            System.out.println("WARN: getting hostname failed.");
        }
        System.out.println("Initializing at " + hostname + ", current time: " + System.currentTimeMillis());
        syncInterval = Integer.valueOf(geoProperties.getProperty("sync.interval"));
        retentionPeriods = Integer.parseInt(cl.getOptionValue("rp"));
        period = TimeUnit.MINUTES.toMillis(syncInterval);
        now = System.currentTimeMillis();
        windowEnd = now - now % period - TimeUnit.MINUTES.toMillis(5L);
        int numberOfCache = Integer.parseInt(geoProperties.getProperty("number.cached.request"));
        flushInterval = Integer.valueOf(geoProperties.getProperty("flush.interval"));
        bulkCache = BulkProcessor.builder((Client)secondaryClient, (BulkProcessor.Listener)ESGeoReplicationService.createBulkListener()).setBulkActions(numberOfCache).setFlushInterval(TimeValue.timeValueSeconds((long)flushInterval)).build();
    }

    private static void validateOption(String name) {
        if (cl.getOptionValue(name) == null) {
            System.out.println("Option '" + name + "' is required.");
            ESGeoReplicationService.printUsageAndExit();
        }
    }

    private static void validateOptions() {
        ESGeoReplicationService.validateOption("pe");
        ESGeoReplicationService.validateOption("pc");
        ESGeoReplicationService.validateOption("se");
        ESGeoReplicationService.validateOption("sc");
        ESGeoReplicationService.validateOption("rp");
    }

    private static void setPrimaryClient(Properties properties) throws UnknownHostException {
        String dns = cl.getOptionValue("pe");
        String clusterName = cl.getOptionValue("pc");
        properties.setProperty("hosts", dns);
        properties.setProperty("cluster.name", clusterName);
        System.out.println("Connecting to Primary Elasticsearch. DNS: " + dns + ", cluster: " + clusterName);
        primaryClient = TransportClientFactory.createTransportClient(properties);
    }

    private static void setSecondaryClient(Properties properties) throws UnknownHostException {
        String dns = cl.getOptionValue("se");
        String clusterName = cl.getOptionValue("sc");
        properties.setProperty("hosts", dns);
        properties.setProperty("cluster.name", clusterName);
        System.out.println("Connecting to Secondary Elasticsearch. DNS: " + dns + ", cluster: " + clusterName);
        secondaryClient = TransportClientFactory.createTransportClient(properties);
    }

    private static void printUsageAndExit() {
        HelpFormatter help = new HelpFormatter();
        help.printHelp("ESGeoReplicationService", ESGeoReplicationService.getOptions());
        System.exit(1);
    }

    private static Options getOptions() {
        Options options = new Options();
        options.addOption("pe", "primaryes", true, "primary elasticsearch host");
        options.addOption("se", "secondaryes", true, "secondary elasticsearch host");
        options.addOption("pc", "primarycluster", true, "primary elasticsearch cluster name");
        options.addOption("sc", "secondarycluster", true, "secondary elasticsearch cluster name");
        options.addOption("rp", "retentionperiod", true, "history retention period");
        return options;
    }

    private static Properties getProperties(String propertiesFile) {
        Properties props = new Properties();
        try {
            FileReader reader = new FileReader(propertiesFile);
            props.load(reader);
            reader.close();
        }
        catch (FileNotFoundException e) {
            System.out.println("Could not find properties file: " + propertiesFile + e);
        }
        catch (IOException e) {
            System.out.println("There was an error while reading properties" + e);
        }
        return props;
    }

    private static BulkProcessor.Listener createBulkListener() {
        return new BulkProcessor.Listener(){

            public void beforeBulk(long id, BulkRequest request) {
                System.out.println(hostname + ": Sending " + request.numberOfActions() + " Bulk Request...");
            }

            public void afterBulk(long id, BulkRequest request, Throwable t) {
                System.out.println("Bulk request failed with a throwable" + t);
            }

            public void afterBulk(long id, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    System.out.println("Bulk request failed: " + response.buildFailureMessage());
                } else {
                    System.out.println(hostname + ": Bulk request succeeded, saved " + response.getItems().length + " items at endtime : " + windowEnd);
                    ESGeoReplicationService.logLastRunBatchTime();
                }
            }
        };
    }

    private static void logLastSuccessTime(TransportClient client) {
        IndexRequestBuilder indexRequest = client.prepareIndex(LAST_SYNC_INDEX, "_doc", "3");
        HashMap<String, String> map = new HashMap<String, String>();
        map.put("lastHistoryGeoSyncDate", String.valueOf(windowEnd));
        indexRequest.setSource(map);
        IndexResponse response = (IndexResponse)indexRequest.execute().actionGet();
        if (response == null || !"3".equals(response.getId())) {
            System.out.println("ERROR: ES did not write lastsyncindex.");
        }
    }

    private static long readTimeFromLog() {
        SearchRequestBuilder searchBuilder = secondaryClient.prepareSearch(new String[]{LAST_SYNC_INDEX});
        TermQueryBuilder filterBuilder = QueryBuilders.termQuery((String)"_id", (String)"3");
        searchBuilder.setPostFilter((QueryBuilder)filterBuilder);
        SearchResponse response = (SearchResponse)searchBuilder.execute().actionGet(60000L);
        if (response != null && response.getHits().getTotalHits() > 0L) {
            String value = (String)response.getHits().getHits()[0].getSourceAsMap().get("lastHistoryGeoSyncDate");
            return Long.valueOf(value);
        }
        return 0L;
    }

    private static void runBatch() {
        long timeInFile;
        System.out.println("Syncing is running...");
        windowStart = timeInFile = ESGeoReplicationService.readTimeFromLog();
        String startIndex = ESGeoReplicationService.getIndexDate(windowStart);
        String endIndex = ESGeoReplicationService.getIndexDate(windowEnd);
        if (!startIndex.equals(endIndex) || windowEnd - windowStart > TimeUnit.MINUTES.toMillis(syncInterval)) {
            ESGeoReplicationService.catchUp(windowStart, windowEnd);
        } else {
            ESGeoReplicationService.setupScrollSearch(windowStart, windowEnd);
            syncSearchTotalHits = currentSearchHits;
            SearchHit[] hits = ESGeoReplicationService.getNextScrollInterval();
            while (hits.length > 0) {
                ESGeoReplicationService.addToBulkCache(hits);
                hits = ESGeoReplicationService.getNextScrollInterval();
            }
        }
        if (syncSearchTotalHits == 0L) {
            System.out.println("No records to sync, log the sync time only");
            ESGeoReplicationService.logLastRunBatchTime();
        }
    }

    private static void catchUp(long start, long end) {
        long cutoffDate = end - TimeUnit.DAYS.toMillis(retentionPeriods);
        long duration = TimeUnit.MINUTES.toMillis(syncInterval);
        if (start < cutoffDate) {
            start = cutoffDate;
        }
        for (long newEnd = end; newEnd >= start; newEnd -= duration) {
            ESGeoReplicationService.setupScrollSearch(newEnd - duration, newEnd);
            syncSearchTotalHits += currentSearchHits;
            SearchHit[] hits = ESGeoReplicationService.getNextScrollInterval();
            while (hits.length > 0) {
                ESGeoReplicationService.addToBulkCache(hits);
                hits = ESGeoReplicationService.getNextScrollInterval();
            }
        }
    }

    private static void purgeOldIndex() {
        System.out.println("Purging is running...");
        Date date = ESGeoReplicationService.getCutoffDate();
        AdminClient admin = secondaryClient.admin();
        try {
            MetaData metadata = ((ClusterStateResponse)admin.cluster().prepareState().get(TimeValue.timeValueMillis((long)60000L))).getState().getMetaData();
            for (String index : metadata.getConcreteAllIndices()) {
                if (!ESGeoReplicationService.shouldBeDeleted(index, date)) continue;
                System.out.println("Deleting process history index : " + index);
                admin.indices().delete(new DeleteIndexRequest(index));
            }
        }
        catch (Exception e) {
            System.out.println("There was an error when getting a list of indexes.");
        }
    }

    private static void setupScrollSearch(long start, long end) {
        BoolQueryBuilder filterBuilder = new BoolQueryBuilder().must((QueryBuilder)QueryBuilders.termsQuery((String)"logType", (String[])new String[]{"nodeLog", "messageLog", "resourceLog"})).must((QueryBuilder)QueryBuilders.rangeQuery((String)"eventTimestamp").gt((Object)start).lte((Object)end));
        SearchRequestBuilder searchBuilder = primaryClient.prepareSearch(new String[0]);
        searchBuilder.setPostFilter((QueryBuilder)filterBuilder);
        searchBuilder.setSize(1000);
        searchBuilder.setScroll(new TimeValue(60000L));
        SearchResponse response = (SearchResponse)searchBuilder.execute().actionGet(60000L);
        if (response.getHits() != null) {
            System.out.println(hostname + ": Reading " + response.getHits().getTotalHits() + " records from primary site from " + start + " to " + end);
        }
        currentSearchHits = response.getHits().getTotalHits();
        scrollId = response.getScrollId();
        indexSuffix = ESGeoReplicationService.getIndexDate(start);
        ESGeoReplicationService.addToBulkCache(response.getHits().getHits());
    }

    private static void addToBulkCache(SearchHit[] hits) {
        for (SearchHit hit : hits) {
            IndexRequestBuilder addLogRequestBuilder = secondaryClient.prepareIndex(INDEX_PREFIX + indexSuffix, "_doc", hit.getId());
            addLogRequestBuilder.setSource(hit.getSourceAsMap());
            bulkCache.add((IndexRequest)addLogRequestBuilder.request());
        }
    }

    private static SearchHit[] getNextScrollInterval() {
        SearchResponse response = (SearchResponse)primaryClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000L)).execute().actionGet(60000L);
        return response.getHits().getHits();
    }

    private static String getIndexDate(long timeInMillis) {
        return new SimpleDateFormat("yyyy-MM-dd").format(new Date(timeInMillis));
    }

    private static Date getCutoffDate() {
        return new Date(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(retentionPeriods));
    }

    private static boolean shouldBeDeleted(String index, Date date) {
        if (!index.contains(INDEX_PREFIX)) {
            return false;
        }
        String indexDate = index.substring(INDEX_PREFIX.length());
        try {
            Date date1 = ESGeoReplicationService.toDateOnly(indexDate);
            Date cutoff = ESGeoReplicationService.toDateOnly(sdf.format(date));
            if (date1.before(cutoff)) {
                return true;
            }
        }
        catch (java.text.ParseException e) {
            System.out.println("There is a parse error in the date format of the index " + index);
        }
        return false;
    }

    private static Date toDateOnly(String dateStr) throws java.text.ParseException {
        return sdf.parse(dateStr);
    }

    private static void logLastRunBatchTime() {
        ESGeoReplicationService.logLastSuccessTime(secondaryClient);
        ESGeoReplicationService.logLastSuccessTime(primaryClient);
    }
}

