/*
 * Decompiled with CFR 0.152.
 */
package org.openorb.notify.impl;

import org.omg.CORBA.BAD_PARAM;
import org.omg.CORBA.NO_IMPLEMENT;
import org.omg.CosEventChannelAdmin.AlreadyConnected;
import org.omg.CosEventChannelAdmin.TypeError;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.NamedPropertyRangeSeqHolder;
import org.omg.CosNotification.Property;
import org.omg.CosNotification.UnsupportedQoS;
import org.omg.CosNotifyChannelAdmin.ConnectionAlreadyActive;
import org.omg.CosNotifyChannelAdmin.ConnectionAlreadyInactive;
import org.omg.CosNotifyChannelAdmin.NotConnected;
import org.omg.CosNotifyChannelAdmin.ObtainInfoMode;
import org.omg.CosNotifyChannelAdmin.ProxyType;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPullConsumerPOA;
import org.omg.CosNotifyChannelAdmin.SupplierAdmin;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.StructuredPullSupplier;
import org.omg.CosNotifyComm.StructuredPullSupplierHelper;
import org.omg.CosNotifyFilter.Filter;
import org.omg.CosNotifyFilter.FilterHelper;
import org.omg.CosNotifyFilter.FilterNotFound;
import org.omg.PortableServer.POAPackage.ObjectNotActive;
import org.omg.PortableServer.POAPackage.WrongPolicy;
import org.openorb.notify.Logger;
import org.openorb.notify.PersistenceManagement;
import org.openorb.notify.PersistenceRepository;
import org.openorb.notify.PropertiesRepository;
import org.openorb.notify.SupplierProxyManagement;
import org.openorb.notify.Util;
import org.openorb.notify.impl.SupplierAdminImpl;
import org.openorb.notify.persistence.EventTypeInfo;
import org.openorb.notify.persistence.ProxyConsumer;
import org.openorb.notify.persistence.ReferenceInfo;
import org.openorb.notify.queue.EventQueueFactory;
import org.openorb.notify.queue.FilterableEventQueue;
import org.openorb.notify.queue.StructuredPuller;

/**
 * Servant for a structured proxy pull consumer of the notification service.
 *
 * <p>The proxy keeps its durable state (connected supplier, connection flag,
 * QoS properties, filters, filter id counter and event types) in a
 * {@link ProxyConsumer} persistence object. At runtime it owns a
 * {@link FilterableEventQueue} and a {@link StructuredPuller} that is handed
 * that queue, the owning admin and this proxy.
 *
 * <p>Filter administration methods synchronize on the persistence object;
 * {@link #add_filter(Filter)} and {@link #remove_filter(int)} additionally
 * synchronize on this servant. The connection methods are not synchronized.
 */
public class StructuredProxyPullConsumerImpl
        extends StructuredProxyPullConsumerPOA
        implements SupplierProxyManagement, PersistenceManagement {
    /** Persistent record backing this proxy. */
    private final ProxyConsumer m_pssProxyConsumer;
    /** Admin that created this proxy and owns its POA. */
    private final SupplierAdminImpl m_supplierAdmin;
    /** Event queue of this proxy; created by {@link #configureQueue()}. */
    private FilterableEventQueue m_queue;
    /** Puller bound to {@link #m_queue}; created by {@link #configurePuller()}. */
    private StructuredPuller m_puller;
    /** In-memory view of the QoS properties the proxy was created/restored with. */
    private final PropertiesRepository m_propertiesRepository;
    /** Logger facade from which the proxy logger is obtained. */
    private final Logger m_logger;

    /**
     * Creates a brand-new proxy and its persistent record.
     *
     * @param supplierAdmin  admin that owns this proxy
     * @param pullConsumerId identifier stored in the new persistent record
     * @param adminId        identifier of the owning admin stored in the record
     * @param qosProperties  QoS properties used for the properties repository
     *                       and stored in the record
     * @param logger         logger facade used by this proxy
     */
    public StructuredProxyPullConsumerImpl(SupplierAdminImpl supplierAdmin, int pullConsumerId, int adminId, Property[] qosProperties, Logger logger) {
        // The repository must exist first: it decides which persistence home is used.
        this.m_propertiesRepository = new PropertiesRepository(qosProperties);
        // NOTE(review): the trailing positional arguments look like
        // (supplier, connected flag, qos, two reference lists, next filter id,
        // event types) - confirm against the ProxyConsumer home signature.
        this.m_pssProxyConsumer = PersistenceRepository
                .getProxyConsumerHome(this.m_propertiesRepository.isConnectionReliable())
                .create(pullConsumerId, adminId, ProxyType.PULL_STRUCTURED, null, false, qosProperties,
                        new ReferenceInfo[0], new ReferenceInfo[0], 0, new EventTypeInfo[0]);
        this.m_supplierAdmin = supplierAdmin;
        this.m_logger = logger;
        // Queue before puller: the puller is constructed with the queue.
        this.configureQueue();
        this.configurePuller();
    }

    /**
     * Re-creates a proxy from an existing persistent record and restores the
     * supplier connection and filters stored in it.
     *
     * @param supplierAdmin    admin that owns this proxy
     * @param pssProxyConsumer existing persistent record to attach to
     * @param logger           logger facade used by this proxy
     */
    public StructuredProxyPullConsumerImpl(SupplierAdminImpl supplierAdmin, ProxyConsumer pssProxyConsumer, Logger logger) {
        this.m_supplierAdmin = supplierAdmin;
        this.m_pssProxyConsumer = pssProxyConsumer;
        this.m_propertiesRepository = new PropertiesRepository(pssProxyConsumer.qos_properties());
        this.m_logger = logger;
        this.configureQueue();
        this.configurePuller();
        this.recoverConnections();
    }

    /**
     * Pushes the persisted supplier reference, connection flag and filters
     * into the freshly created puller and queue.
     */
    private void recoverConnections() {
        if (this.m_pssProxyConsumer.connected_supplier() != null) {
            this.m_puller.setSupplier(StructuredPullSupplierHelper.narrow(this.m_pssProxyConsumer.connected_supplier()));
            // A supplier may be connected but suspended; only then stay inactive.
            if (this.m_pssProxyConsumer.is_connected()) {
                this.m_puller.setConnectionActive(true);
            }
        }
        ReferenceInfo[] filters = this.m_pssProxyConsumer.filters();
        for (int i = 0; i < filters.length; ++i) {
            this.m_queue.addFilter(FilterHelper.narrow(filters[i].reference));
        }
    }

    /**
     * Creates the consumer-proxy event queue from the proxy's properties and
     * the queue/logging settings of the owning admin.
     */
    private void configureQueue() {
        this.m_queue = EventQueueFactory.createConsumerProxyQueue(
                "[C STRPULL] " + this.getId(),
                this.m_propertiesRepository,
                this.m_supplierAdmin.isQueueMaxPerformance(),
                this.m_supplierAdmin.isEventQueueLogged(),
                this.m_supplierAdmin.isFilterEvaluationLogged(),
                this.getLogger().getChildLogger("queue"));
    }

    /**
     * Creates the puller for this proxy and starts its thread. Requires
     * {@link #configureQueue()} to have run.
     */
    private void configurePuller() {
        // NOTE(review): a missing "PullThreadLatency" property yields a
        // NullPointerException on intValue() below - confirm the admin always
        // provides it.
        Integer latency = (Integer)this.m_supplierAdmin.getNotifyProperty("PullThreadLatency");
        this.m_puller = new StructuredPuller("[C STRPULL] " + this.getId(), this.m_queue, this.m_supplierAdmin, this, latency.intValue(), this.getLogger());
        this.m_puller.startThread();
    }

    /**
     * Connects a pull supplier to this proxy and marks the connection active,
     * both in the persistent record and on the puller.
     *
     * @param pull_supplier supplier to connect
     * @throws AlreadyConnected if a supplier is already recorded for this proxy
     * @throws BAD_PARAM        if {@code pull_supplier} is {@code null}
     * @throws TypeError        declared by the interface; not thrown by this body
     */
    public void connect_structured_pull_supplier(StructuredPullSupplier pull_supplier) throws AlreadyConnected, TypeError {
        if (this.m_pssProxyConsumer.connected_supplier() != null) {
            throw new AlreadyConnected();
        }
        if (pull_supplier == null) {
            throw new BAD_PARAM();
        }
        this.m_pssProxyConsumer.connected_supplier(pull_supplier);
        this.m_pssProxyConsumer.is_connected(true);
        this.m_puller.setSupplier(pull_supplier);
        this.m_puller.setConnectionActive(true);
    }

    /**
     * Marks the connection inactive in the persistent record and on the puller.
     *
     * @throws NotConnected              if no supplier is recorded
     * @throws ConnectionAlreadyInactive if the connection is already inactive
     */
    public void suspend_connection() throws ConnectionAlreadyInactive, NotConnected {
        if (this.m_pssProxyConsumer.connected_supplier() == null) {
            throw new NotConnected();
        }
        if (!this.m_pssProxyConsumer.is_connected()) {
            throw new ConnectionAlreadyInactive();
        }
        this.m_pssProxyConsumer.is_connected(false);
        this.m_puller.setConnectionActive(false);
    }

    /**
     * Marks the connection active in the persistent record and on the puller.
     *
     * @throws NotConnected            if no supplier is recorded
     * @throws ConnectionAlreadyActive if the connection is already active
     */
    public void resume_connection() throws ConnectionAlreadyActive, NotConnected {
        if (this.m_pssProxyConsumer.connected_supplier() == null) {
            throw new NotConnected();
        }
        if (this.m_pssProxyConsumer.is_connected()) {
            throw new ConnectionAlreadyActive();
        }
        this.m_pssProxyConsumer.is_connected(true);
        this.m_puller.setConnectionActive(true);
    }

    /**
     * @return the proxy type stored in the persistent record
     */
    public ProxyType MyType() {
        return this.m_pssProxyConsumer.type();
    }

    /**
     * @return the CORBA reference of the owning supplier admin
     */
    public SupplierAdmin MyAdmin() {
        return this.m_supplierAdmin._this();
    }

    /**
     * Not implemented.
     *
     * @throws NO_IMPLEMENT always
     */
    public EventType[] obtain_subscription_types(ObtainInfoMode mode) {
        throw new NO_IMPLEMENT();
    }

    /**
     * Not implemented.
     *
     * @throws NO_IMPLEMENT always
     */
    public void validate_event_qos(Property[] required_qos, NamedPropertyRangeSeqHolder available_qos) throws UnsupportedQoS {
        throw new NO_IMPLEMENT();
    }

    /**
     * Tears the proxy down: stops the puller, reports the disconnection to the
     * admin, deactivates the servant in the admin's POA and destroys the
     * persistent record. POA deactivation failures are logged and do not stop
     * the remaining teardown.
     */
    public void disconnect_structured_pull_consumer() {
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug(this.getId() + "Proxy disconnected and destroyed.");
        }
        this.m_puller.finishWorkAndStopThread();
        this.m_supplierAdmin.reportPullSupplierDisconnection(this.m_pssProxyConsumer.get_pid());
        try {
            this.m_supplierAdmin.getPOA().deactivate_object(this.getPid());
        }
        catch (ObjectNotActive ex) {
            // The servant is already gone from the POA; keep tearing down.
            if (this.getLogger().isDebugEnabled()) {
                this.getLogger().debug("Proxy was not active in the POA during disconnection.", ex);
            }
        }
        catch (WrongPolicy ex) {
            // Best effort: the persistent record must still be destroyed below.
            this.getLogger().warn("Unable to deactivate the proxy in the POA.", ex);
        }
        this.m_pssProxyConsumer.destroy_object();
    }

    /**
     * Rewrites the persisted event types as (current + validated additions)
     * minus validated removals.
     *
     * <p>NOTE(review): validation is a placeholder. Both validated lists are
     * always empty, so {@code added} and {@code removed} are currently ignored
     * and the persisted event types are simply rewritten unchanged.
     *
     * @param added   event types the supplier announces (currently unused)
     * @param removed event types the supplier withdraws (currently unused)
     * @throws InvalidEventType declared by the interface; not thrown by this body
     */
    public void offer_change(EventType[] added, EventType[] removed) throws InvalidEventType {
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug(this.getId() + " Event types publish modified.");
        }
        EventTypeInfo[] current = this.m_pssProxyConsumer.event_types();
        EventType[] validatedAdd = new EventType[0];
        EventType[] validatedRemove = new EventType[0];

        // Append the validated additions after the existing entries.
        EventTypeInfo[] merged = new EventTypeInfo[current.length + validatedAdd.length];
        System.arraycopy(current, 0, merged, 0, current.length);
        for (int i = 0; i < validatedAdd.length; ++i) {
            merged[current.length + i] = new EventTypeInfo(validatedAdd[i], 1);
        }

        // Keep only entries not matched by a removal. The number of survivors
        // is not known up front (a removal may match zero or several entries),
        // so collect first and trim afterwards.
        EventTypeInfo[] kept = new EventTypeInfo[merged.length];
        int keptCount = 0;
        for (int i = 0; i < merged.length; ++i) {
            if (!Util.isMatchingEventType(merged[i].event_type, validatedRemove)) {
                kept[keptCount++] = merged[i];
            }
        }
        EventTypeInfo[] result = new EventTypeInfo[keptCount];
        System.arraycopy(kept, 0, result, 0, keptCount);
        this.m_pssProxyConsumer.event_types(result);
    }

    /**
     * @return the QoS properties stored in the persistent record
     */
    public Property[] get_qos() {
        return this.m_pssProxyConsumer.qos_properties();
    }

    /**
     * Stores the given QoS properties in the persistent record.
     *
     * <p>NOTE(review): the in-memory properties repository built at
     * construction time is not refreshed here - confirm this is intended.
     *
     * @param qos properties to store
     * @throws UnsupportedQoS declared by the interface; not thrown by this body
     */
    public void set_qos(Property[] qos) throws UnsupportedQoS {
        this.m_pssProxyConsumer.qos_properties(qos);
    }

    /**
     * Not implemented.
     *
     * @throws NO_IMPLEMENT always
     */
    public void validate_qos(Property[] required_qos, NamedPropertyRangeSeqHolder available_qos) throws UnsupportedQoS {
        throw new NO_IMPLEMENT();
    }

    /**
     * Adds a filter to the queue and, if the queue accepted it, records it in
     * the persistent record under a newly allocated id.
     *
     * @param new_filter filter to add
     * @return the id of the newly recorded filter; the id of the already
     *         recorded entry whose reference {@code equals} the filter when
     *         the queue did not accept it; or {@code -1} when the filter is
     *         {@code null} or no such entry exists
     */
    public synchronized int add_filter(Filter new_filter) {
        if (new_filter == null) {
            return -1;
        }
        synchronized (this.m_pssProxyConsumer) {
            ReferenceInfo[] filters = this.m_pssProxyConsumer.filters();
            if (this.m_queue.addFilter(new_filter)) {
                ReferenceInfo[] extended = new ReferenceInfo[filters.length + 1];
                System.arraycopy(filters, 0, extended, 0, filters.length);
                int filterId = this.m_pssProxyConsumer.next_filter_id();
                extended[filters.length] = new ReferenceInfo(filterId, new_filter);
                this.m_pssProxyConsumer.next_filter_id(filterId + 1);
                this.m_pssProxyConsumer.filters(extended);
                return filterId;
            }
            // The queue did not accept the filter: report the id it is already
            // recorded under, if any.
            for (int i = 0; i < filters.length; ++i) {
                if (filters[i].reference.equals(new_filter)) {
                    return filters[i].id;
                }
            }
            return -1;
        }
    }

    /**
     * Removes the filter with the given id from the persistent record and
     * from the queue.
     *
     * @param filter id of the filter to remove
     * @throws FilterNotFound if no recorded filter has that id (including the
     *                        case where no filters are recorded at all)
     */
    public synchronized void remove_filter(int filter) throws FilterNotFound {
        synchronized (this.m_pssProxyConsumer) {
            ReferenceInfo[] filters = this.m_pssProxyConsumer.filters();
            // Locate first: sizing the shrunken array before knowing the id
            // exists would fail on an empty or non-matching filter list.
            int index = StructuredProxyPullConsumerImpl.indexOfFilter(filters, filter);
            if (index < 0) {
                throw new FilterNotFound();
            }
            ReferenceInfo[] remaining = new ReferenceInfo[filters.length - 1];
            System.arraycopy(filters, 0, remaining, 0, index);
            System.arraycopy(filters, index + 1, remaining, index, remaining.length - index);
            this.m_pssProxyConsumer.filters(remaining);
            this.m_queue.removeFilter(FilterHelper.narrow(filters[index].reference));
        }
    }

    /**
     * Looks up a recorded filter by id.
     *
     * @param filter id of the filter
     * @return the narrowed filter reference
     * @throws FilterNotFound if no recorded filter has that id
     */
    public Filter get_filter(int filter) throws FilterNotFound {
        synchronized (this.m_pssProxyConsumer) {
            ReferenceInfo[] filters = this.m_pssProxyConsumer.filters();
            int index = StructuredProxyPullConsumerImpl.indexOfFilter(filters, filter);
            if (index < 0) {
                throw new FilterNotFound();
            }
            return FilterHelper.narrow(filters[index].reference);
        }
    }

    /**
     * @return the ids of all recorded filters, in recorded order
     */
    public int[] get_all_filters() {
        synchronized (this.m_pssProxyConsumer) {
            ReferenceInfo[] filters = this.m_pssProxyConsumer.filters();
            int[] ids = new int[filters.length];
            for (int i = 0; i < ids.length; ++i) {
                ids[i] = filters[i].id;
            }
            return ids;
        }
    }

    /**
     * Removes every filter from the queue and clears the recorded filter list.
     */
    public void remove_all_filters() {
        synchronized (this.m_pssProxyConsumer) {
            this.m_queue.removeAllFilters();
            this.m_pssProxyConsumer.filters(new ReferenceInfo[0]);
        }
    }

    /**
     * @return the pid of the persistent record; also used as the object id
     *         when deactivating this servant
     */
    public byte[] getPid() {
        return this.m_pssProxyConsumer.get_pid();
    }

    /**
     * @return the admin's id followed by this proxy's id in square brackets
     */
    public String getId() {
        return this.m_supplierAdmin.getId() + "[" + this.m_pssProxyConsumer.id() + "]";
    }

    /**
     * Handles a disconnection reported on behalf of the client by performing
     * the same teardown as {@link #disconnect_structured_pull_consumer()}.
     */
    public void reportClientDisconnection() {
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug("Client sent an unexpected Disconnected exception.");
        }
        this.disconnect_structured_pull_consumer();
    }

    /**
     * @return the logger obtained from the logger facade via
     *         {@code getSProxyLogger()}
     */
    private org.apache.avalon.framework.logger.Logger getLogger() {
        return this.m_logger.getSProxyLogger();
    }

    /**
     * Finds the position of the entry with the given id.
     *
     * @param filters recorded filter entries
     * @param id      filter id to look for
     * @return index of the first entry with that id, or {@code -1} if none
     */
    private static int indexOfFilter(ReferenceInfo[] filters, int id) {
        for (int i = 0; i < filters.length; ++i) {
            if (filters[i].id == id) {
                return i;
            }
        }
        return -1;
    }
}

