package org.apache.beam.runners.core;

import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_core_java.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_core_java.com.google.common.base.Preconditions;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/core/WatermarkHold.class */
/**
 * Maintains the watermark holds that are kept in per-window state on behalf of a
 * {@link ReduceFn}.
 *
 * <p>Two pieces of state are used:
 *
 * <ul>
 *   <li>an <em>element hold</em>, stored under a tag named {@code "hold"} whose
 *       {@link TimestampCombiner} is the one configured on the windowing strategy, and
 *   <li>an <em>extra hold</em>, stored under {@link #EXTRA_HOLD_TAG}, which this class uses for
 *       garbage collection holds.
 * </ul>
 *
 * @param <W> the type of window the holds are kept for
 */
public class WatermarkHold<W extends BoundedWindow> implements Serializable {

    /**
     * State tag of the extra hold (named {@code "extra"}, combined with
     * {@link TimestampCombiner#EARLIEST}). Garbage collection holds are written here.
     */
    @VisibleForTesting
    public static final StateTag<WatermarkHoldState> EXTRA_HOLD_TAG =
            StateTags.makeSystemTagInternal(
                    StateTags.watermarkStateInternal("extra", TimestampCombiner.EARLIEST));

    private final TimerInternals timerInternals;
    private final WindowingStrategy<?, W> windowingStrategy;

    /** Tag of the element hold, derived once from the strategy's timestamp combiner. */
    private final StateTag<WatermarkHoldState> elementHoldTag;

    /**
     * Result of {@link WatermarkHold#extractAndRelease}: the hold that was in effect before the
     * state was cleared, and the hold (if any) that was installed afterwards.
     */
    public static class OldAndNewHolds {
        /** The hold that was released. */
        public final Instant oldHold;

        /** The hold added after the release, or {@code null} if none was added. */
        @Nullable
        public final Instant newHold;

        /**
         * @param instant the released (old) hold
         * @param instant2 the newly added hold, or {@code null} if none was added
         */
        public OldAndNewHolds(Instant instant, @Nullable Instant instant2) {
            this.oldHold = instant;
            this.newHold = instant2;
        }
    }

    /**
     * Builds the system state tag (named {@code "hold"}) under which the element hold is stored
     * for the given timestamp combiner.
     *
     * @param timestampCombiner the combiner the hold state should use
     * @return the element hold tag for {@code timestampCombiner}
     */
    public static <W extends BoundedWindow> StateTag<WatermarkHoldState> watermarkHoldTagForTimestampCombiner(TimestampCombiner timestampCombiner) {
        return StateTags.makeSystemTagInternal(
                StateTags.watermarkStateInternal("hold", timestampCombiner));
    }

    /**
     * @param timerInternals source of the current input and output watermarks
     * @param windowingStrategy strategy supplying the window fn, timestamp combiner and closing
     *     behavior consulted when computing holds
     */
    public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
        this.timerInternals = timerInternals;
        this.windowingStrategy = windowingStrategy;
        this.elementHoldTag =
                watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner());
    }

    /**
     * Adds a hold for the element being processed. An element hold is attempted first; when
     * that is rejected as too late, a garbage collection hold is attempted instead.
     *
     * @param processValueContext context of the value being processed; its timestamp seeds the
     *     element hold
     * @return the timestamp of the hold that was added, or {@code null} if no hold was added
     */
    @Nullable
    public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext processValueContext) {
        Instant hold = addElementHold(processValueContext.timestamp(), processValueContext);
        if (hold != null) {
            return hold;
        }
        return addGarbageCollectionHold(processValueContext, false);
    }

    /**
     * Computes the hold timestamp for an element: the window fn's output time for
     * {@code timestamp}, passed through the timestamp combiner's {@code assign}.
     *
     * @throws IllegalStateException if the result is earlier than {@code timestamp}, or if
     *     {@code timestamp} is not after the window's max timestamp yet the result is
     */
    private Instant shift(Instant timestamp, W window) {
        Instant windowFnOutputTime =
                this.windowingStrategy.getWindowFn().getOutputTime(timestamp, window);
        Instant shifted =
                this.windowingStrategy.getTimestampCombiner().assign(window, windowFnOutputTime);
        Preconditions.checkState(
                !shifted.isBefore(timestamp),
                "TimestampCombiner moved element from %s to earlier time %s for window %s",
                BoundedWindow.formatTimestamp(timestamp),
                BoundedWindow.formatTimestamp(shifted),
                window);
        Preconditions.checkState(
                timestamp.isAfter(window.maxTimestamp()) || !shifted.isAfter(window.maxTimestamp()),
                "TimestampCombiner moved element from %s to %s which is beyond end of window %s",
                timestamp,
                shifted,
                window);
        return shifted;
    }

    /**
     * Tries to add an element hold at the shifted form of {@code timestamp}.
     *
     * <p>The hold is skipped when it would be before a non-null output watermark, or when the
     * window's max timestamp is before the input watermark. Otherwise it is added to the
     * element hold state. The outcome is traced in every case.
     *
     * @return the hold that was added, or {@code null} if it was too late to add one
     * @throws IllegalStateException if the hold would be after
     *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
     */
    @Nullable
    private Instant addElementHold(Instant timestamp, ReduceFn<?, ?, ?, W>.Context context) {
        Instant elementHold = shift(timestamp, context.window());
        Instant outputWatermark = this.timerInternals.currentOutputWatermarkTime();
        Instant inputWatermark = this.timerInternals.currentInputWatermarkTime();

        String outcome;
        boolean tooLate;
        if (outputWatermark != null && elementHold.isBefore(outputWatermark)) {
            outcome = "too late to effect output watermark";
            tooLate = true;
        } else if (context.window().maxTimestamp().isBefore(inputWatermark)) {
            outcome = "too late for end-of-window timer";
            tooLate = true;
        } else {
            outcome = "on time";
            tooLate = false;
            Preconditions.checkState(
                    !elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
                    "Element hold %s is beyond end-of-time",
                    elementHold);
            context.state().access(this.elementHoldTag).add(elementHold);
        }

        WindowTracing.trace(
                "WatermarkHold.addHolds: element hold at {} is {} for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
                elementHold,
                outcome,
                context.key(),
                context.window(),
                inputWatermark,
                outputWatermark);
        return tooLate ? null : elementHold;
    }

    /**
     * Tries to add a garbage collection hold to the extra hold state.
     *
     * <p>No hold is added when the garbage collection time is before the input watermark, or
     * when {@code z} is set and the closing behavior is {@code FIRE_IF_NON_EMPTY}. A garbage
     * collection time that is not before {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is pulled
     * back to one millisecond before it.
     *
     * @param context context for the window the hold belongs to
     * @param z whether the pane is empty (as worded by the trace message emitted when the hold
     *     is skipped for this reason)
     * @return the hold that was added, or {@code null} if none was added
     * @throws IllegalStateException if the final hold is before the input watermark or after
     *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
     */
    @Nullable
    public Instant addGarbageCollectionHold(ReduceFn<?, ?, ?, W>.Context context, boolean z) {
        Instant outputWatermark = this.timerInternals.currentOutputWatermarkTime();
        Instant inputWatermark = this.timerInternals.currentInputWatermarkTime();
        Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), this.windowingStrategy);

        if (gcHold.isBefore(inputWatermark)) {
            WindowTracing.trace(
                    "{}.addGarbageCollectionHold: gc hold would be before the input watermark for key:{}; window: {}; inputWatermark: {}; outputWatermark: {}",
                    getClass().getSimpleName(),
                    context.key(),
                    context.window(),
                    inputWatermark,
                    outputWatermark);
            return null;
        }

        if (z && context.windowingStrategy().getClosingBehavior() == Window.ClosingBehavior.FIRE_IF_NON_EMPTY) {
            WindowTracing.trace(
                    "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
                    gcHold,
                    context.key(),
                    context.window(),
                    inputWatermark,
                    outputWatermark);
            return null;
        }

        // Keep the hold strictly below the maximum timestamp.
        if (!gcHold.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            gcHold = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L));
        }

        Preconditions.checkState(
                !gcHold.isBefore(inputWatermark),
                "Garbage collection hold %s cannot be before input watermark %s",
                gcHold,
                inputWatermark);
        Preconditions.checkState(
                !gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
                "Garbage collection hold %s is beyond end-of-time",
                gcHold);

        context.state().access(EXTRA_HOLD_TAG).add(gcHold);
        WindowTracing.trace(
                "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
                gcHold,
                context.key(),
                context.window(),
                inputWatermark,
                outputWatermark);
        return gcHold;
    }

    /**
     * Requests a read-ahead ({@code readLater}) of the element holds of the merging windows,
     * but only in the case where {@link #onMerge} goes on to read them.
     *
     * @param mergingStateAccessor accessor for the state of the windows being merged and of the
     *     merge result
     */
    public void prefetchOnMerge(MergingStateAccessor<?, W> mergingStateAccessor) {
        Map<W, WatermarkHoldState> sourcesByWindow =
                mergingStateAccessor.accessInEachMergingWindow(this.elementHoldTag);
        WatermarkHoldState result = mergingStateAccessor.access(this.elementHoldTag);

        if (sourcesByWindow.isEmpty()) {
            return;
        }
        boolean resultIsOnlySource =
                sourcesByWindow.size() == 1
                        && sourcesByWindow.values().contains(result)
                        && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp();
        if (resultIsOnlySource || result.getTimestampCombiner().dependsOnlyOnWindow()) {
            // onMerge does not read the source holds in these cases.
            return;
        }
        for (WatermarkHoldState source : sourcesByWindow.values()) {
            source.readLater();
        }
    }

    /**
     * Updates the holds when windows are merged: folds the element holds of the merging windows
     * into the result window, then clears the extra hold across the merge and attempts to add a
     * fresh garbage collection hold for the result window.
     *
     * @param onMergeContext context describing the merge
     */
    public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext onMergeContext) {
        WindowTracing.debug(
                "WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
                onMergeContext.key(),
                onMergeContext.window(),
                this.timerInternals.currentInputWatermarkTime(),
                this.timerInternals.currentOutputWatermarkTime());

        Collection<WatermarkHoldState> sources =
                onMergeContext.state().accessInEachMergingWindow(this.elementHoldTag).values();
        WatermarkHoldState result = onMergeContext.state().access(this.elementHoldTag);
        mergeElementHolds(onMergeContext, sources, result);

        StateMerging.clear(onMergeContext.state(), EXTRA_HOLD_TAG);
        addGarbageCollectionHold(onMergeContext, false);
    }

    /** Folds the element holds in {@code sources} into {@code result}, clearing the sources. */
    private void mergeElementHolds(
            ReduceFn<?, ?, ?, W>.OnMergeContext context,
            Collection<WatermarkHoldState> sources,
            WatermarkHoldState result) {
        if (sources.isEmpty()) {
            // No element holds to merge.
            return;
        }
        if (sources.size() == 1
                && sources.contains(result)
                && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
            // The only source is the result itself; it is left untouched.
            return;
        }

        if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
            // The stored values are not consulted: drop them and re-derive the hold.
            for (WatermarkHoldState source : sources) {
                source.clear();
            }
            addElementHold(BoundedWindow.TIMESTAMP_MIN_VALUE, context);
            return;
        }

        // Request all reads up front before reading each hold.
        for (WatermarkHoldState source : sources) {
            source.readLater();
        }
        Instant mergedHold = null;
        for (WatermarkHoldState source : sources) {
            Instant sourceHold = source.read();
            if (sourceHold == null) {
                continue;
            }
            mergedHold =
                    mergedHold == null
                            ? sourceHold
                            : result.getTimestampCombiner().merge(context.window(), mergedHold, sourceHold);
        }
        // Clear only after every source has been read; the result may be one of the sources.
        for (WatermarkHoldState source : sources) {
            source.clear();
        }
        if (mergedHold != null) {
            result.add(mergedHold);
        }
    }

    /**
     * Requests a read-ahead of both the element hold and the extra hold for the context's window.
     *
     * @param context context for the window whose holds will be extracted
     */
    public void prefetchExtract(ReduceFn<?, ?, ?, W>.Context context) {
        context.state().access(this.elementHoldTag).readLater();
        context.state().access(EXTRA_HOLD_TAG).readLater();
    }

    /**
     * Returns a deferred operation that, when {@code read()}, determines the current hold for
     * the window, clears both hold states, and optionally installs a new garbage collection hold.
     *
     * <p>The old hold is the earlier of the element hold and the extra hold (ignoring absent
     * ones). If neither exists, or the result is after the window's max timestamp, the window's
     * max timestamp is used instead.
     *
     * @param context context for the window whose holds are released
     * @param z when {@code true}, no garbage collection hold is added after clearing; when
     *     {@code false}, one is attempted with the pane flagged as empty
     * @return a {@link ReadableState} yielding the released hold and the new hold, if any
     */
    public ReadableState<OldAndNewHolds> extractAndRelease(final ReduceFn<?, ?, ?, W>.Context context, final boolean z) {
        WindowTracing.debug(
                "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
                context.key(),
                context.window(),
                this.timerInternals.currentInputWatermarkTime(),
                this.timerInternals.currentOutputWatermarkTime());
        final WatermarkHoldState elementHoldState = context.state().access(this.elementHoldTag);
        final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);

        return new ReadableState<OldAndNewHolds>() {
            @Override
            public ReadableState<OldAndNewHolds> readLater() {
                elementHoldState.readLater();
                extraHoldState.readLater();
                return this;
            }

            @Override
            public OldAndNewHolds read() {
                Instant elementHold = elementHoldState.read();
                Instant extraHold = extraHoldState.read();

                // Take the earlier of the two holds, tolerating either being absent.
                Instant oldHold;
                if (elementHold == null) {
                    oldHold = extraHold;
                } else if (extraHold == null) {
                    oldHold = elementHold;
                } else {
                    oldHold = elementHold.isBefore(extraHold) ? elementHold : extraHold;
                }

                if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
                    WindowTracing.debug(
                            "WatermarkHold.extractAndRelease.read: clipping from {} to end of window for key:{}; window:{}",
                            oldHold,
                            context.key(),
                            context.window());
                    oldHold = context.window().maxTimestamp();
                }

                WindowTracing.debug(
                        "WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
                        context.key(),
                        context.window());
                elementHoldState.clear();
                extraHoldState.clear();

                Instant newHold = null;
                if (!z) {
                    newHold = WatermarkHold.this.addGarbageCollectionHold(context, true);
                }
                return new OldAndNewHolds(oldHold, newHold);
            }
        };
    }

    /**
     * Clears both the element hold and the extra hold for the context's window.
     *
     * @param context context for the window whose holds are cleared
     */
    public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
        WindowTracing.debug(
                "WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
                context.key(),
                context.window(),
                this.timerInternals.currentInputWatermarkTime(),
                this.timerInternals.currentOutputWatermarkTime());
        context.state().access(this.elementHoldTag).clear();
        context.state().access(EXTRA_HOLD_TAG).clear();
    }

    /**
     * Reads the current element hold for the context's window. The extra hold is not consulted.
     *
     * @param context context for the window to inspect
     * @return the stored element hold, or {@code null} if the state holds none
     */
    @Nullable
    public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
        return context.state().access(this.elementHoldTag).read();
    }
}
