package org.apache.flume.sink.kite;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.URIBuilder;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/kite/DatasetSink.class */
/**
 * Flume sink that writes events into a Kite dataset.
 *
 * <p>Each event body is decoded as a binary-encoded Avro record. The writer schema comes from the
 * event's schema-URL header or, when that header is absent, from its schema-literal header. The
 * record is resolved against the target dataset's schema. Only datasets whose storage format is
 * returned by {@link #allowedFormats()} are accepted.
 *
 * <p>The underlying {@link DatasetWriter} is rolled (closed and reopened) once it is older than
 * the configured roll interval. It is also discarded whenever processing a batch fails.
 */
public class DatasetSink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class);

    /** Scheme prefix (matched case-insensitively) for schema URLs read through Hadoop. */
    private static final String HDFS_PREFIX = "hdfs:/";

    /** Hadoop configuration used to open {@code hdfs:/} schema URLs. */
    static Configuration conf = new Configuration();

    /**
     * Schemas parsed from the schema-literal header, keyed by the literal text. Guava caches
     * reject null keys, so callers must check for a missing header before calling {@code get}.
     */
    private static final LoadingCache<String, Schema> schemasFromLiteral =
        CacheBuilder.newBuilder().build(new CacheLoader<String, Schema>() {
            @Override
            public Schema load(String literal) {
                return new Schema.Parser().parse(literal);
            }
        });

    /** Schemas downloaded from the schema-URL header, keyed by URL. */
    private static final LoadingCache<String, Schema> schemasFromURL =
        CacheBuilder.newBuilder().build(new CacheLoader<String, Schema>() {
            @Override
            public Schema load(String url) throws IOException {
                InputStream in = openSchemaStream(url);
                try {
                    return new Schema.Parser().parse(in);
                } finally {
                    in.close();
                }
            }
        });

    private String datasetName = null;
    private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
    private URI target = null;
    private Schema targetSchema = null;
    private DatasetWriter<GenericRecord> writer = null;
    private UserGroupInformation login = null;
    private SinkCounter counter = null;
    /** Maximum writer age in seconds before it is rolled. */
    private int rollIntervalS = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL;
    /** Wall-clock time (ms) of the last roll; 0 forces a roll on the next {@link #process()}. */
    private long lastRolledMs = 0;
    /** Last decoded record, reused as the decode target when {@link #reuseDatum} is set. */
    private GenericRecord datum = null;
    /** Whether decoded records may be reused; disabled for parquet datasets. */
    private boolean reuseDatum = true;
    private BinaryDecoder decoder = null;

    /**
     * Datum readers keyed by writer schema. Each reader resolves into the current
     * {@link #targetSchema}, so the cache is invalidated whenever that schema changes.
     */
    private final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
        CacheBuilder.newBuilder().build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
            @Override
            public DatumReader<GenericRecord> load(Schema writerSchema) {
                return new GenericDatumReader<GenericRecord>(writerSchema, targetSchema);
            }
        });

    /**
     * Returns the dataset storage format names this sink accepts.
     *
     * @return a fresh mutable list containing {@code "avro"} and {@code "parquet"}
     */
    protected List<String> allowedFormats() {
        return Lists.newArrayList("avro", "parquet");
    }

    /**
     * Reads the sink configuration: Kerberos login (optionally proxied), target dataset (either a
     * full dataset URI or a repository URI plus dataset name), batch size and roll interval.
     *
     * @param context the Flume component context
     * @throws NullPointerException if neither a dataset URI nor a repository URI and dataset name
     *     are configured
     * @throws IllegalArgumentException if the configured batch size is not positive
     */
    @Override
    public void configure(Context context) {
        this.login = KerberosUtil.login(
            context.getString(DatasetSinkConstants.AUTH_PRINCIPAL),
            context.getString(DatasetSinkConstants.AUTH_KEYTAB));
        String proxyUser = context.getString(DatasetSinkConstants.AUTH_PROXY_USER);
        if (proxyUser != null) {
            this.login = KerberosUtil.proxyAs(proxyUser, this.login);
        }

        String datasetUri = context.getString(DatasetSinkConstants.CONFIG_KITE_DATASET_URI);
        if (datasetUri != null) {
            this.target = URI.create(datasetUri);
            this.datasetName = uriToName(this.target);
        } else {
            String repositoryUri = context.getString(DatasetSinkConstants.CONFIG_KITE_REPO_URI);
            Preconditions.checkNotNull(repositoryUri, "Repository URI is missing");
            this.datasetName = context.getString(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
            Preconditions.checkNotNull(this.datasetName, "Dataset name is missing");
            this.target = new URIBuilder(repositoryUri, "default", this.datasetName).build();
        }
        setName(this.target.toString());

        this.batchSize = context.getLong(DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE,
            Long.valueOf(DatasetSinkConstants.DEFAULT_BATCH_SIZE)).longValue();
        // A non-positive batch size would make every process() call take nothing and back off.
        Preconditions.checkArgument(this.batchSize > 0,
            "Batch size must be positive: %s", this.batchSize);
        this.rollIntervalS = context.getInteger(DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL,
            Integer.valueOf(DatasetSinkConstants.DEFAULT_ROLL_INTERVAL)).intValue();
        this.counter = new SinkCounter(this.datasetName);
    }

    /** Starts the sink counter and resets the roll clock. */
    @Override
    public synchronized void start() {
        this.lastRolledMs = System.currentTimeMillis();
        this.counter.start();
        LOG.info("Started DatasetSink {}", getName());
        super.start();
    }

    /** Forces the writer to be rolled on the next call to {@link #process()}. */
    @VisibleForTesting
    public void roll() {
        this.lastRolledMs = 0L;
    }

    /**
     * Stops the counter and closes any open writer. {@code super.stop()} runs even if closing the
     * writer fails.
     */
    @Override
    public synchronized void stop() {
        this.counter.stop();
        DatasetWriter<GenericRecord> current = this.writer;
        this.writer = null;
        try {
            if (current != null) {
                this.lastRolledMs = System.currentTimeMillis();
                current.close();
            }
        } finally {
            LOG.info("Stopped dataset sink: {}", getName());
            super.stop();
        }
    }

    /**
     * Moves up to {@code batchSize} events from the channel into the dataset in one transaction.
     *
     * <p>On any failure the transaction is rolled back, the writer is discarded (so the next call
     * opens a new one), and the failure is rethrown. An {@link Error} or
     * {@link EventDeliveryException} is rethrown as is; anything else is wrapped in an
     * {@link EventDeliveryException}.
     *
     * @return {@link Sink.Status#BACKOFF} if the channel was empty, otherwise
     *     {@link Sink.Status#READY}
     * @throws EventDeliveryException if the writer cannot be opened or the batch fails
     */
    @Override
    public Sink.Status process() throws EventDeliveryException {
        if (this.writer == null) {
            this.writer = openWriter();
        }

        if ((System.currentTimeMillis() - this.lastRolledMs) / 1000 > this.rollIntervalS) {
            DatasetWriter<GenericRecord> previous = this.writer;
            // Drop the reference first: if close or reopen fails, no closed writer is left
            // behind and the next call opens a fresh one.
            this.writer = null;
            previous.close();
            this.writer = openWriter();
            this.lastRolledMs = System.currentTimeMillis();
            LOG.info("Rolled writer for {}", getName());
        }

        Channel channel = getChannel();
        Transaction transaction = null;
        try {
            transaction = channel.getTransaction();
            transaction.begin();

            long processedEvents = 0;
            while (processedEvents < this.batchSize) {
                Event event = channel.take();
                if (event == null) {
                    break; // channel drained
                }
                this.datum = deserialize(event, this.reuseDatum ? this.datum : null);
                this.writer.write(this.datum);
                processedEvents++;
            }

            // Commit only after the data has been written and flushed.
            this.writer.flush();
            transaction.commit();

            if (processedEvents == 0) {
                this.counter.incrementBatchEmptyCount();
                return Sink.Status.BACKOFF;
            }
            if (processedEvents < this.batchSize) {
                this.counter.incrementBatchUnderflowCount();
            } else {
                this.counter.incrementBatchCompleteCount();
            }
            this.counter.addToEventDrainSuccessCount(processedEvents);
            return Sink.Status.READY;
        } catch (Throwable th) {
            // Catch-all so the transaction is always rolled back.
            if (transaction != null) {
                try {
                    transaction.rollback();
                } catch (Exception e) {
                    LOG.error("Transaction rollback failed", e);
                    throw Throwables.propagate(e);
                }
            }

            discardWriter();

            Throwables.propagateIfInstanceOf(th, Error.class);
            Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
            throw new EventDeliveryException(th);
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }
    }

    /**
     * Opens a writer for {@link #target}. A Kite {@link DatasetException} is converted into an
     * {@link EventDeliveryException}.
     */
    private DatasetWriter<GenericRecord> openWriter() throws EventDeliveryException {
        try {
            return newWriter(this.login, this.target);
        } catch (DatasetException e) {
            throw new EventDeliveryException("Cannot write to " + getName(), e);
        }
    }

    /**
     * Clears the writer reference, then closes the old writer. A failure to close is only logged,
     * so it cannot hide the error that caused the writer to be discarded.
     */
    private void discardWriter() {
        DatasetWriter<GenericRecord> failed = this.writer;
        this.writer = null;
        this.lastRolledMs = System.currentTimeMillis();
        if (failed != null) {
            try {
                failed.close();
            } catch (RuntimeException closeError) {
                LOG.warn("Failed to close writer for " + getName(), closeError);
            }
        }
    }

    /**
     * Loads the dataset at {@code uri} as {@code userGroupInformation} and opens a writer for it.
     * As a side effect this refreshes {@link #targetSchema} (invalidating cached readers if it
     * changed), {@link #reuseDatum} and {@link #datasetName}.
     *
     * @throws IllegalArgumentException if the dataset's format is not in {@link #allowedFormats()}
     */
    private DatasetWriter<GenericRecord> newWriter(
            UserGroupInformation userGroupInformation, final URI uri) {
        Dataset<GenericRecord> dataset = KerberosUtil.runPrivileged(userGroupInformation,
            new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
                @Override
                public Dataset<GenericRecord> run() {
                    return Datasets.load(uri);
                }
            });

        DatasetDescriptor descriptor = dataset.getDescriptor();
        String formatName = descriptor.getFormat().getName();
        Preconditions.checkArgument(allowedFormats().contains(formatName),
            "Unsupported format: " + formatName);

        Schema newSchema = descriptor.getSchema();
        if (this.targetSchema == null || !newSchema.equals(this.targetSchema)) {
            this.targetSchema = newSchema;
            // Cached readers resolve into the old target schema; rebuild them lazily.
            this.readers.invalidateAll();
        }

        // Datum reuse is turned off for parquet datasets.
        this.reuseDatum = !"parquet".equals(formatName);
        this.datasetName = dataset.getName();
        return dataset.newWriter();
    }

    /**
     * Decodes an event body into a record of the target schema.
     *
     * @param event the event whose body is a binary-encoded Avro record
     * @param reuse a record to decode into, or {@code null} to allocate a new one
     * @throws EventDeliveryException if the schema cannot be obtained or the body cannot be decoded
     */
    private GenericRecord deserialize(Event event, GenericRecord reuse)
            throws EventDeliveryException {
        this.decoder = DecoderFactory.get().binaryDecoder(event.getBody(), this.decoder);
        try {
            return this.readers.getUnchecked(schema(event)).read(reuse, this.decoder);
        } catch (IOException e) {
            throw new EventDeliveryException("Cannot deserialize event", e);
        }
    }

    /**
     * Resolves an event's writer schema. The schema-URL header takes precedence over the
     * schema-literal header.
     *
     * @throws EventDeliveryException if neither header is present or the schema cannot be loaded
     */
    private static Schema schema(Event event) throws EventDeliveryException {
        Map<String, String> headers = event.getHeaders();
        try {
            String url = headers.get(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER);
            if (url != null) {
                return schemasFromURL.get(url);
            }
            String literal = headers.get(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER);
            if (literal == null) {
                // Guava caches reject null keys, so report the missing header explicitly.
                throw new EventDeliveryException(
                    "Schema literal cannot be null without a Schema URL");
            }
            return schemasFromLiteral.get(literal);
        } catch (ExecutionException e) {
            throw new EventDeliveryException("Cannot get schema", e.getCause());
        }
    }

    /**
     * Opens a schema URL. URLs starting with {@code hdfs:/} (in any case) go through Hadoop's
     * {@link FileSystem}; all others go through {@link URL#openStream()}.
     */
    private static InputStream openSchemaStream(String url) throws IOException {
        if (url.regionMatches(true, 0, HDFS_PREFIX, 0, HDFS_PREFIX.length())) {
            return FileSystem.get(URI.create(url), conf).open(new Path(url));
        }
        return new URL(url).openStream();
    }

    /** Extracts the {@code dataset} option from a Kite dataset URI. */
    private static String uriToName(URI uri) {
        return (String) ((Map<?, ?>) Registration
            .lookupDatasetUri(URI.create(uri.getRawSchemeSpecificPart()))
            .second())
            .get("dataset");
    }
}
