package org.rzo.netty.ahessian.io;

import java.io.InputStream;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.rzo.netty.ahessian.Constants;
import org.rzo.netty.ahessian.stopable.StopableHandler;

/* loaded from: input_file:org/atricore/josso/tooling/wrapper/all/lib/core/yajsw/ahessian.jar:org/rzo/netty/ahessian/io/PullInputStreamConsumer.class */
/**
 * Upstream handler that hands the input stream received on a channel to an
 * {@link InputStreamConsumer}, running the consumer on a thread obtained from
 * the supplied {@link Executor}.
 *
 * <p>The constructor submits a long-running task to the executor. That task
 * alternates between {@link #waitForData()} and
 * {@code InputStreamConsumer.consume(...)} until {@link #stop()} is called.
 * Channel events (delivered on whatever thread the pipeline uses) update the
 * current context / stream and wake the task through the {@code _hasData}
 * condition.
 */
public class PullInputStreamConsumer extends SimpleChannelUpstreamHandler implements StopableHandler {
    /** Longest single wait, in milliseconds, before the consumer task re-checks its state. */
    private static final long WAIT_TIMEOUT_MILLIS = 500L;

    final InputStreamConsumer _consumer;
    final Executor _executor;
    /** Context of the most recent channel event; written by event callbacks, read by the consumer task. */
    volatile ChannelHandlerContext _ctx;
    /** Stream taken from the most recent message event; passed to the consumer. */
    volatile InputStream _inputStream;
    /** Number of consumer tasks currently running; also used to build the worker thread name. */
    static AtomicInteger _threadCounter = new AtomicInteger(0);
    final Lock _lock = new ReentrantLock();
    final Condition _hasData = this._lock.newCondition();
    volatile boolean _stop = false;
    /** True while the consumer task is parked on {@code _hasData}; lets messageReceived skip the lock otherwise. */
    volatile boolean _waiting = false;
    private boolean _stopEnabled = true;

    /**
     * Creates the handler and immediately submits the consumer task to {@code executor}.
     *
     * <p>While it runs, the task renames its thread to
     * {@code ahessian-PullInputStreamConsumer-#N}; the original thread name is
     * restored and the task counter decremented when the task ends, whether
     * normally or because of an exception.
     *
     * @param inputStreamConsumer consumer invoked each time data is available
     * @param executor            executor that runs the consumer task
     */
    public PullInputStreamConsumer(InputStreamConsumer inputStreamConsumer, Executor executor) {
        this._consumer = inputStreamConsumer;
        this._executor = executor;
        this._executor.execute(new Runnable() {
            @Override
            public void run() {
                final Thread worker = Thread.currentThread();
                final String originalName = worker.getName();
                worker.setName("ahessian-PullInputStreamConsumer-#" + _threadCounter.incrementAndGet());
                try {
                    waitForData();
                    while (!_stop) {
                        _consumer.consume(_ctx, _inputStream);
                        waitForData();
                    }
                } finally {
                    // Executor threads may be pooled: undo the rename and the count on every exit path.
                    worker.setName(originalName);
                    _threadCounter.decrementAndGet();
                }
            }
        });
    }

    /**
     * Blocks until either {@link #stop()} has been called or the consumer
     * reports a non-empty buffer on a connected channel.
     *
     * <p>Each wait on the condition is bounded by {@link #WAIT_TIMEOUT_MILLIS},
     * so a missed signal or a stop request is noticed on the next iteration.
     * An interrupt is logged and the wait continues; the interrupt flag is not
     * restored here because a set flag would make every following
     * {@code await} fail immediately. Use {@link #stop()} to end the loop.
     *
     * <p>Called by the consumer task created in the constructor.
     */
    public void waitForData() {
        while (!_stop && !isDataAvailable()) {
            _lock.lock();
            try {
                _waiting = true;
                _hasData.await(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Constants.ahessianLogger.warn("", e);
            } finally {
                _waiting = false;
                _lock.unlock();
            }
        }
    }

    /** Returns true when a consumer is set, its buffer is not empty, and the current channel is connected. */
    private boolean isDataAvailable() {
        if (_consumer == null || _consumer.isBufferEmpty()) {
            return false;
        }
        final ChannelHandlerContext ctx = _ctx;
        return ctx != null && ctx.getChannel().isConnected();
    }

    /**
     * Records the event's context and message (the message is cast to
     * {@link InputStream}) and wakes the consumer task if it is waiting.
     *
     * <p>When the message differs from the stream seen previously, its read
     * timeout is set to {@code -1} (presumably "no timeout" — confirm against
     * {@code InputStreamBuffer}). The event is not forwarded upstream.
     *
     * @throws ClassCastException if the message is not an {@code InputStreamBuffer}
     */
    @Override
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        if (_ctx != channelHandlerContext) {
            _ctx = channelHandlerContext;
        }
        final Object message = messageEvent.getMessage();
        if (_inputStream != message) {
            _inputStream = (InputStream) message;
            ((InputStreamBuffer) message).setReadTimeout(-1L);
        }
        if (_waiting) {
            _lock.lock();
            try {
                _hasData.signal();
            } finally {
                // Release even if signalling fails, otherwise the consumer task could never re-acquire the lock.
                _lock.unlock();
            }
        }
    }

    /**
     * Passes the new context to the consumer, stores it for the consumer task,
     * and forwards the event upstream.
     */
    @Override
    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        _lock.lock();
        try {
            _consumer.setContext(channelHandlerContext);
            _ctx = channelHandlerContext;
        } finally {
            // setContext is external code: make sure a failure there cannot leave the lock held.
            _lock.unlock();
        }
        channelHandlerContext.sendUpstream(channelStateEvent);
    }

    /** Returns the flag set through {@link #setStopEnabled(boolean)}; {@code true} by default. */
    @Override
    public boolean isStopEnabled() {
        return this._stopEnabled;
    }

    /** Stores the stop-enabled flag reported by {@link #isStopEnabled()}. */
    @Override
    public void setStopEnabled(boolean z) {
        this._stopEnabled = z;
    }

    /**
     * Requests termination of the consumer task. The task sees the flag the
     * next time it checks it: after the current {@code consume} call returns,
     * or after the current bounded wait in {@link #waitForData()} ends.
     */
    @Override
    public void stop() {
        this._stop = true;
    }
}
