package com.microsoft.azure.iot.service.transport.amqps;

import com.microsoft.azure.iot.service.sdk.FeedbackBatch;
import com.microsoft.azure.iot.service.sdk.FeedbackBatchMessage;
import com.microsoft.azure.iot.service.sdk.IotHubServiceClientProtocol;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.reactor.Reactor;

/* loaded from: input_file:WEB-INF/lib/iothub-java-service-client-1.0.11.jar:com/microsoft/azure/iot/service/transport/amqps/AmqpReceive.class */
/**
 * Receives a feedback batch from the service over AMQP by driving a Proton reactor.
 *
 * <p>Typical usage is {@link #open()}, one or more calls to {@link #receive(long)}, then
 * {@link #close()}. The instance registers itself as the reactor's handler and as the
 * feedback listener of the {@link AmqpFeedbackReceivedHandler} it creates.
 */
public class AmqpReceive extends BaseHandler implements AmqpFeedbackReceivedEvent {
    private final String hostName;
    private final String userName;
    private final String sasToken;
    private final IotHubServiceClientProtocol iotHubServiceClientProtocol;
    /** Signalled once for every feedback message delivered to {@link #onFeedbackReceived(String)}. */
    private final Semaphore semaphore = new Semaphore(0);
    /** Connection handler created by {@link #open()}; {@code null} while closed. */
    private AmqpFeedbackReceivedHandler amqpReceiveHandler;
    private Reactor reactor = null;
    /** Most recently parsed feedback; reset to {@code null} at the start of every receive. */
    private FeedbackBatch feedbackBatch;

    /**
     * Stores the connection settings; no connection is made until {@link #receive(long)}.
     *
     * @param hostName host name handed to the feedback handler
     * @param userName user name handed to the feedback handler
     * @param sasToken SAS token handed to the feedback handler
     * @param iotHubServiceClientProtocol protocol handed to the feedback handler
     */
    public AmqpReceive(String hostName, String userName, String sasToken, IotHubServiceClientProtocol iotHubServiceClientProtocol) {
        this.hostName = hostName;
        this.userName = userName;
        this.sasToken = sasToken;
        this.iotHubServiceClientProtocol = iotHubServiceClientProtocol;
    }

    /**
     * Reactor start-up callback: opens a connection on the event's reactor that is driven by
     * the current feedback handler.
     *
     * @param event the reactor-init event
     */
    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onReactorInit(Event event) {
        event.getReactor().connection(this.amqpReceiveHandler);
    }

    /**
     * Creates the feedback handler from the stored connection settings. Must be called before
     * {@link #receive(long)}.
     */
    public void open() {
        this.amqpReceiveHandler = new AmqpFeedbackReceivedHandler(this.hostName, this.userName, this.sasToken, this.iotHubServiceClientProtocol, this);
    }

    /**
     * Drops the feedback handler; subsequent {@link #receive(long)} calls fail until
     * {@link #open()} is called again.
     */
    public void close() {
        this.amqpReceiveHandler = null;
    }

    /**
     * Runs a reactor on the calling thread and returns the feedback batch it delivered, if any.
     *
     * @param timeoutMs how long to wait, in milliseconds, for a feedback signal after the
     *     reactor has finished; {@code 0} waits without a time limit
     * @return the parsed feedback batch, or {@code null} if none was delivered in time
     * @throws IOException if {@link #open()} has not been called, or the reactor cannot be created
     * @throws InterruptedException if the thread is interrupted while waiting for feedback
     */
    public synchronized FeedbackBatch receive(long timeoutMs) throws IOException, InterruptedException {
        this.feedbackBatch = null;
        if (this.amqpReceiveHandler == null) {
            throw new IOException("receive handler is not initialized. call open before receive");
        }
        // Every feedback callback releases one permit but a receive consumes at most one, so
        // discard leftovers from earlier calls; otherwise the wait below could be satisfied
        // by a stale signal instead of by this call's feedback.
        this.semaphore.drainPermits();
        this.reactor = Proton.reactor(this);
        try {
            this.reactor.run();
        } finally {
            // Release the reactor's resources even when run() fails with an exception.
            this.reactor.free();
        }
        if (timeoutMs == 0) {
            // NOTE(review): waits indefinitely if no feedback was signalled — confirm intended.
            this.semaphore.acquire();
        } else {
            // The result is deliberately ignored: on timeout feedbackBatch is still null.
            this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        }
        return this.feedbackBatch;
    }

    /**
     * Feedback callback: parses the payload into a batch and signals the waiting receiver.
     *
     * @param feedbackJson the raw feedback payload passed to {@link FeedbackBatchMessage#parse}
     */
    @Override // com.microsoft.azure.iot.service.transport.amqps.AmqpFeedbackReceivedEvent
    public void onFeedbackReceived(String feedbackJson) {
        this.feedbackBatch = FeedbackBatchMessage.parse(feedbackJson);
        this.semaphore.release();
    }
}
