/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.core.service.internal;

import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.core.MultiTask;
import com.tandbergtv.workflow.core.service.Cluster;
import com.tandbergtv.workflow.core.service.internal.DistributedScheduler;
import com.tandbergtv.workflow.core.service.thread.DistributedCallable;
import com.tandbergtv.workflow.core.service.thread.DistributedLocalCallable;
import com.tandbergtv.workflow.core.service.thread.DistributedMasterCallable;
import com.tandbergtv.workflow.core.service.thread.IRecoverableSchedulerService;
import com.tandbergtv.workflow.core.service.thread.ISchedulerService;
import com.tandbergtv.workflow.core.service.thread.ITaskCompletionListener;
import com.tandbergtv.workflow.core.service.thread.Scheduler;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

public class RecoverableDistributedScheduler<T>
extends DistributedScheduler<T>
implements IRecoverableSchedulerService<T> {
    private static final Logger logger = Logger.getLogger(RecoverableDistributedScheduler.class);
    private final Set<Class<? extends Exception>> recoverableErrors = new CopyOnWriteArraySet<Class<? extends Exception>>();
    private final List<ITaskCompletionListener> listeners = new CopyOnWriteArrayList<ITaskCompletionListener>();
    private final int MAX_RECOVERY_COUNT = 10;
    private ISchedulerService<Void> recoveryScheduler;
    private int recoverySize = 1;
    private int maximumRetryAttempts = 10;
    private final AtomicInteger queueSize = new AtomicInteger(0);

    public RecoverableDistributedScheduler(String name) {
        super(name);
    }

    public RecoverableDistributedScheduler(String name, int size) {
        super(name, size);
    }

    public RecoverableDistributedScheduler(String name, int size, int recoverySize) {
        this(name, size);
        this.recoverySize = recoverySize;
    }

    @Override
    public Future<T> schedule(Callable<T> callable) {
        RunnableFuture task = this.newTaskFor((Callable)callable);
        RecoverableTask recoverableTask = new RecoverableTask(task);
        recoverableTask.lock.lock();
        try {
            this.getExecutor().execute(task);
            logger.debug((Object)(String.valueOf(this.getServiceName()) + " - Queue Size: " + this.queueSize.incrementAndGet()));
        }
        finally {
            recoverableTask.lock.unlock();
        }
        return recoverableTask;
    }

    @Override
    protected IDistributedTaskWithNotification<T> newTaskFor(Callable<T> callable) {
        Object key;
        IDistributedTaskWithNotification<T> future = null;
        future = callable instanceof DistributedMasterCallable ? new DistributedTaskWithNotification<T>(callable, Cluster.masterMember()) : (callable instanceof DistributedLocalCallable ? new DistributedTaskWithNotification<T>(callable, Cluster.localMember()) : (callable instanceof DistributedCallable ? ((key = ((DistributedCallable)DistributedCallable.class.cast(callable)).getKey()) != null ? (key instanceof InetSocketAddress ? new DistributedTaskWithNotification<T>(callable, Cluster.memberForAddress((InetSocketAddress)InetSocketAddress.class.cast(key))) : new DistributedTaskWithNotification<T>(callable, key)) : new MultiTaskWithNotification<T>(callable)) : new DistributedTaskWithNotification<T>(callable)));
        return future;
    }

    @Override
    public void addRecoverableError(Class<? extends Exception> errorClass) {
        this.recoverableErrors.add(errorClass);
    }

    private boolean isRecoverableError(Throwable error) {
        for (Class<? extends Exception> errorClass : this.recoverableErrors) {
            if (!errorClass.isInstance(error)) continue;
            return true;
        }
        return false;
    }

    @Override
    public int getMaximumRetryAttempts() {
        return this.maximumRetryAttempts;
    }

    @Override
    public void setMaximumRetryAttempts(int maximumRetryAttempts) {
        this.maximumRetryAttempts = maximumRetryAttempts;
    }

    @Override
    public void addTaskCompletionListener(ITaskCompletionListener listener) {
        if (listener != null) {
            this.listeners.add(listener);
        }
    }

    @Override
    public void removeTaskCompletionListener(ITaskCompletionListener listener) {
        if (listener != null) {
            this.listeners.remove(listener);
        }
    }

    @Override
    public void start() {
        super.start();
        String name = String.valueOf(this.getServiceName()) + "-recovery";
        this.recoveryScheduler = new Scheduler<Void>(name, 1, this.recoverySize);
        this.recoveryScheduler.start();
    }

    @Override
    public void stop() {
        this.recoveryScheduler.stop();
        this.recoveryScheduler = null;
        super.stop();
    }

    private static final class DistributedTaskWithNotification<R>
    extends DistributedTask<R>
    implements IDistributedTaskWithNotification<R> {
        private final Callable<R> callable;
        private transient ExecutionCallback<R> callback;
        private final transient AtomicBoolean notified = new AtomicBoolean(false);
        private final transient Lock lock = new ReentrantLock();

        DistributedTaskWithNotification(Callable<R> callable, Member member) {
            super(callable, member);
            this.callable = callable;
        }

        DistributedTaskWithNotification(Callable<R> callable, Object key) {
            super(callable, key);
            this.callable = callable;
        }

        DistributedTaskWithNotification(Callable<R> callable) {
            super(callable);
            this.callable = callable;
        }

        protected void done() {
            super.done();
            if (this.notified.compareAndSet(false, true) && this.callback != null) {
                this.callback.done((Future)this);
            }
        }

        @Override
        public R get() throws ExecutionException, InterruptedException {
            this.lock.lockInterruptibly();
            try {
                Object object = super.get();
                return (R)object;
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            long timeoutInMillis = unit.toMillis(timeout);
            long now = System.currentTimeMillis();
            if (this.lock.tryLock(timeout, unit)) {
                if ((timeoutInMillis -= System.currentTimeMillis() - now) <= 0L) {
                    throw new TimeoutException();
                }
                try {
                    Object object = super.get(timeoutInMillis, TimeUnit.MILLISECONDS);
                    return (R)object;
                }
                finally {
                    this.lock.unlock();
                }
            }
            throw new TimeoutException();
        }

        @Override
        public Callable<R> getCallable() {
            return this.callable;
        }

        @Override
        public void setNotificationCallback(ExecutionCallback<R> callback) {
            this.callback = callback;
        }
    }

    private static interface IDistributedTaskWithNotification<R>
    extends RunnableFuture<R> {
        public Callable<R> getCallable();

        public void setNotificationCallback(ExecutionCallback<R> var1);
    }

    private static final class MultiTaskWithNotification<R>
    extends MultiTask<R>
    implements IDistributedTaskWithNotification {
        private final Callable<R> callable;
        private transient ExecutionCallback<R> callback = null;
        private final transient AtomicBoolean notified = new AtomicBoolean(false);
        private final transient Lock lock = new ReentrantLock();

        MultiTaskWithNotification(Callable<R> callable) {
            super(callable, Hazelcast.getCluster().getMembers());
            this.callable = callable;
        }

        protected void done() {
            super.done();
            if (this.notified.compareAndSet(false, true) && this.callback != null) {
                this.callback.done((Future)this);
            }
        }

        @Override
        public Collection<R> get() throws ExecutionException, InterruptedException {
            this.lock.lockInterruptibly();
            try {
                Collection collection = super.get();
                return collection;
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public Collection<R> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            long timeoutInMillis = unit.toMillis(timeout);
            long now = System.currentTimeMillis();
            if (this.lock.tryLock(timeout, unit)) {
                if ((timeoutInMillis -= System.currentTimeMillis() - now) <= 0L) {
                    throw new TimeoutException();
                }
                try {
                    Collection collection = super.get(timeoutInMillis, TimeUnit.MILLISECONDS);
                    return collection;
                }
                finally {
                    this.lock.unlock();
                }
            }
            throw new TimeoutException();
        }

        public Callable getCallable() {
            return this.callable;
        }

        public void setNotificationCallback(ExecutionCallback callback) {
            this.callback = callback;
        }
    }

    private class RecoverableTask
    implements Future<T>,
    ExecutionCallback<T> {
        private IDistributedTaskWithNotification<T> task;
        private boolean done = false;
        private int recoveryCount = 0;
        private final Lock lock = new ReentrantLock();

        RecoverableTask(IDistributedTaskWithNotification<T> task) {
            this.task = task;
            this.task.setNotificationCallback(this);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            this.lock.lock();
            try {
                boolean bl = this.task.cancel(mayInterruptIfRunning);
                return bl;
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public boolean isCancelled() {
            this.lock.lock();
            try {
                boolean bl = this.task.isCancelled();
                return bl;
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public boolean isDone() {
            this.lock.lock();
            try {
                boolean bl = this.done;
                return bl;
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            while (!this.isDone()) {
                try {
                    this.task.get();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            this.lock.lock();
            try {
                Object v = this.task.get();
                return v;
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            long timeoutInMillis = unit.toMillis(timeout);
            while (!this.isDone()) {
                if (timeoutInMillis <= 0L) {
                    throw new TimeoutException();
                }
                long now = System.currentTimeMillis();
                try {
                    this.task.get(timeoutInMillis, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e) {
                    throw e;
                }
                catch (Exception exception) {
                    // empty catch block
                }
                timeoutInMillis -= System.currentTimeMillis() - now;
            }
            this.lock.lock();
            try {
                Object v = this.task.get();
                return v;
            }
            finally {
                this.lock.unlock();
            }
        }

        public void done(Future<T> future) {
            logger.debug((Object)(String.valueOf(RecoverableDistributedScheduler.this.getServiceName()) + " - Queue Size: " + RecoverableDistributedScheduler.this.queueSize.decrementAndGet()));
            RecoverableDistributedScheduler.this.recoveryScheduler.schedule(new RecoveryCallable(this));
        }

        public void tryToRecover() {
            this.lock.lock();
            try {
                boolean recover = false;
                try {
                    this.task.get();
                }
                catch (MemberLeftException e) {
                    recover = true;
                }
                catch (ExecutionException e) {
                    if (RecoverableDistributedScheduler.this.isRecoverableError(e.getCause())) {
                        recover = true;
                    }
                }
                catch (Exception e) {
                    // empty catch block
                }
                String name = this.task.getCallable().toString();
                if (recover && this.recoveryCount < RecoverableDistributedScheduler.this.getMaximumRetryAttempts()) {
                    logger.debug((Object)("Recovering task: " + name + ", rescheduling..."));
                    RunnableFuture newTask = RecoverableDistributedScheduler.this.newTaskFor(this.task.getCallable());
                    newTask.setNotificationCallback(this);
                    RecoverableDistributedScheduler.this.getExecutor().submit(newTask);
                    logger.debug((Object)(String.valueOf(RecoverableDistributedScheduler.this.getServiceName()) + " - Queue Size: " + RecoverableDistributedScheduler.this.queueSize.incrementAndGet()));
                    this.task.setNotificationCallback(null);
                    this.task = newTask;
                    ++this.recoveryCount;
                } else {
                    if (recover) {
                        logger.warn((Object)("Task: " + name + " has exceeded " + RecoverableDistributedScheduler.this.getMaximumRetryAttempts() + " attempts to recover, failing task."));
                    }
                    logger.debug((Object)("Task: " + name + ", completed."));
                    this.task.setNotificationCallback(null);
                    this.setDone();
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        private void setDone() {
            if (!this.done) {
                this.done = true;
                for (ITaskCompletionListener listener : RecoverableDistributedScheduler.this.listeners) {
                    try {
                        listener.onTaskCompleted(this.task.getCallable(), this);
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        }
    }

    private static final class RecoveryCallable
    implements Callable<Void> {
        private final RecoverableTask task;

        public RecoveryCallable(RecoverableTask task) {
            this.task = task;
        }

        @Override
        public Void call() throws Exception {
            long start = System.currentTimeMillis();
            try {
                this.task.tryToRecover();
                return null;
            }
            finally {
                long duration = System.currentTimeMillis() - start;
                logger.debug((Object)("Completed recovery attempt for: " + this.task.task.getCallable() + " in " + duration + " ms."));
            }
        }
    }
}

