package org.apache.hadoop.mapreduce.task.reduce;

import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.security.ssl.SSLFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/hadoop/mapreduce/task/reduce/Fetcher.class
 */
/* loaded from: input_file:hadoop-mapreduce-client-core-2.5.2.jar:org/apache/hadoop/mapreduce/task/reduce/Fetcher.class */
class Fetcher<K, V> extends Thread {
    private static final int DEFAULT_STALLED_COPY_TIMEOUT = 180000;
    private static final int UNIT_CONNECT_TIMEOUT = 60000;
    private static final int DEFAULT_READ_TIMEOUT = 180000;
    protected final Reporter reporter;
    private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
    private final Counters.Counter connectionErrs;
    private final Counters.Counter ioErrs;
    private final Counters.Counter wrongLengthErrs;
    private final Counters.Counter badIdErrs;
    private final Counters.Counter wrongMapErrs;
    private final Counters.Counter wrongReduceErrs;
    protected final MergeManager<K, V> merger;
    protected final ShuffleSchedulerImpl<K, V> scheduler;
    protected final ShuffleClientMetrics metrics;
    protected final ExceptionReporter exceptionReporter;
    protected final int id;
    protected final int reduce;
    private final int connectionTimeout;
    private final int readTimeout;
    private final SecretKey shuffleSecretKey;
    protected HttpURLConnection connection;
    private volatile boolean stopped;
    private static boolean sslShuffle;
    private static SSLFactory sslFactory;
    private static final Log LOG = LogFactory.getLog(Fetcher.class);
    private static int nextId = 0;
    private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0];

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/mapreduce/task/reduce/Fetcher$ShuffleErrors.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-core-2.5.2.jar:org/apache/hadoop/mapreduce/task/reduce/Fetcher$ShuffleErrors.class */
    /**
     * Names of the counters kept in the "Shuffle Errors" counter group.
     *
     * <p>The constructor looks each counter up by the constant's
     * {@code toString()} value, so renaming a constant renames the counter.
     */
    private enum ShuffleErrors {
        /** Incremented when an {@link IOException} is handled while fetching. */
        IO_ERROR,
        /** Incremented when a map output header carries a negative length. */
        WRONG_LENGTH,
        /** Incremented when a map id in a header cannot be parsed. */
        BAD_ID,
        /** Incremented when output arrives for a map that was not requested. */
        WRONG_MAP,
        /** Counter is registered by the constructor; not incremented in this class. */
        CONNECTION,
        /** Incremented when a header names a different reduce than this one. */
        WRONG_REDUCE
    }

    /**
     * Creates a fetcher and assigns it the next id from the class-wide
     * {@code nextId} sequence, then delegates to the nine-argument constructor.
     *
     * <p>The increment of {@code nextId} is a plain (unsynchronized)
     * pre-increment, exactly as in the compiled class: the value passed on is
     * the value after incrementing.
     *
     * @param jobConf              job configuration read for timeouts and SSL settings
     * @param taskAttemptID        the reduce attempt this fetcher works for
     * @param shuffleSchedulerImpl scheduler that hands out hosts and records results
     * @param mergeManager         destination for fetched map outputs
     * @param reporter             source of the "Shuffle Errors" counters
     * @param shuffleClientMetrics metrics sink for fetch activity
     * @param exceptionReporter    receives any throwable that terminates {@link #run()}
     * @param secretKey            key used to sign shuffle requests and verify replies
     */
    public Fetcher(JobConf jobConf, TaskAttemptID taskAttemptID, ShuffleSchedulerImpl<K, V> shuffleSchedulerImpl, MergeManager<K, V> mergeManager, Reporter reporter, ShuffleClientMetrics shuffleClientMetrics, ExceptionReporter exceptionReporter, SecretKey secretKey) {
        this(jobConf, taskAttemptID, shuffleSchedulerImpl, mergeManager, reporter, shuffleClientMetrics, exceptionReporter, secretKey, ++nextId);
    }

    /**
     * Creates a fetcher with an explicit id.
     *
     * <p>Besides wiring the collaborators, this resolves the six
     * "Shuffle Errors" counters, reads the connect/read timeouts from the job
     * configuration (both default to 180000 ms), names the thread
     * {@code fetcher#<id>}, marks it as a daemon, and lazily creates the shared
     * client-side {@link SSLFactory} when SSL shuffle is enabled.
     *
     * @param jobConf              job configuration read for timeouts and SSL settings
     * @param taskAttemptID        the reduce attempt; its task id becomes {@code reduce}
     * @param shuffleSchedulerImpl scheduler that hands out hosts and records results
     * @param mergeManager         destination for fetched map outputs
     * @param reporter             source of the "Shuffle Errors" counters
     * @param shuffleClientMetrics metrics sink for fetch activity
     * @param exceptionReporter    receives any throwable that terminates {@link #run()}
     * @param secretKey            key used to sign shuffle requests and verify replies
     * @param i                    id of this fetcher, used in the thread name and logs
     * @throws RuntimeException if the SSL factory cannot be initialised
     */
    @VisibleForTesting
    public Fetcher(JobConf jobConf, TaskAttemptID taskAttemptID, ShuffleSchedulerImpl<K, V> shuffleSchedulerImpl, MergeManager<K, V> mergeManager, Reporter reporter, ShuffleClientMetrics shuffleClientMetrics, ExceptionReporter exceptionReporter, SecretKey secretKey, int i) {
        this.stopped = false;
        this.reporter = reporter;
        this.scheduler = shuffleSchedulerImpl;
        this.merger = mergeManager;
        this.metrics = shuffleClientMetrics;
        this.exceptionReporter = exceptionReporter;
        this.id = i;
        this.reduce = taskAttemptID.getTaskID().getId();
        this.shuffleSecretKey = secretKey;
        this.ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString());
        this.wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_LENGTH.toString());
        this.badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.BAD_ID.toString());
        this.wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_MAP.toString());
        this.connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.CONNECTION.toString());
        this.wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_REDUCE.toString());
        this.connectionTimeout = jobConf.getInt(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, 180000);
        this.readTimeout = jobConf.getInt(MRJobConfig.SHUFFLE_READ_TIMEOUT, 180000);
        setName("fetcher#" + i);
        setDaemon(true);
        // The SSL state is static, so its initialisation is serialised on the class.
        synchronized (Fetcher.class) {
            sslShuffle = jobConf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, false);
            if (sslShuffle && sslFactory == null) {
                // Build into a local and publish only after init() succeeds, so a
                // failed initialisation never leaves a destroyed factory in the
                // static field for later fetchers to pick up.
                SSLFactory factory = new SSLFactory(SSLFactory.Mode.CLIENT, jobConf);
                try {
                    factory.init();
                } catch (Exception e) {
                    factory.destroy();
                    throw new RuntimeException(e);
                }
                sslFactory = factory;
            }
        }
    }

    /**
     * Main fetch loop: until stopped or interrupted, waits for merge
     * resources, takes a host from the scheduler, copies its map outputs and
     * gives the host back.
     *
     * <p>An {@link InterruptedException} ends the loop silently (it is how
     * {@link #shutDown()} stops the thread); any other throwable is handed to
     * the exception reporter and also ends the loop.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            while (!this.stopped && !Thread.currentThread().isInterrupted()) {
                MapHost host = null;
                try {
                    this.merger.waitForResource();
                    host = this.scheduler.getHost();
                    this.metrics.threadBusy();
                    copyFromHost(host);
                } finally {
                    // Runs exactly once per iteration, whether or not the copy
                    // succeeded; skipped only when no host was obtained.
                    if (host != null) {
                        this.scheduler.freeHost(host);
                        this.metrics.threadFree();
                    }
                }
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: exit quietly.
            return;
        } catch (Throwable t) {
            this.exceptionReporter.reportException(t);
        }
    }

    /**
     * Closes the current HTTP connection (if any) and then interrupts the
     * thread.
     *
     * <p>The thread is interrupted exactly once even when closing the
     * connection throws; in that case the failure still propagates.
     */
    @Override // java.lang.Thread
    public void interrupt() {
        try {
            closeConnection();
        } finally {
            super.interrupt();
        }
    }

    /**
     * Stops this fetcher: raises the stop flag, interrupts the thread, waits
     * for it to finish for at most
     * {@code ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL} milliseconds, and
     * finally destroys the shared SSL factory if one exists.
     *
     * <p>An interrupt received while joining is only logged; the SSL factory
     * is still destroyed afterwards.
     *
     * @throws InterruptedException declared for callers; the join interrupt
     *         itself is caught and logged here
     */
    public void shutDown() throws InterruptedException {
        this.stopped = true;
        interrupt();
        try {
            join(ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
        } catch (InterruptedException interrupted) {
            LOG.warn("Got interrupt while joining " + getName(), interrupted);
        }
        if (sslFactory == null) {
            return;
        }
        sslFactory.destroy();
    }

    /**
     * Opens an HTTP connection object for {@code url} and stores it in
     * {@code connection}.
     *
     * <p>When SSL shuffle is enabled the connection is treated as an
     * {@link HttpsURLConnection} and is given the socket factory and hostname
     * verifier of the shared SSL factory before being stored.
     *
     * @param url the map output URL to open
     * @throws IOException if the connection cannot be opened, or wrapping a
     *         {@link GeneralSecurityException} from the SSL setup
     */
    @VisibleForTesting
    public synchronized void openConnection(URL url) throws IOException {
        HttpURLConnection opened = (HttpURLConnection) url.openConnection();
        if (!sslShuffle) {
            this.connection = opened;
            return;
        }
        HttpsURLConnection secure = (HttpsURLConnection) opened;
        try {
            secure.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
            secure.setHostnameVerifier(sslFactory.getHostnameVerifier());
        } catch (GeneralSecurityException securityFailure) {
            throw new IOException(securityFailure);
        }
        this.connection = opened;
    }

    /**
     * Disconnects the currently stored connection; does nothing when no
     * connection has been opened yet.
     */
    protected synchronized void closeConnection() {
        HttpURLConnection current = this.connection;
        if (current == null) {
            return;
        }
        current.disconnect();
    }

    /**
     * Abandons a fetch: returns every attempt in {@code set} to the scheduler
     * as a known map output of {@code mapHost}, then closes the connection.
     *
     * @param mapHost host the attempts belong to
     * @param set     attempts that were about to be fetched
     */
    private void abortConnect(MapHost mapHost, Set<TaskAttemptID> set) {
        for (TaskAttemptID pending : set) {
            this.scheduler.putBackKnownMapOutput(mapHost, pending);
        }
        closeConnection();
    }

    /**
     * Fetches all map outputs the scheduler currently assigns to
     * {@code mapHost} over a single HTTP connection.
     *
     * <p>Steps: build the URL, open and sign the request, connect, validate
     * the response (status code, shuffle header name/version, reply hash) and
     * then call {@code copyMapOutput} repeatedly until every requested output
     * is copied or a call reports a non-null result. Attempts reported as
     * failed are passed to {@code scheduler.copyFailed}; attempts still in the
     * remaining set at the end are put back on the scheduler.
     *
     * <p>An {@link IOException} raised inside this method is handled here:
     * the IO error counter is incremented, every remaining attempt is marked
     * failed and put back, and the method returns normally. Unchecked
     * exceptions and errors from {@code copyMapOutput} propagate after the
     * stream is closed and the remaining attempts are put back.
     *
     * @param mapHost the host to fetch from
     * @throws IOException declared for callers and overriding tests
     */
    @VisibleForTesting
    public void copyFromHost(MapHost mapHost) throws IOException {
        List<TaskAttemptID> mapsForHost = this.scheduler.getMapsForHost(mapHost);
        if (mapsForHost.isEmpty()) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetcher " + this.id + " going to fetch from " + mapHost + " for: " + mapsForHost);
        }
        // Attempts not yet copied; copyMapOutput removes entries as they succeed.
        Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(mapsForHost);
        DataInputStream input = null;
        try {
            URL url = getMapOutputURL(mapHost, mapsForHost);
            openConnection(url);
            if (this.stopped) {
                abortConnect(mapHost, remaining);
                return;
            }
            // Sign the request so the server can authenticate it.
            String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
            String encHash = SecureShuffleUtils.hashFromString(msgToEncode, this.shuffleSecretKey);
            this.connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
            this.connection.setReadTimeout(this.readTimeout);
            this.connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
            this.connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
            connect(this.connection, this.connectionTimeout);
            // The thread may have been stopped while connect() was blocking.
            if (this.stopped) {
                abortConnect(mapHost, remaining);
                return;
            }
            input = new DataInputStream(this.connection.getInputStream());
            int responseCode = this.connection.getResponseCode();
            if (responseCode != HttpURLConnection.HTTP_OK) {
                throw new IOException("Got invalid response code " + responseCode + " from " + url + ": " + this.connection.getResponseMessage());
            }
            boolean nameMatches = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(this.connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME));
            if (!nameMatches || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(this.connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
                throw new IOException("Incompatible shuffle response version");
            }
            String replyHash = this.connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
            if (replyHash == null) {
                throw new IOException("security validation of TT Map output failed");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash=" + replyHash);
            }
            SecureShuffleUtils.verifyReply(replyHash, encHash, this.shuffleSecretKey);
            LOG.info("for url=" + msgToEncode + " sent hash and received reply");

            // null result = one output copied, keep going; non-null = stop.
            TaskAttemptID[] failedTasks = null;
            try {
                while (!remaining.isEmpty() && failedTasks == null) {
                    failedTasks = copyMapOutput(mapHost, input, remaining);
                }
            } catch (Throwable th) {
                // Unexpected failure: release the stream and hand the
                // outstanding attempts back before propagating.
                IOUtils.cleanup(LOG, new Closeable[]{input});
                for (TaskAttemptID left : remaining) {
                    this.scheduler.putBackKnownMapOutput(mapHost, left);
                }
                throw th;
            }
            if (failedTasks != null && failedTasks.length > 0) {
                LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
                for (TaskAttemptID failed : failedTasks) {
                    this.scheduler.copyFailed(failed, mapHost, true, false);
                }
            }
            // Defensive sanity check: the loop above only ends with a null
            // result once the remaining set is empty.
            if (failedTasks == null && !remaining.isEmpty()) {
                throw new IOException("server didn't return all expected map outputs: " + remaining.size() + " left.");
            }
            input.close();
            for (TaskAttemptID left : remaining) {
                this.scheduler.putBackKnownMapOutput(mapHost, left);
            }
        } catch (IOException e) {
            boolean connectFailure = e instanceof ConnectException;
            this.ioErrs.increment(1L);
            LOG.warn("Failed to connect to " + mapHost + " with " + remaining.size() + " map outputs", e);
            if (input != null) {
                // The response stream was already open when validation (or
                // close) failed; release it instead of leaking it.
                IOUtils.cleanup(LOG, new Closeable[]{input});
            }
            // Mark every outstanding attempt as failed ...
            for (TaskAttemptID left : remaining) {
                this.scheduler.copyFailed(left, mapHost, false, connectFailure);
            }
            // ... and make them available to be fetched again.
            for (TaskAttemptID left : remaining) {
                this.scheduler.putBackKnownMapOutput(mapHost, left);
            }
        }
    }

    /**
     * Reads one map output (header plus data) from {@code dataInputStream}
     * and hands it to the merge manager.
     *
     * <p>Return value, as interpreted by {@code copyFromHost}:
     * <ul>
     *   <li>{@code null} - the output was copied; its id has been removed
     *       from {@code set} and the caller may read the next one;</li>
     *   <li>empty array - stop reading without marking anything failed (the
     *       merge manager returned no reservation, or reserving failed with a
     *       local error that was reported to the scheduler);</li>
     *   <li>non-empty array - the listed attempts failed.</li>
     * </ul>
     *
     * @param mapHost         host being fetched from
     * @param dataInputStream response stream positioned at a shuffle header
     * @param set             attempts still expected from this host; mutated
     *                        on success
     * @return see above
     */
    private TaskAttemptID[] copyMapOutput(MapHost mapHost, DataInputStream dataInputStream, Set<TaskAttemptID> set) {
        MapOutput<K, V> mapOutput = null;
        TaskAttemptID mapId = null;
        long decompressedLength = -1;
        long compressedLength = -1;
        try {
            long startTime = System.currentTimeMillis();
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(dataInputStream);
                mapId = TaskAttemptID.forName(header.mapId);
                compressedLength = header.compressedLength;
                decompressedLength = header.uncompressedLength;
                if (!verifySanity(compressedLength, decompressedLength, header.forReduce, set, mapId)) {
                    return new TaskAttemptID[]{mapId};
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("header: " + mapId + ", len: " + compressedLength + ", decomp len: " + decompressedLength);
                }
                // Only a failure to reserve space is a local error. This
                // handler must not enclose the shuffle below, otherwise remote
                // read failures would be reported as local errors and the
                // abort()/failedFetch() path in the outer handler could never run.
                try {
                    mapOutput = this.merger.reserve(mapId, decompressedLength, this.id);
                } catch (IOException localError) {
                    this.ioErrs.increment(1L);
                    this.scheduler.reportLocalError(localError);
                    return EMPTY_ATTEMPT_ID_ARRAY;
                }
                if (mapOutput == null) {
                    // No reservation available right now: not an error, the
                    // caller stops reading and the output is retried later.
                    LOG.info("fetcher#" + this.id + " - MergeManager returned status WAIT ...");
                    return EMPTY_ATTEMPT_ID_ARRAY;
                }
                try {
                    LOG.info("fetcher#" + this.id + " about to shuffle output of map " + mapOutput.getMapId() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + mapOutput.getDescription());
                    mapOutput.shuffle(mapHost, dataInputStream, compressedLength, decompressedLength, this.metrics, this.reporter);
                    this.scheduler.copySucceeded(mapId, mapHost, compressedLength, System.currentTimeMillis() - startTime, mapOutput);
                    set.remove(mapId);
                    this.metrics.successFetch();
                    return null;
                } catch (InternalError e) {
                    // Convert to IOException so the fetch-failure handling
                    // in the outer handler applies.
                    LOG.warn("Failed to shuffle for fetcher#" + this.id, e);
                    throw new IOException(e);
                }
            } catch (IllegalArgumentException e) {
                this.badIdErrs.increment(1L);
                LOG.warn("Invalid map id ", e);
                return set.toArray(new TaskAttemptID[set.size()]);
            }
        } catch (IOException e) {
            this.ioErrs.increment(1L);
            if (mapId == null || mapOutput == null) {
                // Failed before a reservation existed (typically while reading the header).
                LOG.info("fetcher#" + this.id + " failed to read map header" + mapId + " decomp: " + decompressedLength + ", " + compressedLength, e);
                if (mapId == null) {
                    // Cannot tell which output failed: report all of them.
                    return set.toArray(new TaskAttemptID[set.size()]);
                }
                return new TaskAttemptID[]{mapId};
            }
            LOG.warn("Failed to shuffle output of " + mapId + " from " + mapHost.getHostName(), e);
            // Release the reservation made for this output.
            mapOutput.abort();
            this.metrics.failedFetch();
            return new TaskAttemptID[]{mapId};
        }
    }

    /**
     * Validates the fields of a shuffle header before any data is read.
     *
     * <p>Checks, in order: both lengths are non-negative, the header targets
     * this fetcher's reduce, and the map attempt is one that is still
     * expected. The first failing check increments its error counter, logs a
     * warning and makes the method return {@code false}.
     *
     * @param j             compressed length from the header
     * @param j2            decompressed length from the header
     * @param i             reduce number from the header
     * @param set           attempts still expected from the host
     * @param taskAttemptID map attempt named by the header
     * @return {@code true} only when all three checks pass
     */
    private boolean verifySanity(long j, long j2, int i, Set<TaskAttemptID> set, TaskAttemptID taskAttemptID) {
        final boolean negativeLength = j < 0 || j2 < 0;
        if (negativeLength) {
            this.wrongLengthErrs.increment(1L);
            LOG.warn(getName() + " invalid lengths in map output header: id: " + taskAttemptID + " len: " + j + ", decomp len: " + j2);
            return false;
        }

        final boolean wrongReduce = i != this.reduce;
        if (wrongReduce) {
            this.wrongReduceErrs.increment(1L);
            LOG.warn(getName() + " data for the wrong reduce map: " + taskAttemptID + " len: " + j + " decomp len: " + j2 + " for reduce " + i);
            return false;
        }

        if (!set.contains(taskAttemptID)) {
            this.wrongMapErrs.increment(1L);
            LOG.warn("Invalid map-output! Received output for " + taskAttemptID);
            return false;
        }
        return true;
    }

    /**
     * Builds the URL used to request several map outputs in one call: the
     * host's base URL followed by the attempt ids joined with commas.
     *
     * @param mapHost host whose base URL is used as the prefix
     * @param list    attempts to request, in the order they are appended
     * @return the assembled URL
     * @throws MalformedURLException if the assembled string is not a valid URL
     */
    private URL getMapOutputURL(MapHost mapHost, List<TaskAttemptID> list) throws MalformedURLException {
        // Local, single-threaded buffer: no need for StringBuffer's locking.
        StringBuilder url = new StringBuilder(mapHost.getBaseUrl());
        boolean first = true;
        for (TaskAttemptID mapId : list) {
            if (!first) {
                url.append(",");
            }
            url.append(mapId);
            first = false;
        }
        // Materialise once and reuse for both the log line and the URL.
        String spec = url.toString();
        if (LOG.isDebugEnabled()) {
            LOG.debug("MapOutput URL for " + mapHost + " -> " + spec);
        }
        return new URL(spec);
    }

    /**
     * Connects {@code uRLConnection}, retrying on {@link IOException} until
     * the timeout budget {@code i} is used up.
     *
     * <p>Each attempt uses a connect timeout of at most 60000 ms. After a
     * failed attempt that slice is subtracted from the budget; when the
     * budget reaches zero the last exception is rethrown, and when less than
     * a full slice is left the slice is shrunk to the remainder. A budget of
     * {@code 0} sets a connect timeout of {@code 0} and makes a single
     * attempt.
     *
     * @param uRLConnection connection to establish
     * @param i             total connect timeout in milliseconds; must not be negative
     * @throws IOException if {@code i} is negative, or the last connect
     *         failure once the budget is exhausted
     */
    private void connect(URLConnection uRLConnection, int i) throws IOException {
        if (i < 0) {
            throw new IOException("Invalid timeout [timeout = " + i + " ms]");
        }
        int budget = i;
        int slice = budget > 0 ? Math.min(60000, budget) : 0;
        uRLConnection.setConnectTimeout(slice);
        for (;;) {
            try {
                uRLConnection.connect();
                return;
            } catch (IOException failure) {
                budget -= slice;
                if (budget == 0) {
                    throw failure;
                }
                if (budget < slice) {
                    // Last, shorter attempt with whatever budget is left.
                    slice = budget;
                    uRLConnection.setConnectTimeout(slice);
                }
            }
        }
    }
}
