package org.apache.flume.source;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/AvroSource.class */
public class AvroSource extends AbstractSource implements EventDrivenSource, Configurable, AvroSourceProtocol {
    private static final Logger logger = LoggerFactory.getLogger(AvroSource.class);
    private int port;
    private String bindAddress;
    private Server server;
    private CounterGroup counterGroup = new CounterGroup();

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        this.port = Integer.parseInt((String) context.get("port", String.class));
        this.bindAddress = (String) context.get("bind", String.class);
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Avro source starting:{}", this);
        this.server = new NettyServer(new SpecificResponder(AvroSourceProtocol.class, this), new InetSocketAddress(this.bindAddress, this.port));
        this.server.start();
        super.start();
        logger.debug("Avro source started");
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("Avro source stopping:{}", this);
        this.server.close();
        try {
            this.server.join();
        } catch (InterruptedException e) {
            logger.info("Interrupted while waiting for Avro server to stop. Exiting.");
        }
        super.stop();
        logger.debug("Avro source stopped. Metrics:{}", this.counterGroup);
    }

    public String toString() {
        return "AvroSource: { bindAddress:" + this.bindAddress + " port:" + this.port + " }";
    }

    @Override // org.apache.flume.source.avro.AvroSourceProtocol
    public Status append(AvroFlumeEvent avroFlumeEvent) throws AvroRemoteException {
        logger.debug("Received avro event:{}", avroFlumeEvent);
        this.counterGroup.incrementAndGet("rpc.received");
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            try {
                transaction.begin();
                HashMap hashMap = new HashMap();
                for (Map.Entry<CharSequence, CharSequence> entry : avroFlumeEvent.headers.entrySet()) {
                    hashMap.put(entry.getKey().toString(), entry.getValue().toString());
                }
                channel.put(EventBuilder.withBody(avroFlumeEvent.body.array(), hashMap));
                this.counterGroup.incrementAndGet("rpc.events");
                transaction.commit();
                transaction.close();
                this.counterGroup.incrementAndGet("rpc.successful");
                return Status.OK;
            } catch (ChannelException e) {
                transaction.rollback();
                Status status = Status.FAILED;
                transaction.close();
                return status;
            }
        } catch (Throwable th) {
            transaction.close();
            throw th;
        }
    }

    @Override // org.apache.flume.source.avro.AvroSourceProtocol
    public Status appendBatch(List<AvroFlumeEvent> list) {
        this.counterGroup.incrementAndGet("rpc.received.batch");
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            try {
                transaction.begin();
                for (AvroFlumeEvent avroFlumeEvent : list) {
                    HashMap hashMap = new HashMap();
                    for (Map.Entry<CharSequence, CharSequence> entry : avroFlumeEvent.headers.entrySet()) {
                        hashMap.put(entry.getKey().toString(), entry.getValue().toString());
                    }
                    channel.put(EventBuilder.withBody(avroFlumeEvent.body.array(), hashMap));
                    this.counterGroup.incrementAndGet("rpc.events");
                }
                transaction.commit();
                transaction.close();
                this.counterGroup.incrementAndGet("rpc.successful");
                return Status.OK;
            } catch (ChannelException e) {
                transaction.rollback();
                Status status = Status.FAILED;
                transaction.close();
                return status;
            }
        } catch (Throwable th) {
            transaction.close();
            throw th;
        }
    }
}
