package org.apache.gobblin.test;

import avro.shaded.com.google.common.base.Throwables;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.extractor.CheckpointableWatermark;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.DefaultCheckpointableWatermark;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.StreamingExtractor;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.ExtractFactory;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.test.avro.TestRecord;
import org.apache.gobblin.test.proto.TestRecordProtos;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncWriterManager;
import org.apache.gobblin.writer.WatermarkStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/test/SequentialTestSource.class */
public class SequentialTestSource implements Source<Object, Object> {
    private static final int DEFAULT_NUM_PARALLELISM = 1;
    private static final String DEFAULT_NAMESPACE = "TestDB";
    private static final String DEFAULT_TABLE = "TestTable";
    public static final String WORK_UNIT_INDEX = "workUnitIndex";
    public static final String MEMORY_FORMAT_KEY = "inMemoryFormat";
    private int num_parallelism;
    private String namespace;
    private String table;
    private int numRecordsPerExtract;
    private long sleepTimePerRecord;
    private InMemoryFormat inMemFormat;
    private static final Logger log = LoggerFactory.getLogger(SequentialTestSource.class);
    private static final Integer DEFAULT_NUM_RECORDS_PER_EXTRACT = 100;
    private static final Long DEFAULT_SLEEP_TIME_PER_RECORD_MILLIS = 10L;
    public static final String DEFAULT_IN_MEMORY_FORMAT = InMemoryFormat.POJO.toString();
    private final AtomicBoolean configured = new AtomicBoolean(false);
    private final Extract.TableType tableType = Extract.TableType.APPEND_ONLY;
    private final ExtractFactory _extractFactory = new ExtractFactory("yyyyMMddHHmmss");
    private boolean streaming = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.gobblin.test.SequentialTestSource$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/gobblin/test/SequentialTestSource$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$gobblin$test$SequentialTestSource$InMemoryFormat = new int[InMemoryFormat.values().length];

        static {
            try {
                $SwitchMap$org$apache$gobblin$test$SequentialTestSource$InMemoryFormat[InMemoryFormat.POJO.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$gobblin$test$SequentialTestSource$InMemoryFormat[InMemoryFormat.AVRO.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$gobblin$test$SequentialTestSource$InMemoryFormat[InMemoryFormat.PROTOBUF.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/test/SequentialTestSource$InMemoryFormat.class */
    public enum InMemoryFormat {
        POJO,
        AVRO,
        PROTOBUF
    }

    /* loaded from: input_file:org/apache/gobblin/test/SequentialTestSource$TestBatchExtractor.class */
    static class TestBatchExtractor implements Extractor<Object, Object> {
        private long recordsExtracted = 0;
        private final long numRecordsPerExtract;
        private LongWatermark currentWatermark;
        private long sleepTimePerRecord;
        private int partition;
        private final InMemoryFormat inMemoryFormat;
        private final Object schema;
        WorkUnitState workUnitState;

        public TestBatchExtractor(int i, LongWatermark longWatermark, long j, long j2, WorkUnitState workUnitState) {
            this.partition = i;
            this.currentWatermark = longWatermark;
            this.numRecordsPerExtract = j;
            this.sleepTimePerRecord = j2;
            this.workUnitState = workUnitState;
            this.inMemoryFormat = InMemoryFormat.valueOf(this.workUnitState.getProp(SequentialTestSource.MEMORY_FORMAT_KEY));
            this.schema = getSchema(this.inMemoryFormat);
        }

        public Object getSchema() throws IOException {
            return this.schema;
        }

        private Object getSchema(InMemoryFormat inMemoryFormat) {
            switch (AnonymousClass1.$SwitchMap$org$apache$gobblin$test$SequentialTestSource$InMemoryFormat[inMemoryFormat.ordinal()]) {
                case 1:
                    return TestRecord.class;
                case 2:
                    return TestRecord.getClassSchema();
                case AsyncWriterManager.MIN_RETRY_INTERVAL_MILLIS_DEFAULT /* 3 */:
                    return TestRecordProtos.TestRecord.class;
                default:
                    throw new RuntimeException("Not implemented " + inMemoryFormat.name());
            }
        }

        public RecordEnvelope readRecordEnvelope() throws DataRecordException, IOException {
            TestRecord build;
            if (this.recordsExtracted >= this.numRecordsPerExtract) {
                return null;
            }
            try {
                Thread.sleep(this.sleepTimePerRecord);
            } catch (InterruptedException e) {
                Throwables.propagate(e);
            }
            switch (AnonymousClass1.$SwitchMap$org$apache$gobblin$test$SequentialTestSource$InMemoryFormat[this.inMemoryFormat.ordinal()]) {
                case 1:
                    build = new TestRecord(this.partition, this.currentWatermark.getValue(), "I am a POJO message");
                    break;
                case 2:
                    build = TestRecord.newBuilder().setPartition(this.partition).setSequence(this.currentWatermark.getValue()).setPayload("I am an Avro message").build();
                    break;
                case AsyncWriterManager.MIN_RETRY_INTERVAL_MILLIS_DEFAULT /* 3 */:
                    build = TestRecordProtos.TestRecord.newBuilder().setPartition(this.partition).setSequence(this.currentWatermark.getValue()).setPayload("I am a Protobuf message").build();
                    break;
                default:
                    throw new RuntimeException(LimiterConfigurationKeys.DEFAULT_LIMITER_REPORT_KEY_LIST);
            }
            SequentialTestSource.log.debug("Extracted record -> {}", build);
            RecordEnvelope recordEnvelope = new RecordEnvelope(build, new DefaultCheckpointableWatermark(String.valueOf(this.partition), new LongWatermark(this.currentWatermark.getValue())));
            this.currentWatermark.increment();
            this.recordsExtracted++;
            return recordEnvelope;
        }

        public long getExpectedRecordCount() {
            return this.numRecordsPerExtract;
        }

        public long getHighWatermark() {
            return this.workUnitState.getHighWaterMark();
        }

        public void close() throws IOException {
            this.workUnitState.setActualHighWatermark(this.currentWatermark);
        }

        public void setCurrentWatermark(LongWatermark longWatermark) {
            this.currentWatermark = longWatermark;
        }
    }

    /* loaded from: input_file:org/apache/gobblin/test/SequentialTestSource$TestStreamingExtractor.class */
    static class TestStreamingExtractor implements StreamingExtractor<Object, Object> {
        private Optional<WatermarkStorage> watermarkStorage;
        private final TestBatchExtractor extractor;

        public TestStreamingExtractor(TestBatchExtractor testBatchExtractor) {
            this.extractor = testBatchExtractor;
        }

        public void close() throws IOException {
            this.extractor.close();
        }

        public Object getSchema() throws IOException {
            return this.extractor.getSchema();
        }

        public RecordEnvelope<Object> readRecordEnvelope() throws DataRecordException, IOException {
            return this.extractor.readRecordEnvelope();
        }

        public long getExpectedRecordCount() {
            return this.extractor.getExpectedRecordCount();
        }

        public long getHighWatermark() {
            return this.extractor.getHighWatermark();
        }

        public void start(WatermarkStorage watermarkStorage) throws IOException {
            Map map;
            this.watermarkStorage = Optional.of(watermarkStorage);
            try {
                map = ((WatermarkStorage) this.watermarkStorage.get()).getCommittedWatermarks(DefaultCheckpointableWatermark.class, ImmutableList.of(LimiterConfigurationKeys.DEFAULT_LIMITER_REPORT_KEY_LIST + this.extractor.partition));
            } catch (IOException e) {
                SequentialTestSource.log.warn("Failed to get watermarks... will start from the beginning", e);
                map = Collections.EMPTY_MAP;
            }
            for (Map.Entry entry : map.entrySet()) {
                SequentialTestSource.log.info("{}: Found these committed watermarks: key: {}, value: {}", new Object[]{this, entry.getKey(), entry.getValue()});
            }
            LongWatermark longWatermark = (map.isEmpty() || !map.containsKey(new StringBuilder().append(LimiterConfigurationKeys.DEFAULT_LIMITER_REPORT_KEY_LIST).append(this.extractor.partition).toString())) ? new LongWatermark(-1L) : (LongWatermark) ((CheckpointableWatermark) map.get(LimiterConfigurationKeys.DEFAULT_LIMITER_REPORT_KEY_LIST + this.extractor.partition)).getWatermark();
            this.extractor.setCurrentWatermark(longWatermark);
            SequentialTestSource.log.info("{}: Set current watermark to : {}", this, longWatermark);
        }
    }

    private void configureIfNeeded(Config config) {
        if (this.configured.get()) {
            return;
        }
        this.num_parallelism = ConfigUtils.getInt(config, "source.numParallelism", 1).intValue();
        this.namespace = ConfigUtils.getString(config, "source.namespace", DEFAULT_NAMESPACE);
        this.table = ConfigUtils.getString(config, "source.table", DEFAULT_TABLE);
        this.numRecordsPerExtract = ConfigUtils.getInt(config, "source.numRecordsPerExtract", DEFAULT_NUM_RECORDS_PER_EXTRACT).intValue();
        this.sleepTimePerRecord = ConfigUtils.getLong(config, "source.sleepTimePerRecordMillis", DEFAULT_SLEEP_TIME_PER_RECORD_MILLIS).longValue();
        this.streaming = ConfigUtils.getString(config, "task.executionMode", "BATCH").equalsIgnoreCase("STREAMING");
        if (this.streaming) {
            this.numRecordsPerExtract = Integer.MAX_VALUE;
        }
        this.inMemFormat = InMemoryFormat.valueOf(ConfigUtils.getString(config, "source.inMemoryFormat", DEFAULT_IN_MEMORY_FORMAT));
        log.info("Source configured with: num_parallelism: {}, namespace: {}, table: {}, numRecordsPerExtract: {}, sleepTimePerRecord: {}, streaming: {}, inMemFormat: {}", new Object[]{Integer.valueOf(this.num_parallelism), this.namespace, this.table, Integer.valueOf(this.numRecordsPerExtract), Long.valueOf(this.sleepTimePerRecord), Boolean.valueOf(this.streaming), this.inMemFormat});
        this.configured.set(true);
    }

    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        WorkUnit create;
        configureIfNeeded(ConfigFactory.parseProperties(sourceState.getProperties()));
        List<WorkUnitState> previousWorkUnitStates = sourceState.getPreviousWorkUnitStates();
        if (previousWorkUnitStates.isEmpty()) {
            return initialWorkUnits();
        }
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(previousWorkUnitStates.size());
        for (WorkUnitState workUnitState : previousWorkUnitStates) {
            if (workUnitState.getWorkingState().equals(WorkUnitState.WorkingState.COMMITTED)) {
                LongWatermark actualHighWatermark = workUnitState.getActualHighWatermark(LongWatermark.class);
                WatermarkInterval watermarkInterval = new WatermarkInterval(actualHighWatermark, new LongWatermark(actualHighWatermark.getValue() + this.numRecordsPerExtract));
                create = WorkUnit.create(newExtract(this.tableType, this.namespace, this.table), watermarkInterval);
                log.debug("Will be setting watermark interval to " + watermarkInterval.toJson());
                create.setProp(WORK_UNIT_INDEX, workUnitState.getWorkunit().getProp(WORK_UNIT_INDEX));
                create.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString());
            } else {
                LongWatermark lowWatermark = workUnitState.getWorkunit().getLowWatermark(LongWatermark.class);
                WatermarkInterval watermarkInterval2 = new WatermarkInterval(lowWatermark, new LongWatermark(lowWatermark.getValue() + this.numRecordsPerExtract));
                create = WorkUnit.create(newExtract(this.tableType, this.namespace, this.table), watermarkInterval2);
                log.debug("Will be setting watermark interval to " + watermarkInterval2.toJson());
                create.setProp(WORK_UNIT_INDEX, workUnitState.getWorkunit().getProp(WORK_UNIT_INDEX));
                create.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString());
            }
            newArrayListWithCapacity.add(create);
        }
        return newArrayListWithCapacity;
    }

    private List<WorkUnit> initialWorkUnits() {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < this.num_parallelism; i++) {
            WorkUnit create = WorkUnit.create(newExtract(Extract.TableType.APPEND_ONLY, this.namespace, this.table));
            create.setWatermarkInterval(new WatermarkInterval(new LongWatermark((i * this.numRecordsPerExtract) + 1), new LongWatermark((i + 1) * this.numRecordsPerExtract)));
            create.setProp(WORK_UNIT_INDEX, Integer.valueOf(i));
            create.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString());
            newArrayList.add(create);
        }
        return newArrayList;
    }

    private Extract newExtract(Extract.TableType tableType, String str, String str2) {
        return this._extractFactory.getUniqueExtract(tableType, str, str2);
    }

    public Extractor<Object, Object> getExtractor(WorkUnitState workUnitState) throws IOException {
        configureIfNeeded(ConfigFactory.parseProperties(workUnitState.getProperties()));
        TestBatchExtractor testBatchExtractor = new TestBatchExtractor(workUnitState.getPropAsInt(WORK_UNIT_INDEX), workUnitState.getWorkunit().getLowWatermark(LongWatermark.class), this.numRecordsPerExtract, this.sleepTimePerRecord, workUnitState);
        return !this.streaming ? testBatchExtractor : new TestStreamingExtractor(testBatchExtractor);
    }

    public void shutdown(SourceState sourceState) {
    }
}
