/*
 * Decompiled with CFR 0.152.
 */
package com.tandbergtv.workflow.adaptor.dispatcher;

import com.tandbergtv.workflow.adaptor.conf.IDispatcherConfiguration;
import com.tandbergtv.workflow.adaptor.dispatcher.DispatcherException;
import com.tandbergtv.workflow.adaptor.dispatcher.TCPDispatcher;
import com.tandbergtv.workflow.adaptor.util.QueueManager;
import com.tandbergtv.workflow.comm.IDestination;
import com.tandbergtv.workflow.message.IMessage;
import com.tandbergtv.workflow.message.TCPMessage;
import java.io.IOException;
import org.apache.log4j.Logger;

/**
 * A {@link TCPDispatcher} that, besides sending messages, continuously reads
 * lines from the connection and hands them to the subclass for processing.
 *
 * <p>Two background threads are created by {@link #initialize()}:
 * <ul>
 *   <li>a reading thread that calls {@code getReader().readLine()} in a loop and
 *       pushes every line onto a {@link QueueManager};</li>
 *   <li>a worker thread that pops lines from that queue and passes each one to
 *       {@link #processReadMessage(String)}.</li>
 * </ul>
 * Both threads are torn down by {@link #destroy()}; a later call to
 * {@link #send(IMessage)} re-initializes them.
 *
 * <p>{@code initialize()} and {@code destroy()} are {@code synchronized} on this
 * instance, which is what guards the thread and queue fields below.
 */
public abstract class TCPReadWriteDispatcher
extends TCPDispatcher {
    private static final Logger logger = Logger.getLogger(TCPReadWriteDispatcher.class);

    /** Queue of lines read from the connection; replaced on every (re-)initialization. */
    private QueueManager<String> queueMgr;

    /** Thread running {@link #startReading(QueueManager)}; {@code null} while not initialized. */
    private Thread readingThread;

    /** Thread draining the queue; {@code null} while not initialized. */
    private WorkerThread readingWorkerThread;

    /**
     * Creates the dispatcher; both arguments are passed straight to the superclass.
     *
     * @param conf        dispatcher configuration
     * @param destination destination this dispatcher talks to
     */
    public TCPReadWriteDispatcher(IDispatcherConfiguration conf, IDestination destination) {
        super(conf, destination);
    }

    /**
     * Runs the superclass initialization and, if no reading/worker threads exist
     * yet, creates a fresh queue and starts both threads.
     *
     * @throws DispatcherException if the superclass initialization fails
     */
    @Override
    protected synchronized void initialize() throws DispatcherException {
        super.initialize();
        if (this.readingWorkerThread == null && this.readingThread == null) {
            logger.debug("Initializing the threads ...");
            this.queueMgr = new QueueManager<String>();
            this.initializeProcessingWorkerThread();
            this.initializeReadingThread();
        }
    }

    /** Creates and starts the worker thread, bound to the current queue. */
    private void initializeProcessingWorkerThread() {
        WorkerThread worker =
                new WorkerThread(this.getDeviceIP() + " - Processing Worker Thread", this.queueMgr);
        this.readingWorkerThread = worker;
        // Started here rather than in the WorkerThread constructor so the thread
        // never runs against a partially constructed object.
        worker.start();
    }

    /** Creates and starts the reading thread, bound to the current queue. */
    private void initializeReadingThread() {
        // Capture the queue instance so a reader that outlives a destroy()/initialize()
        // cycle keeps using its own queue instead of the newly created one.
        final QueueManager<String> queue = this.queueMgr;
        this.readingThread = new Thread(new Runnable() {
            @Override
            public void run() {
                TCPReadWriteDispatcher.this.startReading(queue);
            }
        }, this.getDeviceIP() + " - Reading Thread");
        this.readingThread.start();
    }

    /**
     * Body of the reading thread: reads lines until {@code readLine()} returns
     * {@code null} or throws {@link IOException}, pushing each line onto
     * {@code queue}, and finally tears the dispatcher down.
     *
     * <p>Lines are pushed directly from this thread, so they are enqueued in the
     * order they were read and no extra thread is created per message.
     * NOTE(review): if {@code QueueManager.push} can block, this applies
     * back-pressure to the reader - confirm that is acceptable.
     *
     * @param queue the queue this reader feeds
     */
    private void startReading(QueueManager<String> queue) {
        try {
            String line;
            while ((line = this.getReader().readLine()) != null) {
                logger.debug("Received message: " + line);
                try {
                    queue.push(line);
                }
                catch (Exception ex) {
                    // Best effort: a failed push drops this line but keeps reading.
                    logger.warn("could not able to push to the queue", ex);
                }
            }
        }
        catch (IOException ioe) {
            logger.error("Shutting down", ioe);
        }
        finally {
            // Single teardown point for normal end-of-stream, IOException and
            // unexpected runtime failures alike.
            this.destroyFromReadingThread();
        }
    }

    /**
     * Tears the dispatcher down on behalf of the reading thread, but only if the
     * calling thread is still the registered reading thread. If {@link #destroy()}
     * has already run (and possibly {@link #initialize()} after it), the fields
     * belong to someone else and must not be touched by this stale reader.
     */
    private synchronized void destroyFromReadingThread() {
        if (this.readingThread != Thread.currentThread()) {
            return;
        }
        try {
            this.destroy();
        }
        catch (Exception ex) {
            logger.error("Could not destroy", ex);
        }
    }

    /**
     * Interrupts the reading thread, asks the worker thread to stop, clears both
     * references and then runs the superclass teardown.
     *
     * <p>NOTE(review): the worker is only flagged to stop, not interrupted, so it
     * can finish draining its queue. If {@code QueueManager.pop} blocks on an
     * empty queue the worker may not notice the flag - verify against
     * {@code QueueManager}.
     *
     * @throws DispatcherException if the superclass teardown fails
     */
    @Override
    protected synchronized void destroy() throws DispatcherException {
        logger.info("Destroying all the threads associated");
        try {
            if (this.readingThread != null) {
                this.readingThread.interrupt();
            }
        }
        catch (Exception ex) {
            logger.warn("Interrupting the reading thread failed", ex);
        }
        this.readingThread = null;
        try {
            if (this.readingWorkerThread != null) {
                this.readingWorkerThread.setStop(true);
            }
        }
        catch (Exception ex) {
            logger.warn("Stopping the worker thread failed", ex);
        }
        this.readingWorkerThread = null;
        super.destroy();
    }

    /**
     * Makes sure the dispatcher is initialized and then delegates to
     * {@link #sendMessage(IMessage)}.
     *
     * @param msg the message to send
     * @return whatever {@code sendMessage} returns
     * @throws DispatcherException if initialization or sending fails
     */
    @Override
    public IMessage send(IMessage msg) throws DispatcherException {
        this.initialize();
        return this.sendMessage(msg);
    }

    /**
     * Returns the device address; used here only as a prefix for thread names.
     *
     * @return the device IP as text
     */
    protected abstract String getDeviceIP();

    /**
     * Performs the actual send; called by {@link #send(IMessage)} after initialization.
     *
     * @param var1 the message to send
     * @return the response message
     * @throws DispatcherException if sending fails
     */
    @Override
    protected abstract TCPMessage sendMessage(IMessage var1) throws DispatcherException;

    /**
     * Handles one line read from the connection. Invoked on the worker thread,
     * one line at a time.
     *
     * @param var1 the line that was read
     * @throws DispatcherException if the line cannot be processed
     */
    protected abstract void processReadMessage(String var1) throws DispatcherException;

    /**
     * Thread that pops lines from a queue and forwards them to
     * {@link TCPReadWriteDispatcher#processReadMessage(String)}. It keeps running
     * until it is interrupted, or until it has been told to stop and its queue
     * reports size zero.
     */
    private class WorkerThread
    extends Thread {
        /** The queue this worker drains; fixed for the lifetime of the thread. */
        private final QueueManager<String> queue;

        /** Stop request flag; written by the dispatcher, read by this thread. */
        private volatile boolean stop;

        /**
         * Creates the worker without starting it; the caller must invoke {@code start()}.
         *
         * @param name  thread name
         * @param queue queue to drain
         */
        public WorkerThread(String name, QueueManager<String> queue) {
            super(name);
            this.queue = queue;
            this.stop = false;
        }

        /** @return {@code true} once a stop has been requested */
        protected boolean isStop() {
            return this.stop;
        }

        /** @param stop {@code true} to request that the worker finish once its queue is empty */
        protected void setStop(boolean stop) {
            this.stop = stop;
        }

        /**
         * Processes queued lines until interrupted, or until stop was requested
         * and the queue is empty.
         */
        @Override
        public void run() {
            try {
                while (!this.isInterrupted() && (!this.isStop() || this.queue.size() != 0)) {
                    this.popAndProcess();
                }
            }
            catch (Exception ex) {
                logger.warn("Worker thread name: " + this.getName() + " got interrupted", ex);
            }
            logger.info("Finished working...");
        }

        /** Processes entries for as long as the queue reports a positive size. */
        public void popAllAndProcess() {
            while (this.queue.size() > 0) {
                this.popAndProcess();
            }
        }

        /**
         * Pops one line and passes it to {@code processReadMessage}. Failures are
         * logged and otherwise ignored so that one bad line does not end the worker.
         */
        private void popAndProcess() {
            try {
                String readString = this.queue.pop();
                logger.debug("Processing the popped string: " + readString);
                TCPReadWriteDispatcher.this.processReadMessage(readString);
                logger.debug("Done processing...");
            }
            catch (Exception ex) {
                if (ex instanceof InterruptedException) {
                    // Restore the flag so the isInterrupted() check in run() sees it.
                    Thread.currentThread().interrupt();
                }
                logger.warn("Could not process the read message", ex);
            }
        }
    }
}

