package com.opendxl.streaming.client;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonIOException;
import com.google.gson.JsonSyntaxException;
import com.opendxl.streaming.client.entity.ConsumerRecords;
import com.opendxl.streaming.client.entity.ProducerRecords;
import com.opendxl.streaming.client.entity.SubscribePayload;
import com.opendxl.streaming.client.entity.Topics;
import com.opendxl.streaming.client.exception.ClientError;
import com.opendxl.streaming.client.exception.ConsumerError;
import com.opendxl.streaming.client.exception.ErrorType;
import com.opendxl.streaming.client.exception.PermanentError;
import com.opendxl.streaming.client.exception.StopError;
import com.opendxl.streaming.client.exception.TemporaryError;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/opendxl/streaming/client/Channel.class */
public class Channel implements Consumer, Producer, AutoCloseable {
    /** Config key read in the constructor; when its value parses to true, commit() sends no commit request. */
    private static final String ENABLE_AUTO_COMMIT_CONFIG_SETTING = "enable.auto.commit";
    /** Consumer path prefix used when the constructor receives neither a shared nor a consumer-specific prefix. */
    private static final String DEFAULT_CONSUMER_PATH_PREFIX = "/databus/consumer-service/v1";
    /** Producer path prefix used when the constructor receives neither a shared nor a producer-specific prefix. */
    private static final String DEFAULT_PRODUCER_PATH_PREFIX = "/databus/cloudproxy/v1";
    /** Interval in milliseconds; matches the 1000 ms sleep stop() performs between checks of the running flag. */
    private static final int STOP_CHANNEL_WAIT_PERIOD_MS = 1000;
    /** Base URL handed to every Request; also inspected for an "https" prefix. */
    private final String base;
    /** Path prefix prepended to the "/produce" endpoint. */
    private final String producerPathPrefix;
    /** Path prefix prepended to all "/consumers..." endpoints. */
    private final String consumerPathPrefix;
    /** Consumer group sent when creating a consumer; create() and run() reject null or empty values. */
    private final String consumerGroup;
    /** Copy of the caller-supplied consumer properties, with "enable.auto.commit" defaulted to "false". */
    private final Properties configs;
    /** Passed unchanged to Request; presumably identifies a CA bundle for TLS verification — confirm against Request. */
    private final String verifyCertBundle;
    /** Id of the current consumer instance; set by create(), cleared by reset(). */
    private String consumerId;
    /** Topics recorded by the last successful subscribe(); cleared by reset(). */
    private List<String> subscriptions;
    /** Authentication object handed to every Request. */
    private final ChannelAuth auth;
    /** HTTP request helper; replaced with a fresh instance by recreateConsumer(). */
    private Request request;
    /** When false, consumeLoop() stops iterating after recreating the consumer on a ConsumerError. */
    private final boolean retryOnFail;
    /** True from construction until destroy() completes; checked by acquireAndEnsureChannelIsActive(). */
    private boolean active;
    /** Set while a run() invocation is executing its consume loop; polled by stop(). */
    private final AtomicBoolean running;
    /** Raised by stop() to ask the run loop to exit; lowered again when stop() returns or fails. */
    private final AtomicBoolean stopRequested;
    /** Guards destroy() against concurrent entry. */
    private final AtomicBoolean destroying;
    /** Parsed value of the "enable.auto.commit" config entry. */
    private final boolean isAutoCommitEnabled;
    /** Sentinel stored in currentThread while no thread holds the channel. */
    private static final long NO_CURRENT_THREAD = -1;
    /** Id of the thread currently holding the channel, or NO_CURRENT_THREAD. */
    private final AtomicLong currentThread;
    /** Number of nested acquire() calls not yet matched by release(). */
    private final AtomicInteger refcount;
    /** True when the base URL starts with "https" (compared after lower-casing). */
    private final boolean isHttps;
    /** Proxy settings handed to every Request. */
    private final HttpProxySettings httpProxySettings;
    /** When false, "multi_tenant=false" is added to the query string of most requests. */
    private boolean isMultiTenant;
    /** Extra headers handed to every Request. */
    private Map<String, String> requestHeaders;
    /** Gson instance with HTML escaping disabled; used to serialize ProducerRecords. */
    final Gson gson;
    /** Class logger. */
    private static Logger logger = LogManager.getLogger(Channel.class);
    /** HTTP status to error type mapping for consumer creation; it has no CONFLICT entry and NOT_FOUND is permanent. */
    private static final Map<Integer, ErrorType> CREATE_ERROR_MAP = buildErrorMap(ErrorType.PERMANENT_ERROR, false);
    /** HTTP status to error type mapping for consumer deletion; NOT_FOUND is reported as a consumer error. */
    private static final Map<Integer, ErrorType> DELETE_ERROR_MAP = buildErrorMap(ErrorType.CONSUMER_ERROR, true);
    /** HTTP status to error type mapping for subscribing; NOT_FOUND is reported as a consumer error. */
    private static final Map<Integer, ErrorType> SUBSCRIBE_ERROR_MAP = buildErrorMap(ErrorType.CONSUMER_ERROR, true);
    /** HTTP status to error type mapping for listing subscriptions; NOT_FOUND is reported as a consumer error. */
    private static final Map<Integer, ErrorType> GET_SUBSCRIPTIONS_ERROR_MAP = buildErrorMap(ErrorType.CONSUMER_ERROR, true);
    /** HTTP status to error type mapping for consuming records; NOT_FOUND is reported as a consumer error. */
    private static final Map<Integer, ErrorType> CONSUME_RECORDS_ERROR_MAP = buildErrorMap(ErrorType.CONSUMER_ERROR, true);
    /** HTTP status to error type mapping for committing offsets; NOT_FOUND is reported as a consumer error. */
    private static final Map<Integer, ErrorType> COMMIT_ALL_RECORDS_ERROR_MAP = buildErrorMap(ErrorType.CONSUMER_ERROR, true);
    /** HTTP status to error type mapping for producing records; NOT_FOUND is permanent. */
    private static final Map<Integer, ErrorType> PRODUCE_RECORDS_ERROR_MAP = buildErrorMap(ErrorType.PERMANENT_ERROR, true);

    /**
     * Builds one status-code mapping. All mappings share the BAD_REQUEST, UNAUTHORIZED, FORBIDDEN and
     * INTERNAL_SERVER_ERROR entries; they differ only in how NOT_FOUND is classified and in whether
     * CONFLICT is present.
     *
     * @param notFoundType    error type assigned to NOT_FOUND
     * @param includeConflict whether CONFLICT is mapped (always to TEMPORARY_ERROR)
     * @return an unmodifiable map from HTTP status code to error type
     */
    private static Map<Integer, ErrorType> buildErrorMap(ErrorType notFoundType, boolean includeConflict) {
        Map<Integer, ErrorType> mapping = new HashMap<Integer, ErrorType>();
        mapping.put(HttpStatusCodes.BAD_REQUEST, ErrorType.PERMANENT_ERROR);
        mapping.put(HttpStatusCodes.UNAUTHORIZED, ErrorType.TEMPORARY_ERROR);
        mapping.put(HttpStatusCodes.FORBIDDEN, ErrorType.TEMPORARY_ERROR);
        mapping.put(HttpStatusCodes.NOT_FOUND, notFoundType);
        if (includeConflict) {
            mapping.put(HttpStatusCodes.CONFLICT, ErrorType.TEMPORARY_ERROR);
        }
        mapping.put(HttpStatusCodes.INTERNAL_SERVER_ERROR, ErrorType.TEMPORARY_ERROR);
        // Shared static state: hand out a read-only view so no caller can alter the classification.
        return Collections.unmodifiableMap(mapping);
    }

    /**
     * Creates a channel without a producer-specific path prefix; delegates to the ten-argument
     * constructor with {@code null} in the producer path prefix position.
     *
     * @param str               base URL of the streaming service
     * @param channelAuth       authentication used for requests
     * @param str2              consumer group
     * @param str3              path prefix shared by consumer and producer endpoints, or null
     * @param str4              consumer path prefix, used only when {@code str3} is null
     * @param z                 retry-on-fail flag
     * @param str5              value stored as verifyCertBundle
     * @param properties        extra consumer configuration, may be null
     * @param httpProxySettings proxy settings handed to the request helper
     * @throws TemporaryError declared by the delegated constructor
     */
    public Channel(String str, ChannelAuth channelAuth, String str2, String str3, String str4, boolean z, String str5, Properties properties, HttpProxySettings httpProxySettings) throws TemporaryError {
        this(str, channelAuth, str2, str3, str4, null, z, str5, properties, httpProxySettings);
    }

    /**
     * Creates a multi-tenant channel with no extra request headers; delegates to the twelve-argument
     * constructor with {@code true} for the multi-tenant flag and an empty header map.
     *
     * @param str               base URL of the streaming service
     * @param channelAuth       authentication used for requests
     * @param str2              consumer group
     * @param str3              path prefix shared by consumer and producer endpoints, or null
     * @param str4              consumer path prefix, used only when {@code str3} is null
     * @param str5              producer path prefix, used only when {@code str3} is null
     * @param z                 retry-on-fail flag
     * @param str6              value stored as verifyCertBundle
     * @param properties        extra consumer configuration, may be null
     * @param httpProxySettings proxy settings handed to the request helper
     * @throws TemporaryError declared by the delegated constructor
     */
    public Channel(String str, ChannelAuth channelAuth, String str2, String str3, String str4, String str5, boolean z, String str6, Properties properties, HttpProxySettings httpProxySettings) throws TemporaryError {
        // Typed emptyMap() instead of the raw EMPTY_MAP constant: same shared instance, no unchecked conversion.
        this(str, channelAuth, str2, str3, str4, str5, z, str6, properties, httpProxySettings, true, Collections.<String, String>emptyMap());
    }

    /**
     * Fully parameterised constructor; the other constructors delegate here.
     *
     * @param str               base URL; stored, checked for an "https" prefix and handed to the request helper
     * @param channelAuth       authentication used for requests
     * @param str2              consumer group
     * @param str3              when non-null, used as both consumer and producer path prefix
     * @param str4              consumer path prefix when {@code str3} is null; a built-in default applies if also null
     * @param str5              producer path prefix when {@code str3} is null; a built-in default applies if also null
     * @param z                 retry-on-fail flag
     * @param str6              value stored as verifyCertBundle and handed to the request helper
     * @param properties        extra consumer configuration, copied; may be null
     * @param httpProxySettings proxy settings handed to the request helper
     * @param z2                multi-tenant flag
     * @param map               request headers handed to the request helper
     * @throws TemporaryError declared; nothing in this body throws it directly
     */
    public Channel(String str, ChannelAuth channelAuth, String str2, String str3, String str4, String str5, boolean z, String str6, Properties properties, HttpProxySettings httpProxySettings, boolean z2, Map<String, String> map) throws TemporaryError {
        // Plain pass-through state.
        this.base = str;
        this.auth = channelAuth;
        this.consumerGroup = str2;
        this.verifyCertBundle = str6;
        this.httpProxySettings = httpProxySettings;
        this.retryOnFail = z;
        this.isMultiTenant = z2;
        this.requestHeaders = map;

        // A shared prefix wins over the specific ones; otherwise each side falls back to its default.
        this.consumerPathPrefix = str3 != null ? str3 : (str4 != null ? str4 : DEFAULT_CONSUMER_PATH_PREFIX);
        this.producerPathPrefix = str3 != null ? str3 : (str5 != null ? str5 : DEFAULT_PRODUCER_PATH_PREFIX);

        // Consumer configuration: copy the caller's entries, then default auto-commit to "false".
        this.configs = new Properties();
        if (properties != null) {
            this.configs.putAll(properties);
        }
        if (!this.configs.containsKey(ENABLE_AUTO_COMMIT_CONFIG_SETTING)) {
            this.configs.put(ENABLE_AUTO_COMMIT_CONFIG_SETTING, "false");
        }
        this.isAutoCommitEnabled = Boolean.parseBoolean(this.configs.get(ENABLE_AUTO_COMMIT_CONFIG_SETTING).toString());

        // Lifecycle and thread-ownership bookkeeping.
        this.currentThread = new AtomicLong(NO_CURRENT_THREAD);
        this.refcount = new AtomicInteger(0);
        this.destroying = new AtomicBoolean(false);
        this.running = new AtomicBoolean(false);
        this.stopRequested = new AtomicBoolean(false);
        this.active = true;

        // Consumer state starts empty.
        this.consumerId = null;
        this.subscriptions = new ArrayList<>();

        // Serialisation and transport.
        this.gson = new GsonBuilder().disableHtmlEscaping().create();
        this.isHttps = str.toLowerCase().startsWith("https");
        this.request = new Request(str, channelAuth, this.verifyCertBundle, this.isHttps, this.httpProxySettings, map);
    }

    /**
     * Sets the multi-tenant flag. While the flag is false, create, subscribe, subscriptions, consume
     * and produce requests carry a {@code multi_tenant=false} query parameter.
     *
     * @param z new value of the flag
     */
    public void setMultiTenant(boolean z) {
        this.isMultiTenant = z;
    }

    /**
     * Forgets the current consumer: clears the consumer id and the recorded subscriptions, then asks
     * the request helper to reset its cookies and authorization.
     */
    private void reset() {
        this.consumerId = null;
        this.subscriptions.clear();
        this.request.resetCookies();
        this.request.resetAuthorization();
    }

    /**
     * Creates a new consumer for the configured consumer group and remembers its id.
     * Any previous consumer state is discarded first via {@link #reset()}.
     *
     * @throws PermanentError if no consumer group was configured, or the service reports a permanent error
     * @throws TemporaryError if the service reports a temporary error or its response cannot be parsed
     */
    @Override
    public void create() throws PermanentError, TemporaryError {
        acquireAndEnsureChannelIsActive();
        try {
            if (this.consumerGroup == null || this.consumerGroup.isEmpty()) {
                logger.error("No value specified for 'consumerGroup' during channel init");
                throw new PermanentError("No value specified for 'consumerGroup' during channel init");
            }
            reset();
            ConsumerConfig consumerConfig = new ConsumerConfig(this.consumerGroup, this.configs);
            Gson gson = new Gson();
            // JSON bodies are encoded explicitly as UTF-8 rather than with the platform default charset.
            byte[] bytes = gson.toJson(consumerConfig).getBytes(StandardCharsets.UTF_8);
            try {
                StringBuilder append = new StringBuilder(this.consumerPathPrefix).append("/consumers");
                if (!this.isMultiTenant) {
                    append.append("?multi_tenant=false");
                }
                String post = this.request.post(append.toString(), bytes, CREATE_ERROR_MAP);
                if (post != null) {
                    ConsumerId created = gson.fromJson(post, ConsumerId.class);
                    if (created == null) {
                        // Gson yields null for an empty or literal "null" body; report it instead of an NPE.
                        throw new TemporaryError("Error while parsing response: empty consumer creation response");
                    }
                    this.consumerId = created.getConsumerInstanceId();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Created " + logConsumerId());
                    }
                }
            } catch (JsonSyntaxException | ConsumerError e) {
                TemporaryError temporaryError = new TemporaryError("Error while parsing response: " + e.getClass().getCanonicalName() + " " + e.getMessage(), (Throwable) e, "create");
                logger.error("Failed to create consumer.", temporaryError);
                throw temporaryError;
            } catch (ClientError e2) {
                e2.setApi("create");
                logger.error("Failed to create consumer.", e2);
                throw e2;
            }
        } finally {
            release();
        }
    }

    /**
     * Subscribes to the given topics without filter data; delegates to
     * {@link #subscribe(List, Map, boolean)} with an empty map and {@code false}.
     *
     * @param list topics to subscribe to
     */
    @Override // com.opendxl.streaming.client.Consumer
    public void subscribe(List<String> list) throws ConsumerError, PermanentError, TemporaryError {
        subscribe(list, Collections.emptyMap(), false);
    }

    /**
     * Subscribes the consumer to the given topics, creating the consumer first when none exists.
     * On success the topics become the channel's recorded subscriptions.
     *
     * @param list topics to subscribe to; must be non-null and non-empty
     * @param map  extra data sent in a {@code SubscribePayload}; when null or empty a plain
     *             {@code Topics} body is sent instead
     * @param z    flag forwarded to {@code SubscribePayload}; only used when {@code map} is non-empty
     * @throws PermanentError if {@code list} is null or empty, or the service reports a permanent error
     * @throws ConsumerError  if the service reports a consumer error
     * @throws TemporaryError if the service reports a temporary error
     */
    public void subscribe(List<String> list, Map<String, Object> map, boolean z) throws ConsumerError, PermanentError, TemporaryError {
        acquireAndEnsureChannelIsActive();
        try {
            if (list == null) {
                logger.error("Non-empty value must be specified for topics.");
                throw new PermanentError("Non-empty value must be specified for topics.");
            }
            if (list.isEmpty()) {
                logger.error("Non-empty value must be specified for topics");
                throw new PermanentError("Non-empty value must be specified for topics");
            }
            // Snapshot the topics up front: the caller may pass this.subscriptions itself, which
            // create() (through reset()) and the clear() below would otherwise empty under our feet.
            List<String> topics = new ArrayList<String>(list);
            if (this.consumerId == null || this.consumerId.isEmpty()) {
                create();
            }
            Gson gson = new Gson();
            String json = (null == map || map.isEmpty()) ? gson.toJson(new Topics(topics)) : gson.toJson(new SubscribePayload(topics, map, z));
            // JSON bodies are encoded explicitly as UTF-8 rather than with the platform default charset.
            byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
            StringBuilder append = new StringBuilder(this.consumerPathPrefix).append("/consumers/").append(this.consumerId).append("/subscription");
            if (!this.isMultiTenant) {
                append.append("?multi_tenant=false");
            }
            String str = logConsumerId() + " to " + topics + " topics.";
            try {
                this.request.post(append.toString(), bytes, SUBSCRIBE_ERROR_MAP);
                if (logger.isDebugEnabled()) {
                    logger.debug("Subscribed " + str);
                }
                this.subscriptions.clear();
                this.subscriptions.addAll(topics);
            } catch (ClientError e) {
                e.setApi("subscribe");
                logger.error("Failed to subscribe " + str, e);
                throw e;
            }
        } finally {
            release();
        }
    }

    /**
     * Asks the service which topics the current consumer is subscribed to.
     *
     * @return a new list with the topics parsed from the response; empty when the response body is null
     * @throws TemporaryError if the response is not valid JSON, or the service reports a temporary error
     * @throws ConsumerError  if the service reports a consumer error
     * @throws PermanentError if the service reports a permanent error
     */
    @Override
    @SuppressWarnings("unchecked")
    public List<String> subscriptions() throws ConsumerError, PermanentError, TemporaryError {
        acquireAndEnsureChannelIsActive();
        try {
            final Gson parser = new Gson();
            String url = this.consumerPathPrefix + "/consumers/" + this.consumerId + "/subscription";
            if (!this.isMultiTenant) {
                url = url + "?multi_tenant=false";
            }
            final List<String> topics = new ArrayList<>();
            try {
                final String body = this.request.get(url, GET_SUBSCRIPTIONS_ERROR_MAP);
                if (body != null) {
                    topics.addAll((Collection<String>) parser.fromJson(body, List.class));
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(logConsumerId() + " is subscribed to " + topics);
                }
                return topics;
            } catch (JsonSyntaxException parseFailure) {
                // Malformed body: surface it as a temporary error tagged with this API's name.
                final TemporaryError wrapped = new TemporaryError("Error while parsing response: " + parseFailure.getClass().getCanonicalName() + " " + parseFailure.getMessage(), (Throwable) parseFailure, "subscriptions");
                logger.error("Failed to get subscriptions of " + logConsumerId(), parseFailure);
                throw wrapped;
            } catch (ClientError serviceFailure) {
                serviceFailure.setApi("subscriptions");
                logger.error("Failed to get subscriptions of " + logConsumerId(), serviceFailure);
                throw serviceFailure;
            }
        } finally {
            release();
        }
    }

    /**
     * Deletes the current consumer on the service and clears the local consumer state.
     * Does nothing except log a warning when no consumer id is set. A consumer error from the
     * service (consumer not found) is logged and otherwise ignored; the local state is cleared
     * whether or not the request succeeds.
     *
     * @throws TemporaryError if the service reports a temporary error
     * @throws PermanentError if the channel has been destroyed or the service reports a permanent error
     */
    @Override
    public void delete() throws TemporaryError, PermanentError {
        acquireAndEnsureChannelIsActive();
        try {
            if (this.consumerId == null) {
                logger.warn("Ignoring call to delete because consumerId is empty.");
                // The outer finally performs the single release() that matches the acquire above.
                return;
            }
            try {
                this.request.delete(this.consumerPathPrefix + "/consumers/" + this.consumerId, DELETE_ERROR_MAP);
                if (logger.isDebugEnabled()) {
                    logger.debug(logConsumerId() + " was deleted.");
                }
            } catch (ConsumerError e) {
                logger.error(logConsumerId() + " not found. Deleting consumer anyways.", e);
            } catch (ClientError e2) {
                e2.setApi("delete");
                logger.error("Failed to delete " + logConsumerId(), e2);
                throw e2;
            } finally {
                // Runs after the handlers above, so their log lines still show the consumer id.
                reset();
            }
        } finally {
            release();
        }
    }

    /**
     * Consumes records with a timeout argument of 0, which results in no {@code timeout} query
     * parameter being sent; delegates to {@link #consume(int)}.
     *
     * @return the records parsed from the service response
     */
    @Override // com.opendxl.streaming.client.Consumer
    public ConsumerRecords consume() throws ConsumerError, PermanentError, TemporaryError {
        return consume(0);
    }

    /**
     * Consumes records without requesting filtering; delegates to {@link #consume(int, boolean)}
     * with {@code false}.
     *
     * @param i timeout value sent as the {@code timeout} query parameter when greater than 0
     * @return the records parsed from the service response
     */
    @Override // com.opendxl.streaming.client.Consumer
    public ConsumerRecords consume(int i) throws ConsumerError, PermanentError, TemporaryError {
        return consume(i, false);
    }

    /**
     * Fetches records for the current consumer.
     *
     * @param i timeout value; sent as the {@code timeout} query parameter only when greater than 0
     * @param z when true, {@code filter=true} is added to the query string
     * @return the records parsed from the response (whatever Gson produces for the body, possibly null)
     * @throws PermanentError if the channel has no recorded subscriptions, or the service reports a permanent error
     * @throws TemporaryError if the response is not valid JSON, or the service reports a temporary error
     * @throws ConsumerError  if the service reports a consumer error
     */
    public ConsumerRecords consume(int i, boolean z) throws ConsumerError, PermanentError, TemporaryError {
        acquireAndEnsureChannelIsActive();
        try {
            if (this.subscriptions.isEmpty()) {
                logger.error("Channel is not subscribed to any topic");
                throw new PermanentError("Channel is not subscribed to any topic");
            }
            // Assemble the optional query parameters, '&'-separated.
            StringBuilder query = new StringBuilder();
            if (!this.isMultiTenant) {
                query.append("multi_tenant=false");
            }
            if (i > 0) {
                if (query.length() > 0) {
                    query.append("&");
                }
                query.append("timeout=").append(i);
            }
            if (z) {
                if (query.length() > 0) {
                    query.append("&");
                }
                query.append("filter=true");
            }
            StringBuilder url = new StringBuilder(this.consumerPathPrefix).append("/consumers/").append(this.consumerId).append("/records");
            if (query.length() > 0) {
                url.append("?").append(query);
            }
            try {
                // Parsing is unaffected by the shared instance's HTML-escaping setting, so reuse it
                // instead of building a new Gson on every poll.
                ConsumerRecords consumerRecords = this.gson.fromJson(this.request.get(url.toString(), CONSUME_RECORDS_ERROR_MAP), ConsumerRecords.class);
                if (logger.isDebugEnabled()) {
                    logger.debug(logConsumerId() + " consumed " + ((consumerRecords == null || consumerRecords.getRecords() == null) ? 0 : consumerRecords.getRecords().size()) + " records.");
                }
                return consumerRecords;
            } catch (JsonSyntaxException e) {
                TemporaryError temporaryError = new TemporaryError("Error while parsing response: " + e.getClass().getCanonicalName() + " " + e.getMessage(), (Throwable) e, "consume");
                logger.error("Failed to consume with " + logConsumerId(), e);
                throw temporaryError;
            } catch (ClientError e2) {
                e2.setApi("consume");
                logger.error("Failed to consume with " + logConsumerId(), e2);
                throw e2;
            }
        } finally {
            release();
        }
    }

    /**
     * Commits the consumer's offsets by posting to its {@code /offsets} endpoint with no body.
     * When auto-commit is enabled no request is sent.
     *
     * @throws ConsumerError  if the service reports a consumer error
     * @throws TemporaryError if the service reports a temporary error
     * @throws PermanentError if the channel has been destroyed or the service reports a permanent error
     */
    @Override
    public void commit() throws ConsumerError, TemporaryError, PermanentError {
        acquireAndEnsureChannelIsActive();
        try {
            if (!this.isAutoCommitEnabled) {
                final String offsetsUrl = this.consumerPathPrefix + "/consumers/" + this.consumerId + "/offsets";
                try {
                    this.request.post(offsetsUrl, null, COMMIT_ALL_RECORDS_ERROR_MAP);
                    if (logger.isDebugEnabled()) {
                        logger.debug(logConsumerId() + " committed records.");
                    }
                } catch (ClientError failure) {
                    failure.setApi("commit");
                    logger.error("Failed to commit with " + logConsumerId(), failure);
                    throw failure;
                }
            } else if (logger.isDebugEnabled()) {
                logger.debug("Not sending commit request because enable.auto.commit is set to true.");
            }
        } finally {
            release();
        }
    }

    /**
     * Runs the consume loop with a timeout argument of 0; delegates to
     * {@link #run(ConsumerRecordProcessor, List, int)}.
     *
     * @param consumerRecordProcessor callback invoked with each batch of consumed records
     * @param list                    topics to subscribe to
     */
    @Override // com.opendxl.streaming.client.Consumer
    public void run(ConsumerRecordProcessor consumerRecordProcessor, List<String> list) throws PermanentError, TemporaryError {
        run(consumerRecordProcessor, list, 0);
    }

    /**
     * Runs the consume loop without filter data; delegates to the five-argument {@code run} with an
     * empty map and {@code false}.
     *
     * @param consumerRecordProcessor callback invoked with each batch of consumed records
     * @param list                    topics to subscribe to
     * @param i                       timeout value forwarded to each consume call
     */
    @Override // com.opendxl.streaming.client.Consumer
    public void run(ConsumerRecordProcessor consumerRecordProcessor, List<String> list, int i) throws PermanentError, TemporaryError {
        run(consumerRecordProcessor, list, Collections.emptyMap(), false, i);
    }

    /**
     * Repeatedly subscribes, consumes, hands records to the callback and commits until a stop is
     * requested through {@link #stop()} or an error propagates. If the channel is already running,
     * this call returns without starting a second loop.
     *
     * @param consumerRecordProcessor callback invoked with each batch of records; must not be null
     * @param list                    topics to subscribe to; when null or empty, a copy of the
     *                                channel's currently recorded subscriptions is used
     * @param map                     extra subscription data forwarded to subscribe; when non-empty,
     *                                consume requests also ask for filtering
     * @param z                       flag forwarded to subscribe
     * @param i                       timeout value forwarded to each consume call
     * @throws PermanentError if no consumer group or no callback was supplied, or the loop fails permanently
     * @throws TemporaryError if the loop fails with a temporary error
     */
    public void run(ConsumerRecordProcessor consumerRecordProcessor, List<String> list, Map<String, Object> map, boolean z, int i) throws PermanentError, TemporaryError {
        acquireAndEnsureChannelIsActive();
        // Tracks whether THIS invocation flipped the running flag, so that a nested call or a
        // failed validation does not clear a flag owned by another invocation.
        boolean startedHere = false;
        try {
            if (this.consumerGroup == null || this.consumerGroup.isEmpty()) {
                logger.error("No value specified for 'consumerGroup' during channel init");
                throw new PermanentError("No value specified for 'consumerGroup' during channel init");
            }
            if (consumerRecordProcessor == null) {
                logger.error("processCallback not provided");
                throw new PermanentError("processCallback not provided");
            }
            // Copy the recorded subscriptions rather than aliasing the field: reset() clears that
            // list while a consumer is being recreated, which would leave nothing to resubscribe to.
            List<String> topics = (list == null || list.isEmpty()) ? new ArrayList<String>(this.subscriptions) : list;
            startedHere = this.running.compareAndSet(false, true);
            if (startedHere) {
                logger.info("Channel is running");
                // NOTE(review): consumeLoop() also returns when the callback asks to stop or when
                // retryOnFail is false, yet this loop re-enters it until stop() is requested —
                // confirm whether that is intended.
                while (!this.stopRequested.get()) {
                    consumeLoop(consumerRecordProcessor, topics, map, z, i);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Exiting run method.");
                }
            }
        } finally {
            if (startedHere) {
                this.running.set(false);
            }
            release();
        }
    }

    /**
     * Runs the consume loop for a single topic with a timeout argument of 0; delegates to
     * {@link #run(ConsumerRecordProcessor, String, int)}.
     *
     * @param consumerRecordProcessor callback invoked with each batch of consumed records
     * @param str                     topic to subscribe to
     */
    @Override // com.opendxl.streaming.client.Consumer
    public void run(ConsumerRecordProcessor consumerRecordProcessor, String str) throws PermanentError, TemporaryError {
        run(consumerRecordProcessor, str, 0);
    }

    /**
     * Runs the consume loop for a single topic. A null or empty topic is forwarded as a null topic
     * list, so the list-based {@code run} falls back to the channel's recorded subscriptions.
     *
     * @param consumerRecordProcessor callback invoked with each batch of consumed records
     * @param str                     topic to subscribe to, or null/empty to reuse current subscriptions
     * @param i                       timeout value forwarded to each consume call
     * @throws PermanentError if the channel has been destroyed or the loop fails permanently
     * @throws TemporaryError if the loop fails with a temporary error
     */
    @Override
    public void run(ConsumerRecordProcessor consumerRecordProcessor, String str, int i) throws PermanentError, TemporaryError {
        acquireAndEnsureChannelIsActive();
        try {
            List<String> topics = (str == null || str.isEmpty()) ? null : Arrays.asList(str);
            // Exactly one delegated run, matched by exactly one release in the finally below.
            run(consumerRecordProcessor, topics, i);
        } finally {
            release();
        }
    }

    /**
     * Requests the run loop to stop and blocks until it has finished, polling the running flag every
     * {@code STOP_CHANNEL_WAIT_PERIOD_MS} milliseconds. Returns immediately when the channel is not
     * running, and logs a warning and returns when another stop is already in progress.
     *
     * @throws StopError if the waiting thread is interrupted; the thread's interrupt status is
     *                   restored before the error is thrown
     */
    @Override
    public void stop() throws StopError {
        if (!this.running.get()) {
            return;
        }
        if (!this.stopRequested.compareAndSet(false, true)) {
            logger.warn("Ignoring call to stop because Channel is already being stopped.");
            return;
        }
        try {
            while (this.running.get()) {
                Thread.sleep(STOP_CHANNEL_WAIT_PERIOD_MS);
            }
            logger.info("Channel was stopped.");
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            logger.error("Failed to stop channel.");
            throw new StopError("Failed to stop channel.", e);
        } finally {
            // Lower the request flag on every exit path so a later run() is not stopped at once.
            this.stopRequested.set(false);
        }
    }

    /**
     * Shuts the channel down: stops the run loop, deletes the consumer, closes the request helper
     * and marks the channel inactive. A second call on an inactive channel only logs a warning.
     *
     * @throws TemporaryError if another destroy is already in progress, or deleting the consumer fails temporarily
     * @throws StopError      if stopping the run loop fails
     * @throws PermanentError if deleting the consumer fails permanently
     */
    private void destroy() throws TemporaryError, StopError, PermanentError {
        final boolean enteredAlone = this.destroying.compareAndSet(false, true);
        if (!enteredAlone) {
            logger.error("Channel is not safe for multi-threaded access.");
            throw new TemporaryError("Channel is not safe for multi-threaded access.");
        }
        try {
            if (!this.active) {
                logger.warn("Ignoring call to destroy because Channel is not active.");
                return;
            }
            stop();
            delete();
            this.request.close();
            this.active = false;
            if (logger.isDebugEnabled()) {
                logger.debug("Channel was destroyed.");
            }
        } finally {
            this.destroying.set(false);
        }
    }

    /**
     * Closes the channel by delegating to {@code destroy()}.
     *
     * @throws TemporaryError propagated from destroy()
     * @throws StopError      propagated from destroy()
     * @throws PermanentError propagated from destroy()
     */
    @Override // com.opendxl.streaming.client.Consumer, java.lang.AutoCloseable, com.opendxl.streaming.client.Producer
    public void close() throws TemporaryError, StopError, PermanentError {
        destroy();
    }

    /**
     * Subscribe / consume / callback / commit cycle. Each iteration subscribes first if needed,
     * consumes one batch, passes it to the callback and commits. The loop ends when the callback
     * returns false, when a stop has been requested, or after a consumer error if retryOnFail is
     * false.
     *
     * <p>A consumer error from any step causes the consumer to be recreated and the subscription to
     * be redone on the next iteration. A permanent or temporary error deletes the consumer and is
     * rethrown with its API set to "run".
     *
     * @param consumerRecordProcessor callback receiving each batch and the consumer id
     * @param list                    topics to subscribe to
     * @param map                     extra subscription data; when non-empty, consume asks for filtering
     * @param z                       flag forwarded to subscribe
     * @param i                       timeout value forwarded to consume
     * @throws PermanentError if a step fails permanently
     * @throws TemporaryError if a step fails temporarily
     */
    private void consumeLoop(ConsumerRecordProcessor consumerRecordProcessor, List<String> list, Map<String, Object> map, boolean z, int i) throws PermanentError, TemporaryError {
        final boolean filterRecords = map != null && !map.isEmpty();
        boolean keepRunning = true;
        boolean subscribed = false;
        while (keepRunning) {
            try {
                if (!subscribed) {
                    subscribe(list, map, z);
                    subscribed = true;
                    if (logger.isDebugEnabled()) {
                        // Called for its debug log line listing the subscriptions.
                        subscriptions();
                    }
                }
                ConsumerRecords records = consume(i, filterRecords);
                keepRunning = consumerRecordProcessor.processCallback(records, this.consumerId);
                if (!keepRunning) {
                    logger.info("Callback requested to stop consuming records.");
                }
                commit();
            } catch (ConsumerError e) {
                logger.info("Creating a new consumer to resume consuming.");
                logger.error("Consumer error was: ", e);
                // The old consumer is gone; the next iteration must subscribe the new one.
                subscribed = false;
                recreateConsumer(list, e);
                if (!this.retryOnFail) {
                    keepRunning = false;
                    if (logger.isDebugEnabled()) {
                        logger.debug("Exiting run method because retryOnFail is " + this.retryOnFail);
                    }
                }
            } catch (PermanentError | TemporaryError e2) {
                delete();
                e2.setApi("run");
                logger.error("Exiting run method due to error", e2);
                throw e2;
            } finally {
                // A stop request ends the loop regardless of how the iteration finished.
                if (this.stopRequested.get()) {
                    keepRunning = false;
                }
            }
        }
    }

    /**
     * Serializes the given records to JSON with the channel's Gson instance and sends them through
     * {@link #produce(String)}.
     *
     * @param producerRecords records to send
     * @throws PermanentError if the channel has been destroyed or the service reports a permanent error
     * @throws TemporaryError if JSON serialization fails with a JsonIOException, or the service
     *                        reports a temporary error
     */
    @Override
    public void produce(ProducerRecords producerRecords) throws PermanentError, TemporaryError {
        acquireAndEnsureChannelIsActive();
        try {
            final String payload = this.gson.toJson(producerRecords, ProducerRecords.class);
            produce(payload);
        } catch (JsonIOException jsonFailure) {
            final String message = "Failed to produce due to a JSON error " + jsonFailure.getMessage();
            logger.error(message, jsonFailure);
            throw new TemporaryError(message, (Throwable) jsonFailure, "produce");
        } finally {
            release();
        }
    }

    /**
     * Posts an already serialized JSON payload to the {@code /produce} endpoint.
     * A consumer error from the request is logged and not propagated.
     *
     * @param str JSON payload to send
     * @throws PermanentError if the channel has been destroyed or the service reports a permanent error
     * @throws TemporaryError if the service reports a temporary error
     */
    @Override
    public void produce(String str) throws PermanentError, TemporaryError {
        acquireAndEnsureChannelIsActive();
        try {
            StringBuilder append = new StringBuilder(this.producerPathPrefix).append("/produce");
            if (!this.isMultiTenant) {
                append.append("?multi_tenant=false");
            }
            // JSON bodies are encoded explicitly as UTF-8 rather than with the platform default charset.
            this.request.post(append.toString(), str.getBytes(StandardCharsets.UTF_8), PRODUCE_RECORDS_ERROR_MAP);
            if (logger.isDebugEnabled()) {
                logger.debug("produced records.");
            }
        } catch (ConsumerError e) {
            // Logged only: this method's signature has no way to report a consumer error.
            e.setApi("produce");
            logger.error("Failed to produce", e);
        } catch (PermanentError | TemporaryError e2) {
            e2.setApi("produce");
            logger.error("Failed to produce", e2);
            throw e2;
        } finally {
            release();
        }
    }

    /**
     * Replaces the current consumer with a new one: deletes the consumer, closes the request helper,
     * builds a new request helper from the channel's stored settings, then creates a new consumer.
     * It does not subscribe the new consumer.
     *
     * @param list not used by this method
     * @param exc  not used by this method
     * @throws PermanentError propagated from delete() or create()
     * @throws TemporaryError propagated from delete() or create()
     */
    private void recreateConsumer(List<String> list, Exception exc) throws PermanentError, TemporaryError {
        if (logger.isDebugEnabled()) {
            logger.debug("Deleting current consumer and creating a brand new one.");
        }
        delete();
        this.request.close();
        this.request = new Request(this.base, this.auth, this.verifyCertBundle, this.isHttps, this.httpProxySettings, this.requestHeaders);
        create();
    }

    /**
     * Acquires the channel for the calling thread and verifies it has not been destroyed.
     * When the channel is inactive the acquisition is undone before the error is thrown.
     *
     * @throws TemporaryError if another thread currently holds the channel
     * @throws PermanentError if the channel is no longer active
     */
    private void acquireAndEnsureChannelIsActive() throws PermanentError, TemporaryError {
        acquire();
        if (!this.active) {
            release();
            throw new PermanentError("Channel has been destroyed.");
        }
    }

    /**
     * Claims the channel for the calling thread and increments the nesting counter. The claim
     * succeeds when the calling thread already holds the channel or when no thread holds it.
     *
     * @throws TemporaryError if a different thread currently holds the channel
     */
    private void acquire() throws TemporaryError {
        final long callerId = Thread.currentThread().getId();
        final boolean alreadyOwner = this.currentThread.get() == callerId;
        if (!alreadyOwner) {
            final boolean claimed = this.currentThread.compareAndSet(NO_CURRENT_THREAD, callerId);
            if (!claimed) {
                throw new TemporaryError("Channel is not safe for multi-threaded access");
            }
        }
        this.refcount.incrementAndGet();
    }

    /**
     * Undoes one {@code acquire()}: decrements the nesting counter and, once it reaches zero,
     * marks the channel as held by no thread.
     */
    private void release() {
        final int remaining = this.refcount.decrementAndGet();
        if (remaining != 0) {
            return;
        }
        this.currentThread.set(NO_CURRENT_THREAD);
    }

    /**
     * Builds the "consumer &lt;id&gt;" fragment used in log messages.
     *
     * @return the text "consumer " followed by the current consumer id ("null" when none is set)
     */
    private String logConsumerId() {
        return "consumer " + this.consumerId;
    }
}
