package com.vmware.vapi.internal.bindings;

import com.vmware.vapi.bindings.client.InvocationConfig;
import com.vmware.vapi.bindings.type.StructType;
import com.vmware.vapi.bindings.type.Type;
import com.vmware.vapi.core.AsyncHandle;
import com.vmware.vapi.core.Consumer;
import com.vmware.vapi.core.ExecutionContext;
import com.vmware.vapi.core.MethodIdentifier;
import com.vmware.vapi.core.MethodResult;
import com.vmware.vapi.data.DataValue;
import com.vmware.vapi.data.StructValue;
import com.vmware.vapi.diagnostics.LogDiagnosticUtil;
import com.vmware.vapi.diagnostics.Slf4jMDCLogConfigurator;
import com.vmware.vapi.internal.core.abort.AbortHandle;
import com.vmware.vapi.internal.core.abort.AbortHandleImpl;
import com.vmware.vapi.internal.core.abort.AbortableAsyncHandle;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/vmware/vapi/internal/bindings/StreamPublisher.class */
public class StreamPublisher<T> implements Publisher<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) StreamPublisher.class);
    private volatile Subscriber<? super T> subscriber;
    private volatile Consumer<AsyncHandle<MethodResult>> nextChunkConsumer;
    private final Stub stub;
    private final String serviceId;
    private final String operationId;
    private final ExecutionContext execCtx;
    private final Type outputType;
    private final Collection<Type> errorTypes;
    private StructValue inputValue;
    private final AbortHandle abortHandle = new AbortHandleImpl();
    private final AtomicBoolean subscribedTo = new AtomicBoolean();
    final AtomicBoolean finished = new AtomicBoolean();
    final AtomicLong demandCount = new AtomicLong();

    /* loaded from: input_file:com/vmware/vapi/internal/bindings/StreamPublisher$AsyncSubscription.class */
    private class AsyncSubscription implements Subscription {
        private AtomicBoolean isStreamingInitiated;

        private AsyncSubscription() {
            this.isStreamingInitiated = new AtomicBoolean();
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            StreamPublisher.LOGGER.debug("Requested demand - {} .", Long.valueOf(j));
            if (StreamPublisher.this.finished.get()) {
                StreamPublisher.LOGGER.debug("Streaming has finished, no more requests are processed.");
                return;
            }
            if (j <= 0) {
                StreamPublisher.LOGGER.debug("Invalid request count received - {}.", Long.valueOf(j));
                StreamPublisher.this.abort(new IllegalArgumentException("non-positive subscription request signals are illegal"));
            } else if (guardedAddAndGet(j) == j) {
                initiateStreaming();
            }
        }

        private void initiateStreaming() {
            StreamPublisher.LOGGER.debug("Stream initiation requested.");
            if (this.isStreamingInitiated.compareAndSet(false, true)) {
                StreamPublisher.LOGGER.trace("Publisher is initiating communication.");
                StreamPublisher.this.invoke(StreamPublisher.this.createHandle(StreamPublisher.this.subscriber), StreamPublisher.this.execCtx, 0);
                return;
            }
            Consumer consumer = StreamPublisher.this.nextChunkConsumer;
            if (consumer != null) {
                StreamPublisher.LOGGER.trace(String.format("Publisher continues suspended communication via consumer - %h.", consumer));
                consumer.accept(StreamPublisher.this.createHandle(StreamPublisher.this.subscriber));
            }
        }

        private long guardedAddAndGet(long j) {
            long j2;
            long j3;
            do {
                j2 = StreamPublisher.this.demandCount.get();
                j3 = j2 + j;
                if (j3 < 0) {
                    j3 = Long.MAX_VALUE;
                }
            } while (!StreamPublisher.this.demandCount.compareAndSet(j2, j3));
            return j3;
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            StreamPublisher.this.abort(null);
        }
    }

    public StreamPublisher(Stub stub, MethodIdentifier methodIdentifier, StructValue structValue, StructType structType, Type type, Collection<Type> collection, InvocationConfig invocationConfig) {
        this.stub = stub;
        this.serviceId = methodIdentifier.getInterfaceIdentifier().getName();
        this.operationId = methodIdentifier.getName();
        this.inputValue = structValue;
        this.execCtx = stub.getExecutionContext(invocationConfig);
        this.outputType = type;
        this.errorTypes = collection;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber);
        if (!this.subscribedTo.compareAndSet(false, true)) {
            rejectSubscription(subscriber);
        } else {
            this.subscriber = subscriber;
            subscriber.onSubscribe(new AsyncSubscription());
        }
    }

    private void rejectSubscription(Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Subscription() { // from class: com.vmware.vapi.internal.bindings.StreamPublisher.1
            @Override // org.reactivestreams.Subscription
            public void request(long j) {
            }

            @Override // org.reactivestreams.Subscription
            public void cancel() {
            }
        });
        subscriber.onError(new IllegalStateException("This instance has already been subscribed to"));
    }

    public ResultTranslatingHandle<T> createHandle(final Subscriber subscriber) {
        return new ResultTranslatingHandle<T>(this.stub, this.outputType, this.errorTypes) { // from class: com.vmware.vapi.internal.bindings.StreamPublisher.2
            @Override // com.vmware.vapi.core.AsyncHandle
            public void updateProgress(DataValue dataValue) {
            }

            @Override // com.vmware.vapi.internal.bindings.ResultTranslatingHandle
            protected void postProcessResponse(MethodResult methodResult) {
            }

            /* JADX INFO: Access modifiers changed from: package-private */
            @Override // com.vmware.vapi.internal.bindings.ResultTranslatingHandle
            public void onSuccess(T t, Consumer<AsyncHandle<MethodResult>> consumer) {
                long decrementAndGet;
                if (StreamPublisher.this.finished.get()) {
                    StreamPublisher.LOGGER.debug("Finished safeguard triggered.");
                    return;
                }
                StreamPublisher.this.nextChunkConsumer = consumer;
                if (t != null) {
                    try {
                        StreamPublisher.LOGGER.trace("Executing subscriber#onNext with result - {}", t);
                        subscriber.onNext(t);
                    } finally {
                        StreamPublisher.this.demandCount.decrementAndGet();
                    }
                }
                if (consumer == null) {
                    StreamPublisher.this.finished.set(true);
                    StreamPublisher.LOGGER.trace("Streaming complete - next handle not received.");
                    subscriber.onComplete();
                } else {
                    StreamPublisher.LOGGER.trace("Demand is {} .", Long.valueOf(decrementAndGet));
                    if (decrementAndGet > 0) {
                        StreamPublisher.LOGGER.trace("Resume requesting.");
                        consumer.accept(StreamPublisher.this.createHandle(subscriber));
                    }
                }
            }

            /* JADX INFO: Access modifiers changed from: package-private */
            @Override // com.vmware.vapi.internal.bindings.ResultTranslatingHandle
            public void onFailure(RuntimeException runtimeException) {
                StreamPublisher.this.abort(runtimeException);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void invoke(final ResultTranslatingHandle<T> resultTranslatingHandle, ExecutionContext executionContext, final int i) {
        ResultTranslatingHandle<T> resultTranslatingHandle2 = resultTranslatingHandle;
        if (this.stub.isRetryingConfigured()) {
            LOGGER.trace("Creating retrying handle.");
            resultTranslatingHandle2 = new RetryingHandle<T>(this.stub, this.outputType, this.errorTypes, resultTranslatingHandle, this.serviceId, this.operationId, executionContext, this.inputValue, i) { // from class: com.vmware.vapi.internal.bindings.StreamPublisher.3
                /* JADX INFO: Access modifiers changed from: package-private */
                @Override // com.vmware.vapi.internal.bindings.RetryingHandle, com.vmware.vapi.internal.bindings.ResultTranslatingHandle
                public void onFailure(RuntimeException runtimeException) {
                    StreamPublisher.LOGGER.debug("Error during streaming occurred.", (Throwable) runtimeException);
                    if (StreamPublisher.this.finished.get()) {
                        StreamPublisher.LOGGER.debug("Streaming has finished prior receiving the error.", (Throwable) runtimeException);
                    } else {
                        super.onFailure(runtimeException);
                    }
                }

                @Override // com.vmware.vapi.internal.bindings.RetryingHandle
                void onRetry(ExecutionContext executionContext2) {
                    StreamPublisher.LOGGER.debug("Retrying invocation of the streaming request {} {}.", StreamPublisher.this.serviceId, StreamPublisher.this.operationId);
                    StreamPublisher.this.invoke(resultTranslatingHandle, executionContext2, i + 1);
                }
            };
        }
        AbortableAsyncHandle abortableAsyncHandle = new AbortableAsyncHandle(resultTranslatingHandle2, this.abortHandle);
        Slf4jMDCLogConfigurator slf4jMDCLogConfigurator = new Slf4jMDCLogConfigurator();
        try {
            slf4jMDCLogConfigurator.configureContext(LogDiagnosticUtil.getDiagnosticContext(executionContext));
            LOGGER.trace("Starting streaming invocation request {} {}.", this.serviceId, this.operationId);
            this.stub.apiProvider.invoke(this.serviceId, this.operationId, this.inputValue, executionContext, abortableAsyncHandle);
            this.inputValue = null;
            slf4jMDCLogConfigurator.cleanUpContext(LogDiagnosticUtil.getDiagnosticKeys());
        } catch (Throwable th) {
            slf4jMDCLogConfigurator.cleanUpContext(LogDiagnosticUtil.getDiagnosticKeys());
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void abort(Throwable th) {
        LOGGER.debug("Stream processing failed.", th);
        if (this.finished.compareAndSet(false, true)) {
            this.abortHandle.abort();
            if (th != null) {
                try {
                    this.subscriber.onError(th);
                } catch (RuntimeException e) {
                    LOGGER.warn(String.format("Exception while invoking %s.onError for %s.%s", this.subscriber, this.serviceId, this.operationId), (Throwable) e);
                }
            }
            this.subscriber = null;
        }
    }
}
