package org.apache.pulsar.functions.instance;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.gson.Gson;
import org.apache.pulsar.shade.com.google.gson.reflect.TypeToken;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/pulsar/functions/instance/ContextImpl.class */
public class ContextImpl implements Context, SinkContext, SourceContext {
    private InstanceConfig config;
    private Logger logger;
    private Record<?> record;
    private ConcurrentMap<String, AccumulatedMetricDatum> currentAccumulatedMetrics = new ConcurrentHashMap();
    private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics = new ConcurrentHashMap();
    private Map<String, Producer<?>> publishProducers = new HashMap();
    private ProducerBuilderImpl<?> producerBuilder;
    private final List<String> inputTopics;
    private final TopicSchema topicSchema;
    private StateContextImpl stateContext;
    private Map<String, Object> userConfigs;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/functions/instance/ContextImpl$AccumulatedMetricDatum.class */
    public class AccumulatedMetricDatum {
        private double count = 0.0d;
        private double sum = 0.0d;
        private double max = Double.MIN_VALUE;
        private double min = Double.MAX_VALUE;

        AccumulatedMetricDatum() {
        }

        public void update(double d) {
            this.count += 1.0d;
            this.sum += d;
            if (this.max < d) {
                this.max = d;
            }
            if (this.min > d) {
                this.min = d;
            }
        }

        public double getCount() {
            return this.count;
        }

        public double getSum() {
            return this.sum;
        }

        public double getMax() {
            return this.max;
        }

        public double getMin() {
            return this.min;
        }

        public void setCount(double d) {
            this.count = d;
        }

        public void setSum(double d) {
            this.sum = d;
        }

        public void setMax(double d) {
            this.max = d;
        }

        public void setMin(double d) {
            this.min = d;
        }
    }

    public ContextImpl(InstanceConfig instanceConfig, Logger logger, PulsarClient pulsarClient, List<String> list) {
        this.config = instanceConfig;
        this.logger = logger;
        this.inputTopics = list;
        this.topicSchema = new TopicSchema(pulsarClient);
        this.producerBuilder = (ProducerBuilderImpl) pulsarClient.newProducer().blockIfQueueFull(true).enableBatching(true).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS);
        if (instanceConfig.getFunctionDetails().getUserConfig().isEmpty()) {
            this.userConfigs = new HashMap();
        } else {
            this.userConfigs = (Map) new Gson().fromJson(instanceConfig.getFunctionDetails().getUserConfig(), new TypeToken<Map<String, Object>>() { // from class: org.apache.pulsar.functions.instance.ContextImpl.1
            }.getType());
        }
    }

    public void setCurrentMessageContext(Record<?> record) {
        this.record = record;
    }

    @Override // org.apache.pulsar.functions.api.Context
    public Record<?> getCurrentRecord() {
        return this.record;
    }

    @Override // org.apache.pulsar.functions.api.Context
    public Collection<String> getInputTopics() {
        return this.inputTopics;
    }

    @Override // org.apache.pulsar.functions.api.Context
    public String getOutputTopic() {
        return this.config.getFunctionDetails().getSink().getTopic();
    }

    @Override // org.apache.pulsar.functions.api.Context
    public String getOutputSchemaType() {
        Function.SinkSpec sink = this.config.getFunctionDetails().getSink();
        return !StringUtils.isEmpty(sink.getSchemaType()) ? sink.getSchemaType() : sink.getSerDeClassName();
    }

    @Override // org.apache.pulsar.functions.api.Context
    public String getTenant() {
        return this.config.getFunctionDetails().getTenant();
    }

    @Override // org.apache.pulsar.functions.api.Context
    public String getNamespace() {
        return this.config.getFunctionDetails().getNamespace();
    }

    @Override // org.apache.pulsar.functions.api.Context
    public String getFunctionName() {
        return this.config.getFunctionDetails().getName();
    }

    @Override // org.apache.pulsar.functions.api.Context
    public String getFunctionId() {
        return this.config.getFunctionId().toString();
    }

    @Override // org.apache.pulsar.functions.api.Context, org.apache.pulsar.io.core.SinkContext, org.apache.pulsar.io.core.SourceContext
    public int getInstanceId() {
        return this.config.getInstanceId();
    }

    @Override // org.apache.pulsar.functions.api.Context, org.apache.pulsar.io.core.SinkContext, org.apache.pulsar.io.core.SourceContext
    public int getNumInstances() {
        return this.config.getFunctionDetails().getParallelism();
    }

    @Override // org.apache.pulsar.functions.api.Context
    public String getFunctionVersion() {
        return this.config.getFunctionVersion();
    }

    @Override // org.apache.pulsar.functions.api.Context
    public Logger getLogger() {
        return this.logger;
    }

    @Override // org.apache.pulsar.functions.api.Context
    public Optional<Object> getUserConfigValue(String str) {
        return Optional.ofNullable(this.userConfigs.getOrDefault(str, null));
    }

    @Override // org.apache.pulsar.functions.api.Context
    public Object getUserConfigValueOrDefault(String str, Object obj) {
        return getUserConfigValue(str).orElse(obj);
    }

    @Override // org.apache.pulsar.functions.api.Context
    public Map<String, Object> getUserConfigMap() {
        return this.userConfigs;
    }

    private void ensureStateEnabled() {
        Preconditions.checkState(null != this.stateContext, "State is not enabled.");
    }

    @Override // org.apache.pulsar.functions.api.Context
    public void incrCounter(String str, long j) {
        ensureStateEnabled();
        try {
            this.stateContext.incr(str, j);
        } catch (Exception e) {
            throw new RuntimeException("Failed to increment key '" + str + "' by amount '" + j + "'", e);
        }
    }

    @Override // org.apache.pulsar.functions.api.Context
    public long getCounter(String str) {
        ensureStateEnabled();
        try {
            return this.stateContext.getAmount(str);
        } catch (Exception e) {
            throw new RuntimeException("Failed to retrieve counter from key '" + str + "'");
        }
    }

    @Override // org.apache.pulsar.functions.api.Context
    public void putState(String str, ByteBuffer byteBuffer) {
        ensureStateEnabled();
        try {
            this.stateContext.put(str, byteBuffer);
        } catch (Exception e) {
            throw new RuntimeException("Failed to update the state value for key '" + str + "'");
        }
    }

    @Override // org.apache.pulsar.functions.api.Context
    public ByteBuffer getState(String str) {
        ensureStateEnabled();
        try {
            return this.stateContext.getValue(str);
        } catch (Exception e) {
            throw new RuntimeException("Failed to retrieve the state value for key '" + str + "'");
        }
    }

    @Override // org.apache.pulsar.functions.api.Context
    public <O> CompletableFuture<Void> publish(String str, O o) {
        return publish(str, (String) o, "");
    }

    @Override // org.apache.pulsar.functions.api.Context
    public <O> CompletableFuture<Void> publish(String str, O o, String str2) {
        return publish(str, (String) o, (Schema<String>) this.topicSchema.getSchema(str, (Object) o, str2, false));
    }

    public <O> CompletableFuture<Void> publish(String str, O o, Schema<O> schema) {
        Producer<?> producer = this.publishProducers.get(str);
        if (producer == null) {
            try {
                Producer<?> create = ((ProducerBuilderImpl) this.producerBuilder.m905clone()).schema(schema).topic(str).create();
                Producer<?> putIfAbsent = this.publishProducers.putIfAbsent(str, create);
                if (putIfAbsent != null) {
                    create.close();
                    producer = putIfAbsent;
                } else {
                    producer = create;
                }
            } catch (PulsarClientException e) {
                this.logger.error("Failed to create Producer while doing user publish", (Throwable) e);
                return FutureUtil.failedFuture(e);
            }
        }
        return producer.sendAsync((Producer<?>) o).thenApply(messageId -> {
            return null;
        });
    }

    @Override // org.apache.pulsar.functions.api.Context, org.apache.pulsar.io.core.SinkContext, org.apache.pulsar.io.core.SourceContext
    public void recordMetric(String str, double d) {
        this.currentAccumulatedMetrics.putIfAbsent(str, new AccumulatedMetricDatum());
        this.currentAccumulatedMetrics.get(str).update(d);
    }

    public InstanceCommunication.MetricsData getAndResetMetrics() {
        InstanceCommunication.MetricsData metrics = getMetrics();
        resetMetrics();
        return metrics;
    }

    public void resetMetrics() {
        this.accumulatedMetrics.clear();
        this.accumulatedMetrics.putAll(this.currentAccumulatedMetrics);
        this.currentAccumulatedMetrics.clear();
    }

    public InstanceCommunication.MetricsData getMetrics() {
        InstanceCommunication.MetricsData.Builder newBuilder = InstanceCommunication.MetricsData.newBuilder();
        for (String str : this.accumulatedMetrics.keySet()) {
            InstanceCommunication.MetricsData.DataDigest.Builder newBuilder2 = InstanceCommunication.MetricsData.DataDigest.newBuilder();
            newBuilder2.setSum(this.accumulatedMetrics.get(str).getSum());
            newBuilder2.setCount(this.accumulatedMetrics.get(str).getCount());
            newBuilder2.setMax(this.accumulatedMetrics.get(str).getMax());
            newBuilder2.setMin(this.accumulatedMetrics.get(str).getMax());
            newBuilder.putMetrics(str, newBuilder2.build());
        }
        return newBuilder.build();
    }

    public StateContextImpl getStateContext() {
        return this.stateContext;
    }

    public void setStateContext(StateContextImpl stateContextImpl) {
        this.stateContext = stateContextImpl;
    }
}
