/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport.local;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.local.LocalTransportChannel;
import org.elasticsearch.transport.support.TransportStatus;

/**
 * A {@link Transport} implementation that delivers messages between {@code LocalTransport}
 * instances living in the same JVM.
 *
 * <p>Every started instance registers itself in a static registry under a freshly generated
 * {@link LocalTransportAddress}. "Connecting" to a node is a lookup of the target instance in
 * that registry; "sending" serializes the message to a byte array and hands it to the target's
 * {@link #messageReceived} on a thread-pool thread.
 */
public class LocalTransport extends AbstractLifecycleComponent<Transport> implements Transport {

    /** JVM-wide registry of started local transports, keyed by their local address. */
    private static final ConcurrentMap<TransportAddress, LocalTransport> transports = ConcurrentCollections.newConcurrentMap();

    /** Source of unique ids used to build each instance's {@link LocalTransportAddress}. */
    private static final AtomicLong transportAddressIdGenerator = new AtomicLong();

    private final ThreadPool threadPool;
    private final Version version;

    /** Nodes this transport is connected to, mapped to the transport instance backing each one. */
    private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = ConcurrentCollections.newConcurrentMap();

    private volatile TransportServiceAdapter transportServiceAdapter;
    private volatile BoundTransportAddress boundAddress;
    private volatile LocalTransportAddress localAddress;

    @Inject
    public LocalTransport(Settings settings, ThreadPool threadPool, Version version) {
        super(settings);
        this.threadPool = threadPool;
        this.version = version;
    }

    /**
     * Wraps the given string in a single {@link LocalTransportAddress}.
     *
     * @param address the textual address
     * @return a one-element array holding the parsed address
     */
    @Override
    public TransportAddress[] addressesFromString(String address) {
        return new TransportAddress[]{new LocalTransportAddress(address)};
    }

    /**
     * @param address the address class to test
     * @return {@code true} only when {@code address} is exactly {@link LocalTransportAddress}
     */
    @Override
    public boolean addressSupported(Class<? extends TransportAddress> address) {
        return LocalTransportAddress.class.equals(address);
    }

    /**
     * Allocates a new local address, registers this instance in the static registry and
     * publishes the bound address (bound and publish address are the same).
     */
    @Override
    protected void doStart() throws ElasticsearchException {
        this.localAddress = new LocalTransportAddress(Long.toString(transportAddressIdGenerator.incrementAndGet()));
        transports.put(this.localAddress, this);
        this.boundAddress = new BoundTransportAddress(this.localAddress, this.localAddress);
    }

    /**
     * Removes this instance from the registry and makes every other registered transport
     * disconnect from any node that is backed by this instance.
     */
    @Override
    protected void doStop() throws ElasticsearchException {
        transports.remove(this.localAddress);
        for (LocalTransport targetTransport : transports.values()) {
            // NOTE(review): entries are removed while iterating; this relies on the map returned by
            // ConcurrentCollections.newConcurrentMap() tolerating concurrent modification - confirm.
            for (Map.Entry<DiscoveryNode, LocalTransport> entry : targetTransport.connectedNodes.entrySet()) {
                if (entry.getValue() == this) {
                    targetTransport.disconnectFromNode(entry.getKey());
                }
            }
        }
    }

    /** Nothing to release on close. */
    @Override
    protected void doClose() throws ElasticsearchException {
    }

    @Override
    public void transportServiceAdapter(TransportServiceAdapter transportServiceAdapter) {
        this.transportServiceAdapter = transportServiceAdapter;
    }

    @Override
    public BoundTransportAddress boundAddress() {
        return this.boundAddress;
    }

    /**
     * @param node the node to test
     * @return {@code true} if {@code node} is currently in the connected-nodes map
     */
    @Override
    public boolean nodeConnected(DiscoveryNode node) {
        return this.connectedNodes.containsKey(node);
    }

    /** Same as {@link #connectToNode(DiscoveryNode)}; there is no separate "light" connection here. */
    @Override
    public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
        connectToNode(node);
    }

    /**
     * Connects to {@code node} by looking up the transport registered under the node's address.
     * Does nothing if the node is already connected. On a new connection the service adapter is
     * notified via {@code raiseNodeConnected}.
     *
     * @param node the node to connect to
     * @throws ConnectTransportException if no transport is registered under the node's address
     */
    @Override
    public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
        synchronized (this) {
            if (this.connectedNodes.containsKey(node)) {
                return;
            }
            final LocalTransport targetTransport = transports.get(node.address());
            if (targetTransport == null) {
                throw new ConnectTransportException(node, "Failed to connect");
            }
            this.connectedNodes.put(node, targetTransport);
            this.transportServiceAdapter.raiseNodeConnected(node);
        }
    }

    /**
     * Drops the connection to {@code node}, notifying the service adapter via
     * {@code raiseNodeDisconnected} only if a connection actually existed.
     *
     * @param node the node to disconnect from
     */
    @Override
    public void disconnectFromNode(DiscoveryNode node) {
        synchronized (this) {
            final LocalTransport removed = this.connectedNodes.remove(node);
            if (removed != null) {
                this.transportServiceAdapter.raiseNodeDisconnected(node);
            }
        }
    }

    /** @return always {@code 0}; this transport reports no open server channels */
    @Override
    public long serverOpen() {
        return 0L;
    }

    /**
     * Serializes a request (request id, status byte, action name, request body) and delivers it
     * asynchronously to the transport backing {@code node} on the generic thread pool.
     *
     * @param node      the destination node; must already be connected
     * @param requestId the id written into the message and used for failure reporting
     * @param action    the action name
     * @param request   the request to serialize
     * @param options   not read by this implementation
     * @throws IOException               if serialization fails
     * @throws NodeNotConnectedException if {@code node} is not connected
     */
    @Override
    public void sendRequest(DiscoveryNode node, final long requestId, final String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
        // Serialize with the lower of the two versions so both sides can read the stream.
        final Version wireVersion = Version.smallest(node.version(), this.version);

        BytesStreamOutput bStream = new BytesStreamOutput();
        StreamOutput stream = new HandlesStreamOutput(bStream);
        stream.setVersion(wireVersion);

        stream.writeLong(requestId);
        byte status = 0;
        status = TransportStatus.setRequest(status);
        stream.writeByte(status);
        stream.writeString(action);
        request.writeTo(stream);
        stream.close();

        final LocalTransport targetTransport = this.connectedNodes.get(node);
        if (targetTransport == null) {
            throw new NodeNotConnectedException(node, "Node not connected");
        }

        final byte[] data = bStream.bytes().toBytes();
        this.transportServiceAdapter.sent(data.length);

        this.threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                targetTransport.messageReceived(data, action, LocalTransport.this, wireVersion, requestId);
            }
        });
    }

    ThreadPool threadPool() {
        return this.threadPool;
    }

    /**
     * Entry point for an incoming serialized message. Reads the request id and status byte, then
     * dispatches to request handling or, for responses, to the response handler registered under
     * the request id (responses whose handler is no longer registered are ignored).
     *
     * <p>Any failure is logged; additionally, when {@code sendRequestId} is non-null, the handler
     * registered under that id (if any) is notified with a {@link RemoteTransportException}.
     *
     * @param data            the serialized message
     * @param action          the action name, used for error reporting
     * @param sourceTransport the transport that sent the message
     * @param version         the version the stream was written with
     * @param sendRequestId   id of the originating request, or {@code null}
     */
    protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable Long sendRequestId) {
        try {
            this.transportServiceAdapter.received(data.length);
            StreamInput stream = new BytesStreamInput(data, false);
            stream = CachedStreamInput.cachedHandles(stream);
            stream.setVersion(version);

            final long requestId = stream.readLong();
            final byte status = stream.readByte();

            if (TransportStatus.isRequest(status)) {
                handleRequest(stream, requestId, sourceTransport, version);
                return;
            }

            final TransportResponseHandler handler = this.transportServiceAdapter.remove(requestId);
            if (handler == null) {
                // No handler registered any more for this id: nothing to deliver to.
                return;
            }
            if (TransportStatus.isError(status)) {
                handlerResponseError(stream, handler);
            } else {
                handleResponse(stream, handler);
            }
        } catch (Throwable e) {
            if (sendRequestId != null) {
                final TransportResponseHandler handler = this.transportServiceAdapter.remove(sendRequestId);
                if (handler != null) {
                    handleException(handler, new RemoteTransportException(nodeName(), this.localAddress, action, e));
                }
            }
            this.logger.warn("Failed to receive message for action [" + action + "]", e);
        }
    }

    /**
     * Reads the action name and request body from {@code stream} and invokes the registered
     * request handler, either inline (executor "same") or on the handler's executor. Failures
     * are reported back to the sender through the transport channel.
     */
    @SuppressWarnings("unchecked") // handlers are stored raw; the request instance comes from the handler itself
    private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport, Version version) throws Exception {
        final String action = stream.readString();
        final LocalTransportChannel transportChannel = new LocalTransportChannel(this, sourceTransport, action, requestId, version);
        try {
            final TransportRequestHandler handler = this.transportServiceAdapter.handler(action);
            if (handler == null) {
                throw new ActionNotFoundTransportException("Action [" + action + "] not found");
            }
            final TransportRequest request = handler.newInstance();
            request.readFrom(stream);

            // Compare by value: a handler is free to return a non-interned "same" string.
            if (ThreadPool.Names.SAME.equals(handler.executor())) {
                handler.messageReceived(request, transportChannel);
            } else {
                this.threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
                    @Override
                    public void run() {
                        try {
                            handler.messageReceived(request, transportChannel);
                        } catch (Throwable e) {
                            // Only try to answer while this transport is still started.
                            if (LocalTransport.this.lifecycleState() == Lifecycle.State.STARTED) {
                                try {
                                    transportChannel.sendResponse(e);
                                } catch (Throwable e1) {
                                    LocalTransport.this.logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
                                    LocalTransport.this.logger.warn("Actual Exception", e);
                                }
                            }
                        }
                    }

                    @Override
                    public boolean isForceExecution() {
                        return handler.isForceExecution();
                    }
                });
            }
        } catch (Throwable e) {
            try {
                transportChannel.sendResponse(e);
            } catch (Throwable e1) {
                // e1 is the failure to send; e is the original handler failure.
                this.logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
                this.logger.warn("Actual Exception", e);
            }
        }
    }

    /**
     * Deserializes a successful response from {@code buffer} and forwards it to the handler.
     * A deserialization failure is reported to the handler as a
     * {@link TransportSerializationException} instead.
     *
     * @param buffer  the stream positioned at the response body
     * @param handler the handler awaiting the response
     */
    protected void handleResponse(StreamInput buffer, TransportResponseHandler handler) {
        final TransportResponse response = handler.newInstance();
        try {
            response.readFrom(buffer);
        } catch (Throwable e) {
            handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
            return;
        }
        handleParsedResponse(response, handler);
    }

    /**
     * Invokes {@code handler.handleResponse(response)} on the handler's executor. If the handler
     * throws, the failure is routed back to it as a
     * {@link ResponseHandlerFailureTransportException}.
     *
     * @param response the already deserialized response
     * @param handler  the handler to notify
     */
    @SuppressWarnings("unchecked") // handler is raw; the response instance was created by the handler itself
    protected void handleParsedResponse(final TransportResponse response, final TransportResponseHandler handler) {
        this.threadPool.executor(handler.executor()).execute(new Runnable() {
            @Override
            public void run() {
                try {
                    handler.handleResponse(response);
                } catch (Throwable e) {
                    handleException(handler, new ResponseHandlerFailureTransportException(e));
                }
            }
        });
    }

    /**
     * Reads a serialized {@link Throwable} from {@code buffer} and passes it to the handler. If
     * the throwable cannot be read, a {@link TransportSerializationException} is passed instead.
     */
    private void handlerResponseError(StreamInput buffer, TransportResponseHandler handler) {
        Throwable error;
        try {
            // NOTE(review): Java-native object deserialization. The bytes here originate from another
            // LocalTransport via messageReceived; confirm no untrusted source can reach this path.
            ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer, this.settings.getClassLoader());
            error = (Throwable) ois.readObject();
        } catch (Throwable e) {
            error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
        }
        handleException(handler, error);
    }

    /**
     * Delivers {@code error} to the handler, wrapping it in a {@link RemoteTransportException}
     * when it is not one already. Exceptions thrown by the handler itself are logged and
     * swallowed so they cannot escape into the calling thread.
     */
    private void handleException(TransportResponseHandler handler, Throwable error) {
        final RemoteTransportException rtx = error instanceof RemoteTransportException
                ? (RemoteTransportException) error
                : new RemoteTransportException("None remote transport exception", error);
        try {
            handler.handleException(rtx);
        } catch (Throwable t) {
            this.logger.error("failed to handle exception response [{}]", t, handler);
        }
    }
}

