package org.apache.flink.runtime.operators.sort;

import java.io.File;
import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/UnilateralSortMerger.class */
public class UnilateralSortMerger<E> implements Sorter<E> {
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    protected static final int MIN_NUM_WRITE_BUFFERS = 2;
    protected static final int MAX_NUM_WRITE_BUFFERS = 4;
    protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
    private final ThreadBase<E> readThread;
    private final ThreadBase<E> sortThread;
    private final ThreadBase<E> spillThread;
    protected final List<MemorySegment> sortReadMemory;
    protected final List<MemorySegment> writeMemory;
    protected final MemoryManager memoryManager;
    private final LargeRecordHandler<E> largeRecordHandler;
    private final HashSet<FileIOChannel> openChannels;
    private final HashSet<FileIOChannel.ID> channelsToDeleteAtShutdown;
    protected final Object iteratorLock;
    protected volatile MutableObjectIterator<E> iterator;
    protected volatile IOException iteratorException;
    protected volatile boolean closed;
    protected final boolean objectReuseEnabled;
    private final Collection<InMemorySorter<?>> inMemorySorters;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) UnilateralSortMerger.class);
    private static final CircularElement<Object> EOF_MARKER = new CircularElement<>();
    private static final CircularElement<Object> SPILLING_MARKER = new CircularElement<>();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/UnilateralSortMerger$ChannelWithBlockCount.class */
    public static final class ChannelWithBlockCount {
        private final FileIOChannel.ID channel;
        private final int blockCount;

        public ChannelWithBlockCount(FileIOChannel.ID id, int i) {
            this.channel = id;
            this.blockCount = i;
        }

        public FileIOChannel.ID getChannel() {
            return this.channel;
        }

        public int getBlockCount() {
            return this.blockCount;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/UnilateralSortMerger$CircularElement.class */
    public static final class CircularElement<E> {
        final int id;
        final InMemorySorter<E> buffer;
        final List<MemorySegment> memory;

        public CircularElement() {
            this.id = -1;
            this.buffer = null;
            this.memory = null;
        }

        public CircularElement(int i, InMemorySorter<E> inMemorySorter, List<MemorySegment> list) {
            this.id = i;
            this.buffer = inMemorySorter;
            this.memory = list;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/UnilateralSortMerger$CircularQueues.class */
    public static final class CircularQueues<E> {
        final BlockingQueue<CircularElement<E>> empty;
        final BlockingQueue<CircularElement<E>> sort;
        final BlockingQueue<CircularElement<E>> spill;

        public CircularQueues() {
            this.empty = new LinkedBlockingQueue();
            this.sort = new LinkedBlockingQueue();
            this.spill = new LinkedBlockingQueue();
        }

        public CircularQueues(int i) {
            this.empty = new ArrayBlockingQueue(i);
            this.sort = new ArrayBlockingQueue(i);
            this.spill = new ArrayBlockingQueue(i);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/UnilateralSortMerger$ReadingThread.class */
    public static class ReadingThread<E> extends ThreadBase<E> {
        private final MutableObjectIterator<E> reader;
        private final LargeRecordHandler<E> largeRecords;
        private final long startSpillingBytes;
        private final E readTarget;

        public ReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> mutableObjectIterator, CircularQueues<E> circularQueues, LargeRecordHandler<E> largeRecordHandler, E e, AbstractInvokable abstractInvokable, long j) {
            super(exceptionHandler, "SortMerger Reading Thread", circularQueues, abstractInvokable);
            this.reader = mutableObjectIterator;
            this.readTarget = e;
            this.startSpillingBytes = j;
            this.largeRecords = largeRecordHandler;
        }

        /* JADX WARN: Code restructure failed: missing block: B:62:0x023b, code lost:
        
            if (r15 != false) goto L74;
         */
        /* JADX WARN: Code restructure failed: missing block: B:64:0x0242, code lost:
        
            if (isRunning() == false) goto L120;
         */
        /* JADX WARN: Code restructure failed: missing block: B:65:0x0245, code lost:
        
            r0 = r0.next(r8);
         */
        /* JADX WARN: Code restructure failed: missing block: B:66:0x024f, code lost:
        
            if (r0 == null) goto L122;
         */
        /* JADX WARN: Code restructure failed: missing block: B:67:0x0252, code lost:
        
            r8 = r0;
         */
        /* JADX WARN: Code restructure failed: missing block: B:68:0x025d, code lost:
        
            if (r0.write(r8) != false) goto L123;
         */
        /* JADX WARN: Code restructure failed: missing block: B:70:0x0260, code lost:
        
            r9 = r8;
         */
        /* JADX WARN: Code restructure failed: missing block: B:75:0x0266, code lost:
        
            if (r9 == null) goto L86;
         */
        /* JADX WARN: Code restructure failed: missing block: B:77:0x0271, code lost:
        
            if (org.apache.flink.runtime.operators.sort.UnilateralSortMerger.LOG.isDebugEnabled() == false) goto L89;
         */
        /* JADX WARN: Code restructure failed: missing block: B:78:0x0274, code lost:
        
            org.apache.flink.runtime.operators.sort.UnilateralSortMerger.LOG.debug("Emitting full buffer from reader thread: " + r10.id + ".");
         */
        /* JADX WARN: Code restructure failed: missing block: B:80:0x02d4, code lost:
        
            if (r0.isEmpty() != false) goto L92;
         */
        /* JADX WARN: Code restructure failed: missing block: B:81:0x02d7, code lost:
        
            r6.queues.sort.add(r10);
         */
        /* JADX WARN: Code restructure failed: missing block: B:82:0x02ff, code lost:
        
            r10 = null;
         */
        /* JADX WARN: Code restructure failed: missing block: B:85:0x02e9, code lost:
        
            r0.reset();
            r6.queues.empty.add(r10);
         */
        /* JADX WARN: Code restructure failed: missing block: B:86:0x029b, code lost:
        
            r13 = true;
         */
        /* JADX WARN: Code restructure failed: missing block: B:87:0x02a6, code lost:
        
            if (org.apache.flink.runtime.operators.sort.UnilateralSortMerger.LOG.isDebugEnabled() == false) goto L89;
         */
        /* JADX WARN: Code restructure failed: missing block: B:88:0x02a9, code lost:
        
            org.apache.flink.runtime.operators.sort.UnilateralSortMerger.LOG.debug("Emitting final buffer from reader thread: " + r10.id + ".");
         */
        @Override // org.apache.flink.runtime.operators.sort.UnilateralSortMerger.ThreadBase
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void go() throws java.io.IOException {
            /*
                Method dump skipped, instructions count: 812
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.operators.sort.UnilateralSortMerger.ReadingThread.go():void");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/UnilateralSortMerger$SortingThread.class */
    public static class SortingThread<E> extends ThreadBase<E> {
        private final IndexedSorter sorter;

        public SortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable) {
            super(exceptionHandler, "SortMerger sorting thread", circularQueues, abstractInvokable);
            this.sorter = new QuickSort();
        }

        @Override // org.apache.flink.runtime.operators.sort.UnilateralSortMerger.ThreadBase
        public void go() throws IOException {
            CircularElement<E> take;
            boolean z = true;
            while (isRunning() && z) {
                try {
                    take = this.queues.sort.take();
                } catch (InterruptedException e) {
                    if (!isRunning()) {
                        return;
                    }
                    if (UnilateralSortMerger.LOG.isErrorEnabled()) {
                        UnilateralSortMerger.LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                    }
                }
                if (take == UnilateralSortMerger.EOF_MARKER || take == UnilateralSortMerger.SPILLING_MARKER) {
                    if (take == UnilateralSortMerger.EOF_MARKER) {
                        if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                            UnilateralSortMerger.LOG.debug("Sorting thread done.");
                        }
                        z = false;
                    }
                } else if (take.buffer.size() == 0) {
                    take.buffer.reset();
                    this.queues.empty.add(take);
                } else {
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Sorting buffer " + take.id + ".");
                    }
                    this.sorter.sort(take.buffer);
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Sorted buffer " + take.id + ".");
                    }
                }
                this.queues.spill.add(take);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/UnilateralSortMerger$SpillingThread.class */
    public class SpillingThread extends ThreadBase<E> {
        protected final MemoryManager memManager;
        protected final IOManager ioManager;
        protected final TypeSerializer<E> serializer;
        protected final TypeComparator<E> comparator;
        protected final List<MemorySegment> writeMemory;
        protected final List<MemorySegment> mergeReadMemory;
        protected final int maxFanIn;
        protected final int numWriteBuffersToCluster;

        public SpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable, MemoryManager memoryManager, IOManager iOManager, TypeSerializer<E> typeSerializer, TypeComparator<E> typeComparator, List<MemorySegment> list, List<MemorySegment> list2, int i) {
            super(exceptionHandler, "SortMerger spilling thread", circularQueues, abstractInvokable);
            this.memManager = memoryManager;
            this.ioManager = iOManager;
            this.serializer = typeSerializer;
            this.comparator = typeComparator;
            this.mergeReadMemory = list;
            this.writeMemory = list2;
            this.maxFanIn = i;
            this.numWriteBuffersToCluster = list2.size() >= 4 ? list2.size() / 2 : 1;
        }

        @Override // org.apache.flink.runtime.operators.sort.UnilateralSortMerger.ThreadBase
        public void go() throws IOException {
            List<MemorySegment> list;
            List<MemorySegment> arrayList;
            CircularElement<E> takeNext;
            ArrayDeque arrayDeque = new ArrayDeque();
            boolean z = false;
            while (true) {
                if (!isRunning()) {
                    break;
                }
                try {
                    CircularElement<E> take = this.queues.spill.take();
                    if (take == UnilateralSortMerger.SPILLING_MARKER) {
                        break;
                    }
                    if (take == UnilateralSortMerger.EOF_MARKER) {
                        z = true;
                        break;
                    }
                    arrayDeque.add(take);
                } catch (InterruptedException e) {
                    throw new IOException("The spilling thread was interrupted.");
                }
            }
            if (isRunning()) {
                MutableObjectIterator<E> mutableObjectIterator = null;
                if (z && UnilateralSortMerger.this.largeRecordHandler != null && UnilateralSortMerger.this.largeRecordHandler.hasData()) {
                    ArrayList arrayList2 = new ArrayList();
                    while (true) {
                        CircularElement<E> poll = this.queues.empty.poll();
                        if (poll == null) {
                            break;
                        }
                        poll.buffer.dispose();
                        arrayList2.addAll(poll.memory);
                    }
                    if (arrayList2.isEmpty()) {
                        z = false;
                        UnilateralSortMerger.LOG.debug("Going to disk-based merge because of large records.");
                    } else {
                        UnilateralSortMerger.LOG.debug("Sorting large records, to add them to in-memory merge.");
                        mutableObjectIterator = UnilateralSortMerger.this.largeRecordHandler.finishWriteAndSortKeys(arrayList2);
                    }
                }
                if (z) {
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Initiating in memory merge.");
                    }
                    ArrayList arrayList3 = new ArrayList(arrayDeque.size() + 1);
                    Iterator<E> it = arrayDeque.iterator();
                    while (it.hasNext()) {
                        arrayList3.add(((CircularElement) it.next()).buffer.getIterator());
                    }
                    if (mutableObjectIterator != null) {
                        arrayList3.add(mutableObjectIterator);
                    }
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Releasing unused sort-buffer memory.");
                    }
                    disposeSortBuffers(true);
                    UnilateralSortMerger.this.setResultIterator(arrayList3.isEmpty() ? EmptyMutableObjectIterator.get() : arrayList3.size() == 1 ? (MutableObjectIterator) arrayList3.get(0) : new MergeIterator<>(arrayList3, this.comparator));
                    return;
                }
                FileIOChannel.Enumerator createChannelEnumerator = this.ioManager.createChannelEnumerator();
                List<ChannelWithBlockCount> arrayList4 = new ArrayList<>();
                while (isRunning()) {
                    try {
                        takeNext = takeNext(this.queues.spill, arrayDeque);
                    } catch (InterruptedException e2) {
                        if (!isRunning()) {
                            return;
                        } else {
                            UnilateralSortMerger.LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        }
                    }
                    if (!isRunning()) {
                        return;
                    }
                    if (takeNext == UnilateralSortMerger.EOF_MARKER) {
                        break;
                    }
                    FileIOChannel.ID next = createChannelEnumerator.next();
                    registerChannelToBeRemovedAtShudown(next);
                    BlockChannelWriter<MemorySegment> createBlockChannelWriter = this.ioManager.createBlockChannelWriter(next);
                    registerOpenChannelToBeRemovedAtShudown(createBlockChannelWriter);
                    ChannelWriterOutputView channelWriterOutputView = new ChannelWriterOutputView(createBlockChannelWriter, this.writeMemory, this.memManager.getPageSize());
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Spilling buffer " + takeNext.id + ".");
                    }
                    takeNext.buffer.writeToOutput(channelWriterOutputView, UnilateralSortMerger.this.largeRecordHandler);
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Spilled buffer " + takeNext.id + ".");
                    }
                    channelWriterOutputView.close();
                    unregisterOpenChannelToBeRemovedAtShudown(createBlockChannelWriter);
                    if (channelWriterOutputView.getBytesWritten() > 0) {
                        arrayList4.add(new ChannelWithBlockCount(next, channelWriterOutputView.getBlockCount()));
                    }
                    takeNext.buffer.reset();
                    this.queues.empty.add(takeNext);
                }
                if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                    UnilateralSortMerger.LOG.debug("Spilling done.");
                    UnilateralSortMerger.LOG.debug("Releasing sort-buffer memory.");
                }
                disposeSortBuffers(false);
                if (UnilateralSortMerger.this.largeRecordHandler == null || !UnilateralSortMerger.this.largeRecordHandler.hasData()) {
                    list = this.mergeReadMemory;
                } else {
                    if (arrayList4.isEmpty()) {
                        arrayList = this.mergeReadMemory;
                        list = Collections.emptyList();
                    } else {
                        int min = Math.min(this.maxFanIn, arrayList4.size());
                        int max = min * Math.max(2, Math.min(4, (this.mergeReadMemory.size() / 2) / min));
                        list = new ArrayList<>(max);
                        for (int i = 0; i < max; i++) {
                            list.add(this.mergeReadMemory.get(i));
                        }
                        arrayList = new ArrayList();
                        for (int i2 = max; i2 < this.mergeReadMemory.size(); i2++) {
                            arrayList.add(this.mergeReadMemory.get(i2));
                        }
                    }
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Sorting keys for large records.");
                    }
                    mutableObjectIterator = UnilateralSortMerger.this.largeRecordHandler.finishWriteAndSortKeys(arrayList);
                }
                while (isRunning() && arrayList4.size() > this.maxFanIn) {
                    arrayList4 = mergeChannelList(arrayList4, list, this.writeMemory);
                }
                this.memManager.release(this.writeMemory);
                this.writeMemory.clear();
                if (!arrayList4.isEmpty()) {
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Beginning final merge.");
                    }
                    List<List<MemorySegment>> arrayList5 = new ArrayList<>(arrayList4.size());
                    getSegmentsForReaders(arrayList5, list, arrayList4.size());
                    UnilateralSortMerger.this.setResultIterator(getMergingIterator(arrayList4, arrayList5, new ArrayList<>(arrayList4.size()), mutableObjectIterator));
                } else if (mutableObjectIterator == null) {
                    UnilateralSortMerger.this.setResultIterator(EmptyMutableObjectIterator.get());
                } else {
                    UnilateralSortMerger.this.setResultIterator(mutableObjectIterator);
                }
                if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                    UnilateralSortMerger.LOG.debug("Spilling and merging thread done.");
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final void disposeSortBuffers(boolean z) {
            while (!this.queues.empty.isEmpty()) {
                try {
                    CircularElement<E> take = this.queues.empty.take();
                    take.buffer.dispose();
                    if (z) {
                        this.memManager.release(take.memory);
                    }
                } catch (InterruptedException e) {
                    if (!isRunning()) {
                        return;
                    } else {
                        UnilateralSortMerger.LOG.error("Spilling thread was interrupted (without being shut down) while collecting empty buffers to release them. Retrying to collect buffers...");
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final CircularElement<E> takeNext(BlockingQueue<CircularElement<E>> blockingQueue, Queue<CircularElement<E>> queue) throws InterruptedException {
            return queue.isEmpty() ? blockingQueue.take() : queue.poll();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final MergeIterator<E> getMergingIterator(List<ChannelWithBlockCount> list, List<List<MemorySegment>> list2, List<FileIOChannel> list3, MutableObjectIterator<E> mutableObjectIterator) throws IOException {
            if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                UnilateralSortMerger.LOG.debug("Performing merge of " + list.size() + " sorted streams.");
            }
            ArrayList arrayList = new ArrayList(list.size() + 1);
            for (int i = 0; i < list.size(); i++) {
                ChannelWithBlockCount channelWithBlockCount = list.get(i);
                List<MemorySegment> list4 = list2.get(i);
                BlockChannelReader<MemorySegment> createBlockChannelReader = this.ioManager.createBlockChannelReader(channelWithBlockCount.getChannel());
                list3.add(createBlockChannelReader);
                registerOpenChannelToBeRemovedAtShudown(createBlockChannelReader);
                unregisterChannelToBeRemovedAtShudown(channelWithBlockCount.getChannel());
                arrayList.add(new ChannelReaderInputViewIterator(new ChannelReaderInputView(createBlockChannelReader, list4, channelWithBlockCount.getBlockCount(), false), null, this.serializer));
            }
            if (mutableObjectIterator != null) {
                arrayList.add(mutableObjectIterator);
            }
            return new MergeIterator<>(arrayList, this.comparator);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final List<ChannelWithBlockCount> mergeChannelList(List<ChannelWithBlockCount> list, List<MemorySegment> list2, List<MemorySegment> list3) throws IOException {
            double ceil = Math.ceil(Math.log(list.size()) / Math.log(this.maxFanIn)) - 1.0d;
            int size = list.size();
            int pow = (int) Math.pow(this.maxFanIn, ceil);
            int ceil2 = (int) Math.ceil((size - pow) / (this.maxFanIn - 1));
            int i = pow - ceil2;
            int i2 = size - i;
            ArrayList arrayList = new ArrayList(pow);
            arrayList.addAll(list.subList(0, i));
            int ceil3 = (int) Math.ceil(i2 / ceil2);
            ArrayList arrayList2 = new ArrayList(ceil3);
            getSegmentsForReaders(arrayList2, list2, ceil3);
            ArrayList arrayList3 = new ArrayList(ceil3);
            int i3 = i;
            while (isRunning() && i3 < list.size()) {
                arrayList3.clear();
                int i4 = 0;
                while (i4 < ceil3 && i3 < list.size()) {
                    arrayList3.add(list.get(i3));
                    i4++;
                    i3++;
                }
                arrayList.add(mergeChannels(arrayList3, arrayList2, list3));
            }
            return arrayList;
        }

        /* JADX WARN: Multi-variable type inference failed */
        protected ChannelWithBlockCount mergeChannels(List<ChannelWithBlockCount> list, List<List<MemorySegment>> list2, List<MemorySegment> list3) throws IOException {
            List<FileIOChannel> arrayList = new ArrayList<>(list.size());
            MergeIterator<E> mergingIterator = getMergingIterator(list, list2, arrayList, null);
            FileIOChannel.ID createChannel = this.ioManager.createChannel();
            registerChannelToBeRemovedAtShudown(createChannel);
            BlockChannelWriter<MemorySegment> createBlockChannelWriter = this.ioManager.createBlockChannelWriter(createChannel);
            registerOpenChannelToBeRemovedAtShudown(createBlockChannelWriter);
            ChannelWriterOutputView channelWriterOutputView = new ChannelWriterOutputView(createBlockChannelWriter, list3, this.memManager.getPageSize());
            if (!UnilateralSortMerger.this.objectReuseEnabled) {
                while (true) {
                    E next = mergingIterator.next();
                    if (next == null) {
                        break;
                    }
                    this.serializer.serialize(next, channelWriterOutputView);
                }
            } else {
                TypeSerializer<E> typeSerializer = this.serializer;
                E createInstance2 = typeSerializer.createInstance2();
                while (true) {
                    E next2 = mergingIterator.next(createInstance2);
                    createInstance2 = next2;
                    if (next2 == null) {
                        break;
                    }
                    typeSerializer.serialize(createInstance2, channelWriterOutputView);
                }
            }
            channelWriterOutputView.close();
            int blockCount = channelWriterOutputView.getBlockCount();
            unregisterOpenChannelToBeRemovedAtShudown(createBlockChannelWriter);
            for (int i = 0; i < arrayList.size(); i++) {
                FileIOChannel fileIOChannel = arrayList.get(i);
                fileIOChannel.closeAndDelete();
                unregisterOpenChannelToBeRemovedAtShudown(fileIOChannel);
            }
            return new ChannelWithBlockCount(createChannel, blockCount);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final void getSegmentsForReaders(List<List<MemorySegment>> list, List<MemorySegment> list2, int i) {
            int size = list2.size();
            int i2 = size / i;
            int i3 = size % i;
            Iterator<MemorySegment> it = list2.iterator();
            for (int i4 = 0; i4 < i3; i4++) {
                ArrayList arrayList = new ArrayList(i2 + 1);
                list.add(arrayList);
                for (int i5 = i2; i5 >= 0; i5--) {
                    arrayList.add(it.next());
                }
            }
            for (int i6 = i3; i6 < i; i6++) {
                ArrayList arrayList2 = new ArrayList(i2);
                list.add(arrayList2);
                for (int i7 = i2; i7 > 0; i7--) {
                    arrayList2.add(it.next());
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void registerChannelToBeRemovedAtShudown(FileIOChannel.ID id) {
            UnilateralSortMerger.this.channelsToDeleteAtShutdown.add(id);
        }

        protected void unregisterChannelToBeRemovedAtShudown(FileIOChannel.ID id) {
            UnilateralSortMerger.this.channelsToDeleteAtShutdown.remove(id);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void registerOpenChannelToBeRemovedAtShudown(FileIOChannel fileIOChannel) {
            UnilateralSortMerger.this.openChannels.add(fileIOChannel);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void unregisterOpenChannelToBeRemovedAtShudown(FileIOChannel fileIOChannel) {
            UnilateralSortMerger.this.openChannels.remove(fileIOChannel);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/UnilateralSortMerger$ThreadBase.class */
    public static abstract class ThreadBase<E> extends Thread implements Thread.UncaughtExceptionHandler {
        protected final CircularQueues<E> queues;
        private final ExceptionHandler<IOException> exceptionHandler;
        private volatile boolean alive;

        protected ThreadBase(ExceptionHandler<IOException> exceptionHandler, String str, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable) {
            super(str);
            setDaemon(true);
            this.exceptionHandler = exceptionHandler;
            setUncaughtExceptionHandler(this);
            this.queues = circularQueues;
            this.alive = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                go();
            } catch (Throwable th) {
                internalHandleException(new IOException("Thread '" + getName() + "' terminated due to an exception: " + th.getMessage(), th));
            }
        }

        protected abstract void go() throws IOException;

        public boolean isRunning() {
            return this.alive;
        }

        public void shutdown() {
            this.alive = false;
            interrupt();
        }

        protected final void internalHandleException(IOException iOException) {
            if (isRunning() && this.exceptionHandler != null) {
                try {
                    this.exceptionHandler.handleException(iOException);
                } catch (Throwable th) {
                }
            }
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            internalHandleException(new IOException("Thread '" + thread.getName() + "' terminated due to an uncaught exception: " + th.getMessage(), th));
        }
    }

    public UnilateralSortMerger(MemoryManager memoryManager, IOManager iOManager, MutableObjectIterator<E> mutableObjectIterator, AbstractInvokable abstractInvokable, TypeSerializerFactory<E> typeSerializerFactory, TypeComparator<E> typeComparator, double d, int i, float f, boolean z, boolean z2) throws IOException, MemoryAllocationException {
        this(memoryManager, iOManager, mutableObjectIterator, abstractInvokable, typeSerializerFactory, typeComparator, d, -1, i, f, z, z2);
    }

    public UnilateralSortMerger(MemoryManager memoryManager, IOManager iOManager, MutableObjectIterator<E> mutableObjectIterator, AbstractInvokable abstractInvokable, TypeSerializerFactory<E> typeSerializerFactory, TypeComparator<E> typeComparator, double d, int i, int i2, float f, boolean z, boolean z2) throws IOException, MemoryAllocationException {
        this(memoryManager, iOManager, (MutableObjectIterator) mutableObjectIterator, abstractInvokable, (TypeSerializerFactory) typeSerializerFactory, (TypeComparator) typeComparator, d, i, i2, f, false, z, z2);
    }

    public UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> list, IOManager iOManager, MutableObjectIterator<E> mutableObjectIterator, AbstractInvokable abstractInvokable, TypeSerializerFactory<E> typeSerializerFactory, TypeComparator<E> typeComparator, int i, int i2, float f, boolean z, boolean z2) throws IOException {
        this(memoryManager, list, iOManager, (MutableObjectIterator) mutableObjectIterator, abstractInvokable, (TypeSerializerFactory) typeSerializerFactory, (TypeComparator) typeComparator, i, i2, f, false, z, z2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public UnilateralSortMerger(MemoryManager memoryManager, IOManager iOManager, MutableObjectIterator<E> mutableObjectIterator, AbstractInvokable abstractInvokable, TypeSerializerFactory<E> typeSerializerFactory, TypeComparator<E> typeComparator, double d, int i, int i2, float f, boolean z, boolean z2, boolean z3) throws IOException, MemoryAllocationException {
        this(memoryManager, memoryManager.allocatePages(abstractInvokable, memoryManager.computeNumberOfPages(d)), iOManager, mutableObjectIterator, abstractInvokable, typeSerializerFactory, typeComparator, i, i2, f, z, z2, z3);
    }

    protected UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> list, IOManager iOManager, MutableObjectIterator<E> mutableObjectIterator, AbstractInvokable abstractInvokable, TypeSerializerFactory<E> typeSerializerFactory, TypeComparator<E> typeComparator, int i, int i2, float f, boolean z, boolean z2, boolean z3) throws IOException {
        this(memoryManager, list, iOManager, mutableObjectIterator, abstractInvokable, typeSerializerFactory, typeComparator, i, i2, f, z, z2, z3, new DefaultInMemorySorterFactory(typeSerializerFactory, typeComparator, 32));
    }

    protected UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> list, IOManager iOManager, MutableObjectIterator<E> mutableObjectIterator, AbstractInvokable abstractInvokable, TypeSerializerFactory<E> typeSerializerFactory, TypeComparator<E> typeComparator, int i, int i2, float f, boolean z, boolean z2, boolean z3, InMemorySorterFactory<E> inMemorySorterFactory) throws IOException {
        int max;
        int max2;
        this.iteratorLock = new Object();
        if (memoryManager == null || ((iOManager == null && !z) || typeSerializerFactory == null || typeComparator == null)) {
            throw new NullPointerException();
        }
        if (abstractInvokable == null) {
            throw new NullPointerException("Parent Task must not be null.");
        }
        if (i2 < 2) {
            throw new IllegalArgumentException("Merger cannot work with less than two file handles.");
        }
        this.memoryManager = memoryManager;
        this.objectReuseEnabled = z3;
        int size = list.size();
        if (size < 12) {
            throw new IllegalArgumentException("Too little memory provided to sorter to perform task. Required are at least 12 pages. Current page size is " + memoryManager.getPageSize() + " bytes.");
        }
        if (!z || z2) {
            int i3 = (z ? 0 : 1) + (z2 ? 2 : 0);
            if (i2 + (i3 * 2) > size) {
                max = z ? 0 : 2;
                max2 = z2 ? 4 : 0;
                i2 = size - (i3 * 2);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reducing maximal merge fan-in to " + i2 + " due to limited memory availability during merge");
                }
            } else {
                int i4 = size / (i3 * 100);
                if (i4 >= 4) {
                    max = z ? 0 : 4;
                    max2 = z2 ? 8 : 0;
                } else {
                    max = z ? 0 : Math.max(2, i4);
                    max2 = z2 ? Math.max(4, i4) : 0;
                }
            }
        } else {
            max = 0;
            max2 = 0;
        }
        int i5 = (size - max) - max2;
        long pageSize = i5 * memoryManager.getPageSize();
        i = i < 1 ? pageSize > 104857600 ? 2 : 1 : i;
        int i6 = i5 / i;
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Instantiating sorter with %d pages of sorting memory (=%d bytes total) divided over %d sort buffers (%d pages per buffer). Using %d buffers for writing sorted results and merging maximally %d streams at once. Using %d memory segments for large record spilling.", Integer.valueOf(i5), Long.valueOf(pageSize), Integer.valueOf(i), Integer.valueOf(i6), Integer.valueOf(max), Integer.valueOf(i2), Integer.valueOf(max2)));
        }
        this.sortReadMemory = list;
        this.writeMemory = new ArrayList(max);
        TypeSerializer<E> serializer = typeSerializerFactory.getSerializer();
        if (max > 0) {
            for (int i7 = 0; i7 < max; i7++) {
                this.writeMemory.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
            }
        }
        if (max2 > 0) {
            ArrayList arrayList = new ArrayList();
            for (int i8 = 0; i8 < max2; i8++) {
                arrayList.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
            }
            this.largeRecordHandler = new LargeRecordHandler<>(serializer, typeComparator.duplicate2(), iOManager, memoryManager, arrayList, abstractInvokable, i2);
        } else {
            this.largeRecordHandler = null;
        }
        CircularQueues<E> circularQueues = new CircularQueues<>();
        this.inMemorySorters = new ArrayList(i);
        Iterator<MemorySegment> it = this.sortReadMemory.iterator();
        int i9 = 0;
        while (i9 < i) {
            List<MemorySegment> arrayList2 = new ArrayList<>(i6);
            for (int i10 = i9 == i - 1 ? Integer.MAX_VALUE : i6; i10 > 0 && it.hasNext(); i10--) {
                arrayList2.add(it.next());
            }
            InMemorySorter<?> create = inMemorySorterFactory.create(arrayList2);
            this.inMemorySorters.add(create);
            circularQueues.empty.add(new CircularElement<>(i9, create, arrayList2));
            i9++;
        }
        ExceptionHandler<IOException> exceptionHandler = new ExceptionHandler<IOException>() { // from class: org.apache.flink.runtime.operators.sort.UnilateralSortMerger.1
            @Override // org.apache.flink.runtime.operators.sort.ExceptionHandler
            public void handleException(IOException iOException) {
                if (UnilateralSortMerger.this.closed) {
                    return;
                }
                UnilateralSortMerger.this.setResultIteratorException(iOException);
                UnilateralSortMerger.this.close();
            }
        };
        this.channelsToDeleteAtShutdown = new HashSet<>(64);
        this.openChannels = new HashSet<>(64);
        this.readThread = getReadingThread(exceptionHandler, mutableObjectIterator, circularQueues, this.largeRecordHandler, abstractInvokable, serializer, f * ((float) pageSize));
        this.sortThread = getSortingThread(exceptionHandler, circularQueues, abstractInvokable);
        this.spillThread = getSpillingThread(exceptionHandler, circularQueues, abstractInvokable, memoryManager, iOManager, typeSerializerFactory, typeComparator, this.sortReadMemory, this.writeMemory, i2);
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        if (contextClassLoader != null) {
            if (this.readThread != null) {
                this.readThread.setContextClassLoader(contextClassLoader);
            }
            if (this.sortThread != null) {
                this.sortThread.setContextClassLoader(contextClassLoader);
            }
            if (this.spillThread != null) {
                this.spillThread.setContextClassLoader(contextClassLoader);
            }
        }
        startThreads();
    }

    protected void startThreads() {
        if (this.readThread != null) {
            this.readThread.start();
        }
        if (this.sortThread != null) {
            this.sortThread.start();
        }
        if (this.spillThread != null) {
            this.spillThread.start();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        boolean isEmpty;
        boolean isEmpty2;
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            try {
                synchronized (this.iteratorLock) {
                    if (this.iteratorException == null) {
                        this.iteratorException = new IOException("The sorter has been closed.");
                        this.iteratorLock.notifyAll();
                    }
                }
                if (this.readThread != null) {
                    try {
                        this.readThread.shutdown();
                    } catch (Throwable th) {
                        LOG.error("Error shutting down reader thread: " + th.getMessage(), th);
                    }
                }
                if (this.sortThread != null) {
                    try {
                        this.sortThread.shutdown();
                    } catch (Throwable th2) {
                        LOG.error("Error shutting down sorter thread: " + th2.getMessage(), th2);
                    }
                }
                if (this.spillThread != null) {
                    try {
                        this.spillThread.shutdown();
                    } catch (Throwable th3) {
                        LOG.error("Error shutting down spilling thread: " + th3.getMessage(), th3);
                    }
                }
                try {
                    if (this.readThread != null) {
                        this.readThread.join();
                    }
                    if (this.sortThread != null) {
                        this.sortThread.join();
                    }
                    if (this.spillThread != null) {
                        this.spillThread.join();
                    }
                } catch (InterruptedException e) {
                    LOG.debug("Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.", (Throwable) e);
                }
                while (true) {
                    if (isEmpty) {
                        break;
                    }
                }
                while (true) {
                    if (isEmpty2) {
                        try {
                            break;
                        } catch (Throwable th4) {
                            return;
                        }
                    }
                }
            } finally {
                Iterator<InMemorySorter<?>> it = this.inMemorySorters.iterator();
                while (it.hasNext()) {
                    it.next().dispose();
                }
                try {
                    if (!this.writeMemory.isEmpty()) {
                        this.memoryManager.release(this.writeMemory);
                    }
                    this.writeMemory.clear();
                } catch (Throwable th5) {
                }
                try {
                    if (!this.sortReadMemory.isEmpty()) {
                        this.memoryManager.release(this.sortReadMemory);
                    }
                    this.sortReadMemory.clear();
                } catch (Throwable th6) {
                }
                while (!this.openChannels.isEmpty()) {
                    try {
                        Iterator<FileIOChannel> it2 = this.openChannels.iterator();
                        while (it2.hasNext()) {
                            FileIOChannel next = it2.next();
                            it2.remove();
                            next.closeAndDelete();
                        }
                    } catch (Throwable th7) {
                    }
                }
                while (!this.channelsToDeleteAtShutdown.isEmpty()) {
                    try {
                        Iterator<FileIOChannel.ID> it3 = this.channelsToDeleteAtShutdown.iterator();
                        while (it3.hasNext()) {
                            FileIOChannel.ID next2 = it3.next();
                            it3.remove();
                            try {
                                File file = new File(next2.getPath());
                                if (file.exists()) {
                                    file.delete();
                                }
                            } catch (Throwable th8) {
                            }
                        }
                    } catch (Throwable th9) {
                    }
                }
                try {
                    if (this.largeRecordHandler != null) {
                        this.largeRecordHandler.close();
                    }
                } catch (Throwable th10) {
                }
            }
        }
    }

    protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> mutableObjectIterator, CircularQueues<E> circularQueues, LargeRecordHandler<E> largeRecordHandler, AbstractInvokable abstractInvokable, TypeSerializer<E> typeSerializer, long j) {
        return new ReadingThread(exceptionHandler, mutableObjectIterator, circularQueues, largeRecordHandler, typeSerializer.createInstance2(), abstractInvokable, j);
    }

    protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable) {
        return new SortingThread(exceptionHandler, circularQueues, abstractInvokable);
    }

    protected ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable, MemoryManager memoryManager, IOManager iOManager, TypeSerializerFactory<E> typeSerializerFactory, TypeComparator<E> typeComparator, List<MemorySegment> list, List<MemorySegment> list2, int i) {
        return new SpillingThread(exceptionHandler, circularQueues, abstractInvokable, memoryManager, iOManager, typeSerializerFactory.getSerializer(), typeComparator, list, list2, i);
    }

    @Override // org.apache.flink.runtime.operators.sort.Sorter, org.apache.flink.runtime.operators.util.CloseableInputProvider
    public MutableObjectIterator<E> getIterator() throws InterruptedException {
        MutableObjectIterator<E> mutableObjectIterator;
        synchronized (this.iteratorLock) {
            while (this.iterator == null && this.iteratorException == null) {
                this.iteratorLock.wait();
            }
            if (this.iteratorException != null) {
                throw new RuntimeException("Error obtaining the sorted input: " + this.iteratorException.getMessage(), this.iteratorException);
            }
            mutableObjectIterator = this.iterator;
        }
        return mutableObjectIterator;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void setResultIterator(MutableObjectIterator<E> mutableObjectIterator) {
        synchronized (this.iteratorLock) {
            if (this.iteratorException == null) {
                this.iterator = mutableObjectIterator;
                this.iteratorLock.notifyAll();
            }
        }
    }

    protected final void setResultIteratorException(IOException iOException) {
        synchronized (this.iteratorLock) {
            if (this.iteratorException == null) {
                this.iteratorException = iOException;
                this.iteratorLock.notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static <T> CircularElement<T> endMarker() {
        return (CircularElement<T>) EOF_MARKER;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static <T> CircularElement<T> spillingMarker() {
        return (CircularElement<T>) SPILLING_MARKER;
    }
}
