package org.apache.beam.runners.flink.translation.wrappers;

import java.io.IOException;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.class */
public class SourceInputFormat<T> extends RichInputFormat<WindowedValue<T>, SourceInputSplit<T>> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SourceInputFormat.class);
    private final String stepName;
    private final BoundedSource<T> initialSource;
    private transient PipelineOptions options;
    private final SerializablePipelineOptions serializedOptions;
    private transient BoundedSource.BoundedReader<T> reader;
    private boolean inputAvailable = false;
    private transient ReaderInvocationUtil<T, BoundedSource.BoundedReader<T>> readerInvoker;
    private transient FlinkMetricContainer metricContainer;

    public SourceInputFormat(String str, BoundedSource<T> boundedSource, PipelineOptions pipelineOptions) {
        this.stepName = str;
        this.initialSource = boundedSource;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
    }

    @Override // org.apache.flink.api.common.io.InputFormat
    public void configure(Configuration configuration) {
        this.options = this.serializedOptions.get();
    }

    @Override // org.apache.flink.api.common.io.InputFormat
    public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
        this.metricContainer = new FlinkMetricContainer(getRuntimeContext());
        this.readerInvoker = new ReaderInvocationUtil<>(this.stepName, this.serializedOptions.get(), this.metricContainer);
        this.reader = ((BoundedSource) sourceInputSplit.getSource()).createReader(this.options);
        this.inputAvailable = this.readerInvoker.invokeStart(this.reader);
    }

    @Override // org.apache.flink.api.common.io.InputFormat
    public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
        try {
            final long estimatedSizeBytes = this.initialSource.getEstimatedSizeBytes(this.options);
            return new BaseStatistics() { // from class: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.1
                @Override // org.apache.flink.api.common.io.statistics.BaseStatistics
                public long getTotalInputSize() {
                    return estimatedSizeBytes;
                }

                @Override // org.apache.flink.api.common.io.statistics.BaseStatistics
                public long getNumberOfRecords() {
                    return -1L;
                }

                @Override // org.apache.flink.api.common.io.statistics.BaseStatistics
                public float getAverageRecordWidth() {
                    return -1.0f;
                }
            };
        } catch (Exception e) {
            LOG.warn("Could not read Source statistics: {}", (Throwable) e);
            return null;
        }
    }

    @Override // org.apache.flink.api.common.io.InputFormat, org.apache.flink.core.io.InputSplitSource
    public SourceInputSplit<T>[] createInputSplits(int i) throws IOException {
        try {
            List<? extends BoundedSource<T>> split = this.initialSource.split(this.initialSource.getEstimatedSizeBytes(this.options) / i, this.options);
            int size = split.size();
            SourceInputSplit<T>[] sourceInputSplitArr = new SourceInputSplit[size];
            for (int i2 = 0; i2 < size; i2++) {
                sourceInputSplitArr[i2] = new SourceInputSplit<>(split.get(i2), i2);
            }
            return sourceInputSplitArr;
        } catch (Exception e) {
            throw new IOException("Could not create input splits from Source.", e);
        }
    }

    @Override // org.apache.flink.api.common.io.InputFormat, org.apache.flink.core.io.InputSplitSource
    public InputSplitAssigner getInputSplitAssigner(SourceInputSplit[] sourceInputSplitArr) {
        return new DefaultInputSplitAssigner(sourceInputSplitArr);
    }

    @Override // org.apache.flink.api.common.io.InputFormat
    public boolean reachedEnd() throws IOException {
        return !this.inputAvailable;
    }

    @Override // org.apache.flink.api.common.io.InputFormat
    public WindowedValue<T> nextRecord(WindowedValue<T> windowedValue) throws IOException {
        if (!this.inputAvailable) {
            return null;
        }
        T current = this.reader.getCurrent();
        Instant currentTimestamp = this.reader.getCurrentTimestamp();
        this.inputAvailable = this.readerInvoker.invokeAdvance(this.reader);
        return WindowedValue.of(current, currentTimestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
    }

    @Override // org.apache.flink.api.common.io.InputFormat
    public void close() throws IOException {
        this.metricContainer.registerMetricsForPipelineResult();
        if (this.reader != null) {
            this.reader.close();
        }
    }
}
