package org.apache.hadoop.fs.azurebfs.services;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.class */
public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities, IOStatisticsSource {
    /** Client used to issue the append and flush requests for this stream. */
    private final AbfsClient client;
    /** Path handed to every client append/flush call and to the exception wrapper used by close(). */
    private final String path;
    /** Offset at which the next buffer will be appended; advanced each time a buffer is handed off for upload. */
    private long position;
    /** When false, hsync()/hflush() do nothing and hasCapability() always answers false. */
    private boolean supportFlush;
    /** When true, flush() does nothing. */
    private boolean disableOutputStreamFlush;
    /** Size of each write buffer, read from the stream context in the constructor. */
    private final int bufferSize;
    /** Buffer currently being filled by write(); set to null by close(). */
    private byte[] buffer;
    /** SAS token holder: read with get() before each append/flush, refreshed with update() from each response. */
    private CachedSASToken cachedSasToken;
    /** File-system level statistics; may be null, in which case write-op counting is skipped. */
    private final FileSystem.Statistics statistics;
    /** Per-stream statistics taken from the stream context; may be null, in which case stream accounting is skipped. */
    private final AbfsOutputStreamStatistics outputStreamStatistics;
    /** IOStatistics obtained from outputStreamStatistics; stays null when outputStreamStatistics is null. */
    private IOStatistics ioStatistics;
    private static final Logger LOG = LoggerFactory.getLogger(AbfsOutputStream.class);
    /** Sum of the lengths of the write operations that shrinkWriteOperationQueue() has removed after they completed. */
    private long lastTotalAppendOffset = 0;
    /** Pool the write buffers are drawn from; a buffer is handed back after its append succeeds. */
    private final ElasticByteBufferPool byteBufferPool = new ElasticByteBufferPool();
    /** Set by close(); makes later close() calls return immediately. */
    private boolean closed = false;
    /** Failure recorded by a wait/flush path, or the "Stream is closed!" marker set by close(); rethrown by maybeThrowLastError(). */
    private volatile IOException lastError = null;
    /** Offset passed to the most recent client flush that returned without throwing. */
    private long lastFlushOffset = 0;
    /** Number of bytes currently held in buffer. */
    private int bufferIndex = 0;
    /** Submitted uploads that have not yet been removed by shrinkWriteOperationQueue(), oldest first. */
    private ConcurrentLinkedDeque<WriteOperation> writeOperations = new ConcurrentLinkedDeque<>();
    /** Four times the available processor count; used as the pool size and as the queue-depth threshold that makes a writer wait. */
    private final int maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
    /** Pool running the append requests: core size equals max size, backed by an unbounded LinkedBlockingQueue. */
    private final ThreadPoolExecutor threadExecutor = new ThreadPoolExecutor(this.maxConcurrentRequestCount, this.maxConcurrentRequestCount, 10, TimeUnit.SECONDS, new LinkedBlockingQueue());
    /** Completion service layered on threadExecutor; uploads are submitted through it and finished ones are drained via poll()/take(). */
    private final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(this.threadExecutor);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream$WriteOperation.class */
    /**
     * Immutable record of one buffer upload that has been submitted to the
     * executor: the future tracking the upload, the offset the bytes were
     * appended at, and how many bytes were sent.
     */
    public static class WriteOperation {
        /** Future of the submitted append task. */
        private final Future<Void> task;
        /** Offset in the remote file at which the bytes are appended. */
        private final long startOffset;
        /** Number of bytes covered by this upload. */
        private final long length;

        /**
         * @param task        future of the submitted upload; must not be null
         * @param startOffset append offset; must not be negative
         * @param length      number of bytes uploaded; must not be negative
         */
        WriteOperation(Future<Void> task, long startOffset, long length) {
            Preconditions.checkNotNull(task, "task");
            Preconditions.checkArgument(startOffset >= 0, "startOffset");
            Preconditions.checkArgument(length >= 0, "length");
            this.task = task;
            this.startOffset = startOffset;
            this.length = length;
        }
    }

    public AbfsOutputStream(AbfsClient abfsClient, FileSystem.Statistics statistics, String str, long j, AbfsOutputStreamContext abfsOutputStreamContext) {
        this.client = abfsClient;
        this.statistics = statistics;
        this.path = str;
        this.position = j;
        this.supportFlush = abfsOutputStreamContext.isEnableFlush();
        this.disableOutputStreamFlush = abfsOutputStreamContext.isDisableOutputStreamFlush();
        this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
        this.buffer = this.byteBufferPool.getBuffer(false, this.bufferSize).array();
        this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
        this.cachedSasToken = new CachedSASToken(abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
        if (this.outputStreamStatistics != null) {
            this.ioStatistics = this.outputStreamStatistics.getIOStatistics();
        }
    }

    /**
     * Reports whether this stream offers the probed capability.
     *
     * @param str capability name being probed
     * @return false whenever flush support is off; otherwise whatever
     *         {@code StoreImplementationUtils.isProbeForSyncable} answers for {@code str}
     */
    public boolean hasCapability(String str) {
        if (!this.supportFlush) {
            return false;
        }
        return StoreImplementationUtils.isProbeForSyncable(str);
    }

    /**
     * Writes one byte by delegating to the array-based write.
     *
     * @param i value whose low eight bits are written
     * @throws IOException if the delegated write fails
     */
    @Override // java.io.OutputStream
    public void write(int i) throws IOException {
        final byte[] singleByte = {(byte) (i & 0xFF)};
        write(singleByte);
    }

    /**
     * Copies a range of {@code bArr} into the write buffer, handing the buffer
     * off for upload every time it becomes full.
     *
     * @param bArr source bytes; must not be null
     * @param i    index of the first byte to write
     * @param i2   number of bytes to write
     * @throws IOException               if a previous failure was recorded or an upload hand-off fails
     * @throws IllegalArgumentException  if {@code bArr} is null
     * @throws IndexOutOfBoundsException if the range does not lie inside {@code bArr}
     */
    @Override // java.io.OutputStream
    public synchronized void write(byte[] bArr, int i, int i2) throws IOException {
        maybeThrowLastError();
        Preconditions.checkArgument(bArr != null, "null data");
        if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException();
        }

        int sourceOffset = i;
        int remaining = i2;
        while (remaining > 0) {
            // Free space left in the current buffer.
            final int writableBytes = this.bufferSize - this.bufferIndex;
            if (remaining < writableBytes) {
                // Everything left fits with room to spare: just buffer it.
                System.arraycopy(bArr, sourceOffset, this.buffer, this.bufferIndex, remaining);
                this.bufferIndex += remaining;
                remaining = 0;
            } else {
                // Fill the buffer to the brim and hand it off for upload.
                System.arraycopy(bArr, sourceOffset, this.buffer, this.bufferIndex, writableBytes);
                this.bufferIndex += writableBytes;
                writeCurrentBufferToService();
                sourceOffset += writableBytes;
                remaining -= writableBytes;
            }
        }
        incrementWriteOps();
    }

    /** Adds one write operation to the file-system statistics, when statistics were supplied. */
    private void incrementWriteOps() {
        if (this.statistics == null) {
            return;
        }
        this.statistics.incrementWriteOps(1);
    }

    /**
     * Rethrows the recorded failure, if there is one.
     *
     * @throws IOException the value of {@code lastError} when it is non-null
     */
    private void maybeThrowLastError() throws IOException {
        final IOException recorded = this.lastError;
        if (recorded != null) {
            throw recorded;
        }
    }

    /**
     * Hands buffered bytes off for upload and flushes what has already been
     * appended, unless output-stream flushing has been disabled.
     *
     * @throws IOException if a previous failure was recorded or the flush fails
     */
    @Override // java.io.OutputStream, java.io.Flushable
    public void flush() throws IOException {
        if (!this.disableOutputStreamFlush) {
            flushInternalAsync();
        }
    }

    /**
     * Uploads buffered data, waits for the pending uploads and flushes them.
     * Does nothing when flush support is off.
     *
     * @throws IOException if a previous failure was recorded or the flush fails
     */
    public void hsync() throws IOException {
        if (!this.supportFlush) {
            return;
        }
        flushInternal(false);
    }

    /**
     * Uploads buffered data, waits for the pending uploads and flushes them.
     * Does nothing when flush support is off.
     *
     * @throws IOException if a previous failure was recorded or the flush fails
     */
    public void hflush() throws IOException {
        if (!this.supportFlush) {
            return;
        }
        flushInternal(false);
    }

    /**
     * Flushes everything outstanding with the close flag set, shuts the upload
     * executor down and marks the stream closed. Calling it again after the
     * stream has been closed returns immediately.
     *
     * <p>Whether or not the final flush succeeds, the stream ends up closed:
     * the buffer is released, pending write operations are forgotten, the
     * executor is stopped and later operations fail with "Stream is closed!".
     *
     * @throws IOException if the final flush fails; the failure is rethrown
     *                     through {@code IOUtils.wrapException} together with the stream path
     */
    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        try {
            flushInternal(true);
            this.threadExecutor.shutdown();
        } catch (IOException e) {
            throw IOUtils.wrapException(this.path, e.getMessage(), e);
        } finally {
            // Runs on success and on failure alike.
            this.lastError = new IOException("Stream is closed!");
            this.buffer = null;
            this.bufferIndex = 0;
            this.closed = true;
            this.writeOperations.clear();
            // shutdown() is skipped when the flush threw; force the pool down then.
            if (!this.threadExecutor.isShutdown()) {
                this.threadExecutor.shutdownNow();
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Closing AbfsOutputStream: {}", toString());
        }
    }

    /**
     * Synchronous flush: fails fast on a recorded error, hands the current
     * buffer off for upload, then waits for all uploads and flushes them.
     *
     * @param isClose forwarded to the flush request; {@code true} only when called from close()
     * @throws IOException if a previous failure was recorded, or an upload or the flush fails
     */
    private synchronized void flushInternal(boolean isClose) throws IOException {
        maybeThrowLastError();
        writeCurrentBufferToService();
        flushWrittenBytesToService(isClose);
    }

    /**
     * Non-waiting flush used by flush(): fails fast on a recorded error, hands
     * the current buffer off for upload, then flushes only as far as uploads
     * that have already completed.
     *
     * @throws IOException if a previous failure was recorded, or an upload or the flush fails
     */
    private synchronized void flushInternalAsync() throws IOException {
        maybeThrowLastError();
        writeCurrentBufferToService();
        flushWrittenBytesToServiceAsync();
    }

    /**
     * Hands the bytes currently held in the write buffer to the executor as
     * one append request and installs a fresh buffer for subsequent writes.
     * Returns immediately when the buffer is empty.
     *
     * <p>If the executor's backlog has reached {@code maxConcurrentRequestCount},
     * the caller first waits for an upload to finish; when stream statistics
     * are present that wait is timed.
     *
     * @throws IOException if waiting for a free slot is interrupted, or an
     *                     already-finished upload at the head of the queue failed
     */
    private synchronized void writeCurrentBufferToService() throws IOException {
        if (this.bufferIndex == 0) {
            return;
        }

        // Snapshot the filled buffer; the upload task owns it from here on.
        final byte[] bytes = this.buffer;
        final int bytesLength = this.bufferIndex;
        if (this.outputStreamStatistics != null) {
            this.outputStreamStatistics.writeCurrentBuffer();
            this.outputStreamStatistics.bytesToUpload(bytesLength);
        }

        // Swap in a fresh buffer and advance the append position.
        this.buffer = this.byteBufferPool.getBuffer(false, this.bufferSize).array();
        this.bufferIndex = 0;
        final long offset = this.position;
        this.position += bytesLength;

        // Throttle: do not let the executor's queue grow past the limit.
        if (this.threadExecutor.getQueue().size() >= this.maxConcurrentRequestCount) {
            if (this.outputStreamStatistics != null) {
                // The tracker may be null; try-with-resources skips close() in that case.
                try (DurationTracker ignored = this.outputStreamStatistics.timeSpentTaskWait()) {
                    waitForTaskToComplete();
                }
            } else {
                waitForTaskToComplete();
            }
        }

        final Future<Void> job = this.completionService.submit(IOStatisticsBinding.trackDurationOfCallable(this.ioStatistics, "time_spent_on_put_request", () -> {
            try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(this.client.getAbfsPerfTracker(), "writeCurrentBufferToService", AbfsHttpConstants.APPEND_ACTION)) {
                AbfsRestOperation op = this.client.append(this.path, offset, bytes, 0, bytesLength, this.cachedSasToken.get());
                this.cachedSasToken.update(op.getSasToken());
                perfInfo.registerResult(op.getResult());
                // The buffer goes back to the pool only after the append returned normally.
                this.byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
                perfInfo.registerSuccess(true);
                return null;
            }
        }));

        // Accounting happens right after submission, based solely on whether
        // the future is already cancelled at this point.
        if (this.outputStreamStatistics != null) {
            if (job.isCancelled()) {
                this.outputStreamStatistics.uploadFailed(bytesLength);
            } else {
                this.outputStreamStatistics.uploadSuccessful(bytesLength);
            }
        }
        this.writeOperations.add(new WriteOperation(job, offset, bytesLength));
        shrinkWriteOperationQueue();
    }

    /**
     * Waits for every queued upload to finish, then issues a flush at the
     * current position.
     *
     * @param isClose forwarded to the flush request as its close flag
     * @throws FileNotFoundException if an upload failed with an
     *                               {@code AbfsRestOperationException} whose status code is 404
     * @throws IOException           if an upload failed for any other reason (also
     *                               recorded in {@code lastError}), or the flush itself fails
     */
    private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
        for (WriteOperation writeOperation : this.writeOperations) {
            try {
                writeOperation.task.get();
            } catch (Exception e) {
                // Keep the interrupt visible to the caller; the wait was abandoned.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                final Throwable cause = e.getCause();
                // 404 from the service: report it as a missing file, keeping the original failure as cause.
                if ((cause instanceof AbfsRestOperationException) && ((AbfsRestOperationException) cause).getStatusCode() == 404) {
                    throw (FileNotFoundException) new FileNotFoundException(e.getMessage()).initCause(e);
                }
                // Unwrap store exceptions so the recorded error points at the real failure.
                final Exception reported = (cause instanceof AzureBlobFileSystemException) ? (AzureBlobFileSystemException) cause : e;
                this.lastError = new IOException(reported);
                throw this.lastError;
            }
        }
        flushWrittenBytesToServiceInternal(this.position, false, isClose);
    }

    /**
     * Drops finished uploads from the queue and, if that moved the appended
     * total past the last flushed offset, flushes up to that total without
     * waiting for uploads still in flight.
     *
     * @throws IOException if a finished upload failed or the flush fails
     */
    private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
        shrinkWriteOperationQueue();
        if (this.lastTotalAppendOffset <= this.lastFlushOffset) {
            // Nothing new has completed since the previous flush.
            return;
        }
        flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true, false);
    }

    /**
     * Sends one flush request to the service and, on success, remembers the
     * flushed offset. The perf-info tracker is closed on every path.
     *
     * @param offset                offset passed to the flush request
     * @param retainUncommittedData forwarded unchanged as the third argument of
     *                              {@code client.flush}; callers in this class pass {@code true}
     *                              only from the asynchronous flush path.
     *                              NOTE(review): name reflects the presumed meaning of that
     *                              argument - confirm against AbfsClient
     * @param isClose               forwarded unchanged as the close flag of {@code client.flush}
     * @throws FileNotFoundException if the service answered with status 404
     * @throws IOException           if the flush failed for any other store-level reason
     */
    private synchronized void flushWrittenBytesToServiceInternal(long offset, boolean retainUncommittedData, boolean isClose) throws IOException {
        try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(this.client.getAbfsPerfTracker(), "flushWrittenBytesToServiceInternal", AbfsHttpConstants.FLUSH_ACTION)) {
            AbfsRestOperation op = this.client.flush(this.path, offset, retainUncommittedData, isClose, this.cachedSasToken.get());
            this.cachedSasToken.update(op.getSasToken());
            perfInfo.registerResult(op.getResult()).registerSuccess(true);
        } catch (AzureBlobFileSystemException e) {
            // 404 from the service: report it as a missing file, keeping the original failure as cause.
            if ((e instanceof AbfsRestOperationException) && ((AbfsRestOperationException) e).getStatusCode() == 404) {
                throw (FileNotFoundException) new FileNotFoundException(e.getMessage()).initCause(e);
            }
            throw new IOException(e);
        }
        // Only reached when the flush (and closing the tracker) succeeded.
        this.lastFlushOffset = offset;
    }

    /**
     * Removes finished uploads from the head of the queue, oldest first,
     * adding each one's length to {@code lastTotalAppendOffset}. Stops at the
     * first upload that is still running.
     *
     * @throws IOException if a finished upload turns out to have failed; the
     *                     failure is recorded in {@code lastError} and the failed entry is
     *                     left at the head of the queue
     */
    private synchronized void shrinkWriteOperationQueue() throws IOException {
        WriteOperation oldest = this.writeOperations.peek();
        while (oldest != null && oldest.task.isDone()) {
            try {
                // get() on a finished task only surfaces its outcome.
                oldest.task.get();
                this.lastTotalAppendOffset += oldest.length;
                this.writeOperations.remove();
                if (this.outputStreamStatistics != null) {
                    this.outputStreamStatistics.queueShrunk();
                }
            } catch (Exception e) {
                final Throwable cause = e.getCause();
                if (cause instanceof AzureBlobFileSystemException) {
                    this.lastError = (AzureBlobFileSystemException) cause;
                } else {
                    this.lastError = new IOException(e);
                }
                throw this.lastError;
            }
            oldest = this.writeOperations.peek();
        }
    }

    /**
     * Frees executor capacity by consuming finished uploads from the
     * completion service. All uploads that have already finished are drained
     * without blocking; only if none had finished does the call block until
     * the next one completes.
     *
     * @throws IOException an {@code InterruptedIOException} (also recorded in
     *                     {@code lastError}) if the thread is interrupted while blocked;
     *                     the thread's interrupt status is restored before throwing
     */
    private void waitForTaskToComplete() throws IOException {
        boolean drainedAny = false;
        while (this.completionService.poll() != null) {
            drainedAny = true;
        }
        if (drainedAny) {
            return;
        }
        try {
            this.completionService.take();
        } catch (InterruptedException e) {
            // Converting to an IOException must not hide the interrupt from callers.
            Thread.currentThread().interrupt();
            this.lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
            throw this.lastError;
        }
    }

    /**
     * Test hook that delegates to waitForTaskToComplete(): drains uploads that
     * have already finished, or blocks for the next one if none had.
     *
     * @throws IOException if the wait is interrupted
     */
    @VisibleForTesting
    public synchronized void waitForPendingUploads() throws IOException {
        waitForTaskToComplete();
    }

    /**
     * Test accessor for the per-stream statistics.
     *
     * @return the statistics object taken from the stream context; may be null
     */
    @VisibleForTesting
    public AbfsOutputStreamStatistics getOutputStreamStatistics() {
        return this.outputStreamStatistics;
    }

    /**
     * Test accessor for the number of queued write operations.
     *
     * @return current size of the write-operation queue
     */
    @VisibleForTesting
    public int getWriteOperationsSize() {
        return this.writeOperations.size();
    }

    /**
     * Returns the IOStatistics of this stream.
     *
     * @return the IOStatistics obtained from the stream statistics, or null
     *         when the stream was created without stream statistics
     */
    public IOStatistics getIOStatistics() {
        return this.ioStatistics;
    }

    /**
     * Builds a diagnostic description: the default {@code Object} string,
     * followed by {@code AbfsOutputStream@<hashCode>} and, when stream
     * statistics are present, the statistics enclosed in braces.
     *
     * @return description of this stream for logging
     */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder(super.toString());
        sb.append("AbfsOutputStream@").append(hashCode());
        if (this.outputStreamStatistics != null) {
            // Previously "){" - the ')' had no matching '(' anywhere in the output.
            sb.append("{");
            sb.append(this.outputStreamStatistics.toString());
            sb.append("}");
        }
        return sb.toString();
    }
}
