package org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferedElements;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.class */
public class BufferingDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
    private final DoFnRunner<InputT, OutputT> underlying;
    private final ListState<CheckpointElement> notYetAcknowledgedSnapshots;
    private final BufferingElementsHandlerFactory bufferingElementsHandlerFactory;
    private String currentStateId = generateNewId();
    private BufferingElementsHandler currentBufferingElementsHandler;

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner$BufferingElementsHandlerFactory.class */
    private interface BufferingElementsHandlerFactory {
        BufferingElementsHandler get(String str) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner$CheckpointElement.class */
    public static class CheckpointElement {
        final String internalId;
        final long checkpointId;

        CheckpointElement(String str, long j) {
            this.internalId = str;
            this.checkpointId = j;
        }
    }

    public static <InputT, OutputT> BufferingDoFnRunner<InputT, OutputT> create(DoFnRunner<InputT, OutputT> doFnRunner, String str, Coder coder, Coder coder2, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend<Object> keyedStateBackend) throws Exception {
        return new BufferingDoFnRunner<>(doFnRunner, str, coder, coder2, operatorStateBackend, keyedStateBackend);
    }

    private BufferingDoFnRunner(DoFnRunner<InputT, OutputT> doFnRunner, String str, Coder coder, Coder coder2, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend keyedStateBackend) throws Exception {
        this.underlying = doFnRunner;
        this.notYetAcknowledgedSnapshots = operatorStateBackend.getUnionListState(new ListStateDescriptor("notYetAcknowledgedSnapshots", CheckpointElement.class));
        this.bufferingElementsHandlerFactory = str2 -> {
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor(str + str2, new CoderTypeSerializer(new BufferedElements.Coder(coder, coder2)));
            return keyedStateBackend != null ? KeyedBufferingElementsHandler.create(keyedStateBackend, listStateDescriptor) : NonKeyedBufferingElementsHandler.create(operatorStateBackend.getListState(listStateDescriptor));
        };
        this.currentBufferingElementsHandler = this.bufferingElementsHandlerFactory.get(this.currentStateId);
    }

    @Override // org.apache.beam.runners.core.DoFnRunner
    public void startBundle() {
    }

    @Override // org.apache.beam.runners.core.DoFnRunner
    public void processElement(WindowedValue<InputT> windowedValue) {
        this.currentBufferingElementsHandler.buffer(new BufferedElements.Element(windowedValue));
    }

    @Override // org.apache.beam.runners.core.DoFnRunner
    public void onTimer(String str, String str2, BoundedWindow boundedWindow, Instant instant, Instant instant2, TimeDomain timeDomain) {
        this.currentBufferingElementsHandler.buffer(new BufferedElements.Timer(str, str2, boundedWindow, instant, instant2, timeDomain));
    }

    @Override // org.apache.beam.runners.core.DoFnRunner
    public void finishBundle() {
    }

    @Override // org.apache.beam.runners.core.DoFnRunner
    public DoFn<InputT, OutputT> getFn() {
        return this.underlying.getFn();
    }

    public void checkpoint(long j) throws Exception {
        addToBeAcknowledgedCheckpoint(j, this.currentStateId);
        this.currentStateId = generateNewId();
        this.currentBufferingElementsHandler = this.bufferingElementsHandlerFactory.get(this.currentStateId);
    }

    public void checkpointCompleted(long j) throws Exception {
        Iterator<CheckpointElement> it = removeToBeAcknowledgedCheckpoints(j).iterator();
        while (it.hasNext()) {
            BufferingElementsHandler bufferingElementsHandler = this.bufferingElementsHandlerFactory.get(it.next().internalId);
            Iterator<BufferedElement> it2 = bufferingElementsHandler.getElements().iterator();
            boolean hasNext = it2.hasNext();
            if (hasNext) {
                this.underlying.startBundle();
            }
            while (it2.hasNext()) {
                it2.next().processWith(this.underlying);
            }
            if (hasNext) {
                this.underlying.finishBundle();
            }
            bufferingElementsHandler.clear();
        }
    }

    private void addToBeAcknowledgedCheckpoint(long j, String str) throws Exception {
        this.notYetAcknowledgedSnapshots.addAll(Collections.singletonList(new CheckpointElement(str, j)));
    }

    private List<CheckpointElement> removeToBeAcknowledgedCheckpoints(long j) throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (CheckpointElement checkpointElement : this.notYetAcknowledgedSnapshots.get()) {
            if (checkpointElement.checkpointId <= j) {
                arrayList.add(checkpointElement);
            } else {
                arrayList2.add(checkpointElement);
            }
        }
        this.notYetAcknowledgedSnapshots.update(arrayList2);
        arrayList.sort(Comparator.comparingLong(checkpointElement2 -> {
            return checkpointElement2.checkpointId;
        }));
        return arrayList;
    }

    private static String generateNewId() {
        return UUID.randomUUID().toString();
    }
}
