/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.adaptor.dispatcher;

import com.tandbergtv.workflow.adaptor.conf.IDispatcherConfiguration;
import com.tandbergtv.workflow.adaptor.dispatcher.DispatcherException;
import com.tandbergtv.workflow.adaptor.dispatcher.TCPDispatcher;
import com.tandbergtv.workflow.adaptor.util.QueueManager;
import com.tandbergtv.workflow.comm.IDestination;
import com.tandbergtv.workflow.message.IMessage;
import com.tandbergtv.workflow.message.TCPMessage;
import java.io.IOException;
import org.apache.log4j.Logger;

public abstract class TCPReadWriteDispatcher
extends TCPDispatcher {
    private static final Logger LOGGER = Logger.getLogger(TCPReadWriteDispatcher.class);
    private QueueManager<String> queueMgr;
    private Thread readingThread;
    private WorkerThread readingWorkerThread;

    public TCPReadWriteDispatcher(IDispatcherConfiguration conf, IDestination destination) {
        super(conf, destination);
    }

    @Override
    protected synchronized void initialize() throws DispatcherException {
        super.initialize();
        if (this.readingWorkerThread == null && this.readingThread == null) {
            LOGGER.debug((Object)"Initializing the threads ...");
            this.queueMgr = new QueueManager();
            this.initializeProcessingWorkerThread();
            this.initializeReadingThread();
        }
    }

    private void initializeProcessingWorkerThread() {
        this.readingWorkerThread = new WorkerThread(this.getDeviceIP() + " - Processing Worker Thread");
    }

    private void initializeReadingThread() {
        this.readingThread = new Thread(new Runnable(){

            @Override
            public void run() {
                TCPReadWriteDispatcher.this.startReading();
            }
        }, this.getDeviceIP() + " - Reading Thread");
        this.readingThread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startReading() {
        try {
            String strReading;
            while ((strReading = this.getReader().readLine()) != null) {
                LOGGER.debug((Object)("Received message: " + strReading));
                final String finalStringRead = strReading;
                new Thread(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            TCPReadWriteDispatcher.this.queueMgr.push(finalStringRead);
                        }
                        catch (Exception ex) {
                            LOGGER.warn((Object)"could not able to push to the queue", (Throwable)ex);
                        }
                    }
                }).start();
            }
        }
        catch (IOException ioe) {
            LOGGER.error((Object)"Shutting down", (Throwable)ioe);
        }
        finally {
            try {
                this.destroy();
            }
            catch (Exception ex) {
                LOGGER.error((Object)"Could not destroy", (Throwable)ex);
            }
        }
    }

    @Override
    protected synchronized void destroy() throws DispatcherException {
        LOGGER.info((Object)"Destroying all the threads associated");
        try {
            if (this.readingThread != null) {
                this.readingThread.interrupt();
            }
        }
        catch (Exception ex) {
            LOGGER.warn((Object)"Interrupting the reading thread failed", (Throwable)ex);
        }
        this.readingThread = null;
        try {
            if (this.readingWorkerThread != null) {
                this.readingWorkerThread.setStop(true);
            }
        }
        catch (Exception ex) {
            LOGGER.warn((Object)"Stopping the worker thread failed", (Throwable)ex);
        }
        this.readingWorkerThread = null;
        super.destroy();
    }

    @Override
    public IMessage send(IMessage msg) throws DispatcherException {
        this.initialize();
        return this.sendMessage(msg);
    }

    protected abstract String getDeviceIP();

    @Override
    protected abstract TCPMessage sendMessage(IMessage var1) throws DispatcherException;

    protected abstract void processReadMessage(String var1) throws DispatcherException;

    private class WorkerThread
    extends Thread {
        private boolean stop;

        protected synchronized boolean isStop() {
            return this.stop;
        }

        protected synchronized void setStop(boolean stop) {
            this.stop = stop;
        }

        public WorkerThread(String name) {
            super(name);
            this.stop = false;
            this.start();
        }

        @Override
        public void run() {
            try {
                while (!(this.isInterrupted() || this.isStop() && TCPReadWriteDispatcher.this.queueMgr.size() == 0)) {
                    this.popAndProcess();
                }
            }
            catch (Exception ex) {
                LOGGER.warn((Object)("Worker thread name: " + this.getName() + " got interrupted"), (Throwable)ex);
            }
            LOGGER.info((Object)"Finished working...");
        }

        private void popAndProcess() {
            try {
                String readString = (String)TCPReadWriteDispatcher.this.queueMgr.pop();
                LOGGER.debug((Object)("Processing the popped string: " + readString));
                TCPReadWriteDispatcher.this.processReadMessage(readString);
                LOGGER.debug((Object)"Done processing...");
            }
            catch (Exception ex) {
                LOGGER.warn((Object)"Could not process the read message", (Throwable)ex);
            }
        }
    }
}

