package org.apache.flume.sink;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/AvroSink.class */
/**
 * Sink that takes events from its channel in batches and forwards them to a
 * remote endpoint through an {@link AvroSourceProtocol} client running over a
 * {@link NettyTransceiver}.
 *
 * <p>Configuration keys read by {@link #configure(Context)}:
 * <ul>
 *   <li>{@code hostname} - required, host to connect to</li>
 *   <li>{@code port} - required, integer port to connect to</li>
 *   <li>{@code batch-size} - optional, maximum number of events sent per
 *       {@link #process()} call; defaults to {@value #DEFAULT_BATCH_SIZE}</li>
 * </ul>
 */
public class AvroSink extends AbstractSink implements PollableSink, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);

    /** Batch size used when the {@code batch-size} key is not configured. */
    private static final int DEFAULT_BATCH_SIZE = 100;

    private String hostname;
    private Integer port;
    private Integer batchSize;
    private AvroSourceProtocol client;
    private Transceiver transceiver;
    private CounterGroup counterGroup = new CounterGroup();

    /**
     * Reads {@code hostname}, {@code port} and {@code batch-size} from the context.
     *
     * @param context configuration source
     * @throws IllegalStateException if {@code hostname} or {@code port} is missing
     * @throws NumberFormatException if {@code port} or {@code batch-size} is present
     *         but not a valid integer
     */
    @Override
    public void configure(Context context) {
        String hostnameValue = (String) context.get("hostname", String.class);
        String portValue = (String) context.get("port", String.class);
        String batchSizeValue = (String) context.get("batch-size", String.class);

        // Validate presence BEFORE parsing: Integer.parseInt(null) throws
        // NumberFormatException, which would otherwise hide the intended
        // "No port specified" failure and make the batch-size default unreachable.
        Preconditions.checkState(hostnameValue != null, "No hostname specified");
        Preconditions.checkState(portValue != null, "No port specified");

        this.hostname = hostnameValue;
        this.port = Integer.valueOf(Integer.parseInt(portValue));
        this.batchSize = Integer.valueOf(
                batchSizeValue == null ? DEFAULT_BATCH_SIZE : Integer.parseInt(batchSizeValue));
    }

    /**
     * Lazily creates the transceiver and the Avro client; each is only created
     * when it is currently {@code null}, so repeated calls are cheap.
     *
     * @throws IOException if the transceiver or client cannot be created
     */
    private void createConnection() throws IOException {
        if (this.transceiver == null) {
            logger.debug("Creating new tranceiver connection to hostname:{} port:{}", this.hostname, this.port);
            this.transceiver = new NettyTransceiver(new InetSocketAddress(this.hostname, this.port.intValue()));
        }
        if (this.client == null) {
            logger.debug("Creating Avro client with tranceiver:{}", this.transceiver);
            this.client = (AvroSourceProtocol) SpecificRequestor.getClient(AvroSourceProtocol.class, this.transceiver);
        }
    }

    /**
     * Closes the transceiver (logging, not propagating, any close failure) and
     * clears both the transceiver and client references so that the next
     * {@link #createConnection()} call rebuilds them.
     */
    private void destroyConnection() {
        if (this.transceiver != null) {
            logger.debug("Destroying tranceiver:{}", this.transceiver);
            try {
                this.transceiver.close();
            } catch (IOException e) {
                logger.error("Attempt to clean up avro tranceiver after client error failed. Exception follows.", e);
            }
            this.transceiver = null;
        }
        this.client = null;
    }

    /**
     * Opens the connection and then delegates to {@code super.start()}.
     * A failure is logged and any partially created connection is torn down;
     * the exception is not propagated. {@link #process()} calls
     * {@link #createConnection()} again, so the connection is retried there.
     */
    @Override
    public void start() {
        logger.info("Avro sink starting");
        try {
            createConnection();
            super.start();
            logger.debug("Avro sink started");
        } catch (Exception e) {
            logger.error("Unable to create avro client using hostname:" + this.hostname + " port:" + this.port + ". Exception follows.", e);
            destroyConnection();
        }
    }

    /** Tears down the connection, delegates to {@code super.stop()} and logs the counters. */
    @Override
    public void stop() {
        logger.info("Avro sink stopping");
        destroyConnection();
        super.stop();
        logger.debug("Avro sink stopped. Metrics:{}", this.counterGroup);
    }

    /**
     * Takes up to {@code batchSize} events from the channel inside one
     * transaction and sends them with a single {@code appendBatch} call.
     *
     * <p>The transaction is committed when the batch is sent successfully or
     * when the channel had no events; it is rolled back on any exception.
     * The transaction is always closed before returning.
     *
     * @return {@code READY} after a non-empty batch was delivered;
     *         {@code BACKOFF} when no events were available or any error occurred
     * @throws EventDeliveryException declared by the interface; not thrown directly here
     */
    @Override
    public PollableSink.Status process() throws EventDeliveryException {
        PollableSink.Status status = PollableSink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            transaction.begin();
            createConnection();
            LinkedList<AvroFlumeEvent> batch = takeBatch(channel);
            if (batch.isEmpty()) {
                this.counterGroup.incrementAndGet("batch.empty");
                status = PollableSink.Status.BACKOFF;
            } else if (!this.client.appendBatch(batch).equals(Status.OK)) {
                throw new AvroRemoteException("RPC communication returned FAILED");
            }
            transaction.commit();
            // NOTE: this counter is also incremented when the batch was empty.
            this.counterGroup.incrementAndGet("batch.success");
        } catch (AvroRemoteException e) {
            transaction.rollback();
            logger.error("Unable to send event batch. Exception follows.", e);
            status = PollableSink.Status.BACKOFF;
        } catch (ChannelException e) {
            transaction.rollback();
            logger.error("Unable to get event from channel. Exception follows.", e);
            status = PollableSink.Status.BACKOFF;
        } catch (Exception e) {
            transaction.rollback();
            logger.error("Unable to communicate with Avro server. Exception follows.", e);
            status = PollableSink.Status.BACKOFF;
            // Unknown failure: drop the connection so the next call reconnects.
            destroyConnection();
        } finally {
            // Single close point for every path, including a failing rollback.
            transaction.close();
        }
        return status;
    }

    /**
     * Takes events from the channel until {@code batchSize} events were
     * collected or the channel returns {@code null}; the latter increments the
     * {@code batch.underflow} counter.
     */
    private LinkedList<AvroFlumeEvent> takeBatch(Channel channel) {
        LinkedList<AvroFlumeEvent> batch = new LinkedList<AvroFlumeEvent>();
        int limit = this.batchSize.intValue();
        for (int taken = 0; taken < limit; taken++) {
            Event event = channel.take();
            if (event == null) {
                this.counterGroup.incrementAndGet("batch.underflow");
                break;
            }
            batch.add(toAvroEvent(event));
        }
        return batch;
    }

    /** Copies the body (wrapped, not duplicated) and headers of an event into a new Avro event. */
    private static AvroFlumeEvent toAvroEvent(Event event) {
        AvroFlumeEvent avroEvent = new AvroFlumeEvent();
        avroEvent.body = ByteBuffer.wrap(event.getBody());
        avroEvent.headers = new HashMap<CharSequence, CharSequence>();
        for (Map.Entry<String, String> header : event.getHeaders().entrySet()) {
            avroEvent.headers.put(header.getKey(), header.getValue());
        }
        return avroEvent;
    }
}
