package ml.shifu.guagua.yarn;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.lang.Thread;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import ml.shifu.guagua.yarn.util.GsonUtils;
import ml.shifu.guagua.yarn.util.YarnUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.serialization.ClassResolvers;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ml/shifu/guagua/yarn/GuaguaAppMaster.class */
public class GuaguaAppMaster {
    private static final int YARN_ABORT_EXIT_STATUS = -100;
    private static final int YARN_SUCCESS_EXIT_STATUS = 0;
    private static final int SLEEP_BETWEEN_HEARTBEATS_MSECS = 900;
    private ContainerId masterContainerId;
    private ApplicationAttemptId appAttemptId;
    private volatile boolean done;
    private Configuration yarnConf;
    private AtomicInteger completedCount;
    private AtomicInteger failedCount;
    private AtomicInteger allocatedCount;
    private AtomicInteger successfulCount;
    private int containersToLaunch;
    private ExecutorService executor;
    private ExecutorService taskTimeoutExecutor;
    private long taskTimeOut;
    private int heapPerContainer;
    private AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
    private NMClientAsync nmClientAsync;
    private NMCallbackHandler containerListener;
    private static Map<String, LocalResource> localResources;
    private String appMasterHostname;
    private int appMasterRpcPort = 1234;
    private String appMasterTrackingUrl = "";
    private String containerArgs;
    private List<InputSplit> inputSplits;
    private ApplicationId appId;
    private Map<Integer, List<Container>> partitionContainerMap;
    private Map<String, Integer> containerPartitionMap;
    private Map<Integer, PartitionStatus> partitionStatusMap;
    private List<Integer> failedPartitions;
    private AtomicInteger partitionIndex;
    private int maxContainerAttempts;
    private int totalIterations;
    private ByteBuffer allTokens;
    private int rpcPort;
    private String rpcHostName;
    private Map<Integer, GuaguaIterationStatus> partitionProgress;
    private ServerBootstrap rpcServer;
    private static final Logger LOG = LoggerFactory.getLogger(GuaguaAppMaster.class);
    private static final Object LOCK = new Object();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ml/shifu/guagua/yarn/GuaguaAppMaster$LaunchContainerRunnable.class */
    public class LaunchContainerRunnable implements Runnable {
        private Container container;
        private NMCallbackHandler containerListener;
        private final int partition;

        public LaunchContainerRunnable(Container container, NMCallbackHandler nMCallbackHandler, int i) {
            this.container = container;
            this.containerListener = nMCallbackHandler;
            this.partition = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            ContainerLaunchContext buildContainerLaunchContext = buildContainerLaunchContext();
            this.containerListener.addContainer(this.container.getId(), this.container);
            GuaguaAppMaster.this.getNmClientAsync().startContainerAsync(this.container, buildContainerLaunchContext);
        }

        private ContainerLaunchContext buildContainerLaunchContext() {
            GuaguaAppMaster.LOG.info("Setting up container launch container for containerid={}", this.container.getId());
            ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
            List<String> generateShellExecCommand = generateShellExecCommand();
            GuaguaAppMaster.LOG.info("Conatain launch Commands :{}" + generateShellExecCommand);
            containerLaunchContext.setCommands(generateShellExecCommand);
            containerLaunchContext.setTokens(GuaguaAppMaster.this.allTokens.slice());
            buildEnvironment(containerLaunchContext);
            containerLaunchContext.setLocalResources(GuaguaAppMaster.this.getTaskResourceMap());
            return containerLaunchContext;
        }

        private List<String> generateShellExecCommand() {
            return YarnUtils.getCommand(GuaguaYarnTask.class.getName(), GuaguaAppMaster.this.containerArgs, new StringBuilder(300).append(GuaguaAppMaster.this.getAppAttemptId().getApplicationId().getClusterTimestamp()).append(" ").append(GuaguaAppMaster.this.getAppAttemptId().getApplicationId().getId()).append(" ").append(this.container.getId().getId()).append(" ").append(GuaguaAppMaster.this.getAppAttemptId().getAttemptId()).append(" ").append(this.partition).append(" ").append(GuaguaAppMaster.this.rpcHostName).append(" ").append(GuaguaAppMaster.this.rpcPort).toString(), GuaguaAppMaster.this.getHeapPerContainer() + "");
        }

        private void buildEnvironment(ContainerLaunchContext containerLaunchContext) {
            HashMap newHashMap = Maps.newHashMap();
            YarnUtils.addLocalClasspathToEnv(newHashMap, GuaguaAppMaster.this.getYarnConf());
            containerLaunchContext.setEnvironment(newHashMap);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ml/shifu/guagua/yarn/GuaguaAppMaster$MasterThreadFactory.class */
    public static class MasterThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        MasterThreadFactory() {
            SecurityManager securityManager = System.getSecurityManager();
            this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
            this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            if (thread.getPriority() != 5) {
                thread.setPriority(5);
            }
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: ml.shifu.guagua.yarn.GuaguaAppMaster.MasterThreadFactory.1
                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread2, Throwable th) {
                    GuaguaAppMaster.LOG.warn("Error message in thread {} with error message {}, error root cause {}.", new Object[]{thread2, th, th.getCause()});
                }
            });
            return thread;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ml/shifu/guagua/yarn/GuaguaAppMaster$NMCallbackHandler.class */
    public class NMCallbackHandler implements NMClientAsync.CallbackHandler {
        private ConcurrentMap<ContainerId, Container> containers;

        private NMCallbackHandler() {
            this.containers = new ConcurrentHashMap();
        }

        public void addContainer(ContainerId containerId, Container container) {
            this.containers.putIfAbsent(containerId, container);
        }

        public void onContainerStopped(ContainerId containerId) {
            GuaguaAppMaster.LOG.info("Succeeded to stop Container {}", containerId);
            this.containers.remove(containerId);
        }

        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
            GuaguaAppMaster.LOG.info("Container Status: id={}, status={}", containerId, containerStatus);
        }

        public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
            GuaguaAppMaster.LOG.info("Succeeded to start Container {}", containerId);
            Container container = this.containers.get(containerId);
            if (container != null) {
                GuaguaAppMaster.this.getNmClientAsync().getContainerStatusAsync(containerId, container.getNodeId());
            }
        }

        public void onStartContainerError(ContainerId containerId, Throwable th) {
            GuaguaAppMaster.LOG.error(String.format("Failed to start Container %s", containerId), th);
            this.containers.remove(containerId);
        }

        public void onGetContainerStatusError(ContainerId containerId, Throwable th) {
            GuaguaAppMaster.LOG.error(String.format("Failed to query the status of Container %s", containerId), th);
        }

        public void onStopContainerError(ContainerId containerId, Throwable th) {
            GuaguaAppMaster.LOG.error(String.format("Failed to stop Container %s", containerId), th);
            this.containers.remove(containerId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ml/shifu/guagua/yarn/GuaguaAppMaster$PartitionStatus.class */
    public enum PartitionStatus {
        INIT,
        SUCCESSFUL,
        FAILED,
        RETRY
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ml/shifu/guagua/yarn/GuaguaAppMaster$RMCallbackHandler.class */
    public class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
        private RMCallbackHandler() {
        }

        public void onContainersCompleted(List<ContainerStatus> list) {
            GuaguaAppMaster.LOG.info("Got response from RM for container ask, completedCnt={}", Integer.valueOf(list.size()));
            for (ContainerStatus containerStatus : list) {
                GuaguaAppMaster.LOG.info("Got container status for containerID={}, state={}, exitStatus={}, diagnostics={}.", new Object[]{containerStatus.getContainerId(), containerStatus.getState(), Integer.valueOf(containerStatus.getExitStatus()), containerStatus.getDiagnostics()});
                if (GuaguaAppMaster.this.containerPartitionMap.containsKey(containerStatus.getContainerId().toString())) {
                    int intValue = ((Integer) GuaguaAppMaster.this.containerPartitionMap.get(containerStatus.getContainerId().toString())).intValue();
                    if (((List) GuaguaAppMaster.this.partitionContainerMap.get(Integer.valueOf(intValue))).size() >= GuaguaAppMaster.this.maxContainerAttempts) {
                        GuaguaAppMaster.this.setDone(true);
                        GuaguaAppMaster.LOG.info("One partition {} has more than max attempt {} ", Integer.valueOf(intValue), Integer.valueOf(GuaguaAppMaster.this.maxContainerAttempts));
                        return;
                    }
                    switch (containerStatus.getExitStatus()) {
                        case GuaguaAppMaster.YARN_ABORT_EXIT_STATUS /* -100 */:
                            GuaguaAppMaster.LOG.info("YARN_ABORT_EXIT_STATUS: Container id {} exits with {}", containerStatus.getContainerId(), Integer.valueOf(GuaguaAppMaster.YARN_ABORT_EXIT_STATUS));
                            break;
                        case 0:
                            GuaguaAppMaster.this.partitionStatusMap.put(Integer.valueOf(intValue), PartitionStatus.SUCCESSFUL);
                            GuaguaAppMaster.this.getSuccessfulCount().incrementAndGet();
                            break;
                        default:
                            GuaguaAppMaster.LOG.info("default: Container id {} exits with {}", containerStatus.getContainerId(), Integer.valueOf(containerStatus.getExitStatus()));
                            GuaguaAppMaster.this.partitionStatusMap.put(Integer.valueOf(intValue), PartitionStatus.FAILED);
                            GuaguaAppMaster.this.failedPartitions.add(Integer.valueOf(intValue));
                            GuaguaAppMaster.this.madeOneContainerRequestToRM();
                            GuaguaAppMaster.this.getFailedCount().incrementAndGet();
                            break;
                    }
                    GuaguaAppMaster.this.getCompletedCount().incrementAndGet();
                } else {
                    GuaguaAppMaster.this.getCompletedCount().incrementAndGet();
                    GuaguaAppMaster.LOG.info("Why such container {} is started, no partition. Exited with status:{}", containerStatus.getContainerId(), Integer.valueOf(containerStatus.getExitStatus()));
                }
            }
            if (GuaguaAppMaster.this.getSuccessfulCount().get() != GuaguaAppMaster.this.getContainersToLaunch()) {
                GuaguaAppMaster.LOG.info("After completion of one conatiner. current status is: completedCount:{} containersToLaunch:{} successfulCount:{} failedCount:{}.", new Object[]{Integer.valueOf(GuaguaAppMaster.this.getCompletedCount().get()), Integer.valueOf(GuaguaAppMaster.this.getContainersToLaunch()), Integer.valueOf(GuaguaAppMaster.this.getSuccessfulCount().get()), Integer.valueOf(GuaguaAppMaster.this.getFailedCount().get())});
            } else {
                GuaguaAppMaster.this.setDone(true);
                GuaguaAppMaster.LOG.info("All container compeleted. done = {} ", Boolean.valueOf(GuaguaAppMaster.this.isDone()));
            }
        }

        public void onContainersAllocated(List<Container> list) {
            GuaguaAppMaster.LOG.info("Got response from RM for container ask, allocatedCnt={}", Integer.valueOf(list.size()));
            GuaguaAppMaster.this.getAllocatedCount().addAndGet(list.size());
            GuaguaAppMaster.LOG.info("Total allocated # of container so far {} : allocated out of required {}.", Integer.valueOf(GuaguaAppMaster.this.getAllocatedCount().get()), Integer.valueOf(GuaguaAppMaster.this.getContainersToLaunch()));
            GuaguaAppMaster.this.startContainerLaunchingThreads(list);
        }

        public void onShutdownRequest() {
            GuaguaAppMaster.this.setDone(true);
            GuaguaAppMaster.this.getAmRMClient().stop();
        }

        public void onNodesUpdated(List<NodeReport> list) {
        }

        public float getProgress() {
            float f;
            int i = 0;
            int i2 = 0;
            synchronized (GuaguaAppMaster.LOCK) {
                Iterator it = GuaguaAppMaster.this.partitionProgress.entrySet().iterator();
                while (it.hasNext()) {
                    i += ((GuaguaIterationStatus) ((Map.Entry) it.next()).getValue()).getCurrentIteration();
                    i2 += GuaguaAppMaster.this.totalIterations;
                }
                f = (i * 1.0f) / i2;
            }
            return f;
        }

        public void onError(Throwable th) {
            GuaguaAppMaster.this.setDone(true);
            GuaguaAppMaster.this.getAmRMClient().stop();
        }
    }

    /* loaded from: input_file:ml/shifu/guagua/yarn/GuaguaAppMaster$ServerHandler.class */
    private class ServerHandler extends SimpleChannelUpstreamHandler {
        private ServerHandler() {
        }

        public void handleUpstream(ChannelHandlerContext channelHandlerContext, ChannelEvent channelEvent) throws Exception {
            if ((channelEvent instanceof ChannelStateEvent) && ((ChannelStateEvent) channelEvent).getState() != ChannelState.INTEREST_OPS) {
                GuaguaAppMaster.LOG.debug(channelEvent.toString());
            }
            super.handleUpstream(channelHandlerContext, channelEvent);
        }

        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
            List list;
            GuaguaIterationStatus guaguaIterationStatus = (GuaguaIterationStatus) GsonUtils.fromJson(messageEvent.getMessage().toString(), GuaguaIterationStatus.class);
            GuaguaAppMaster.LOG.info("Receive RPC status:{}", guaguaIterationStatus);
            synchronized (GuaguaAppMaster.LOCK) {
                GuaguaAppMaster.this.partitionProgress.put(Integer.valueOf(guaguaIterationStatus.getPartition()), guaguaIterationStatus);
            }
            if (guaguaIterationStatus.isKillContainer()) {
                synchronized (GuaguaAppMaster.LOCK) {
                    list = (List) GuaguaAppMaster.this.partitionContainerMap.get(Integer.valueOf(guaguaIterationStatus.getPartition()));
                }
                GuaguaAppMaster.LOG.info("containers:{}", list);
                Container container = (Container) list.get(list.size() - 1);
                GuaguaAppMaster.LOG.info("Container {} in node {} is killed because of straggler condition.", container.getId(), container.getNodeId());
                GuaguaAppMaster.this.getNmClientAsync().stopContainerAsync(container.getId(), container.getNodeId());
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
            exceptionEvent.getChannel().close();
        }
    }

    public GuaguaAppMaster(ContainerId containerId, ApplicationAttemptId applicationAttemptId, Configuration configuration) {
        this.taskTimeOut = GuaguaYarnConstants.DEFAULT_TIME_OUT;
        this.rpcPort = GuaguaYarnConstants.DEFAULT_STATUS_RPC_PORT;
        try {
            String hostName = InetAddress.getLocalHost().getHostName();
            this.appMasterHostname = hostName;
            this.rpcHostName = hostName;
        } catch (UnknownHostException e) {
            LOG.error("Error in getting local host name.", e);
        }
        this.masterContainerId = containerId;
        this.appAttemptId = applicationAttemptId;
        this.appId = getAppAttemptId().getApplicationId();
        this.yarnConf = configuration;
        this.completedCount = new AtomicInteger(0);
        this.failedCount = new AtomicInteger(0);
        this.allocatedCount = new AtomicInteger(0);
        this.successfulCount = new AtomicInteger(0);
        this.partitionContainerMap = new ConcurrentHashMap();
        this.containerPartitionMap = new ConcurrentHashMap();
        this.partitionStatusMap = new ConcurrentHashMap();
        this.partitionIndex = new AtomicInteger(0);
        this.failedPartitions = new CopyOnWriteArrayList();
        this.maxContainerAttempts = getYarnConf().getInt(GuaguaYarnConstants.GUAGUA_YARN_MAX_CONTAINER_ATTEMPTS, 4);
        this.heapPerContainer = getYarnConf().getInt(GuaguaYarnConstants.GUAGUA_CHILD_MEMORY, 1024);
        this.totalIterations = getYarnConf().getInt("guagua.iteration.count", 1);
        String str = getYarnConf().get(GuaguaYarnConstants.GUAGUA_YARN_CONTAINER_ARGS);
        this.containerArgs = str == null ? GuaguaYarnConstants.GUAGUA_YARN_DEFAULT_CONTAINER_JAVA_OPTS : "-server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 " + str;
        this.rpcPort = getYarnConf().getInt(GuaguaYarnConstants.GUAGUA_YARN_STATUS_RPC_PORT, GuaguaYarnConstants.DEFAULT_STATUS_RPC_PORT);
        this.partitionProgress = new ConcurrentHashMap();
        this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        this.taskTimeoutExecutor = Executors.newSingleThreadExecutor();
        this.taskTimeOut = getYarnConf().getLong(GuaguaYarnConstants.GUAGUA_TASK_TIMEOUT, GuaguaYarnConstants.DEFAULT_TIME_OUT);
        LOG.info("{}:{}", Long.valueOf(this.taskTimeOut), Long.valueOf(GuaguaYarnConstants.DEFAULT_TIME_OUT));
        LOG.info("GuaguaAppMaster  for ContainerId {} ApplicationAttemptId {}", containerId, applicationAttemptId);
    }

    public boolean run() throws YarnException, IOException {
        try {
            prepareInputSplits();
            getAllTokens();
            registerRMCallBackHandler();
            registerNMCallbackHandler();
            registerAMToRM();
            startRPCServer();
            startTaskTimeoutExecutor();
            madeAllContainerRequestToRM();
            LOG.info("Wait to finish ..");
            while (!isDone()) {
                try {
                    Thread.sleep(900L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            LOG.info("Done {}", Boolean.valueOf(isDone()));
            shutdown();
            return finish();
        } catch (Throwable th) {
            shutdown();
            finish();
            throw th;
        }
    }

    private void startTaskTimeoutExecutor() {
        this.taskTimeoutExecutor.submit(new Runnable() { // from class: ml.shifu.guagua.yarn.GuaguaAppMaster.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(GuaguaAppMaster.this.taskTimeOut);
                        GuaguaAppMaster.LOG.debug(GuaguaAppMaster.this.partitionProgress.toString());
                        for (Map.Entry entry : GuaguaAppMaster.this.partitionProgress.entrySet()) {
                            GuaguaIterationStatus guaguaIterationStatus = (GuaguaIterationStatus) entry.getValue();
                            if (guaguaIterationStatus.getTime() != 0 && guaguaIterationStatus.getCurrentIteration() != 1 && System.currentTimeMillis() - guaguaIterationStatus.getTime() > GuaguaAppMaster.this.taskTimeOut) {
                                List list = (List) GuaguaAppMaster.this.partitionContainerMap.get(entry.getKey());
                                Container container = (Container) list.get(list.size() - 1);
                                GuaguaAppMaster.LOG.info("Container {} is timeout with timeout period {}, will be killed by node manager {}.", new Object[]{container.getId(), Long.valueOf(GuaguaAppMaster.this.taskTimeOut), container.getNodeId()});
                                GuaguaAppMaster.this.getNmClientAsync().stopContainerAsync(container.getId(), container.getNodeId());
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
    }

    protected void shutdown() {
        if (null != getExecutor() && !getExecutor().isTerminated()) {
            LOG.info("Forcefully terminating executors with done ={}", Boolean.valueOf(isDone()));
            getExecutor().shutdownNow();
        }
        if (this.rpcServer != null) {
            this.rpcServer.shutdown();
            this.rpcServer.releaseExternalResources();
        }
        if (this.taskTimeoutExecutor != null) {
            this.taskTimeoutExecutor.shutdownNow();
        }
    }

    private void startRPCServer() {
        this.rpcServer = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newFixedThreadPool(4), Executors.newCachedThreadPool(new MasterThreadFactory())));
        this.rpcServer.setPipelineFactory(new ChannelPipelineFactory() { // from class: ml.shifu.guagua.yarn.GuaguaAppMaster.2
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new ChannelHandler[]{new ObjectEncoder(), new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())), new ServerHandler()});
            }
        });
        this.rpcServer.bind(new InetSocketAddress(this.rpcPort));
    }

    private void prepareInputSplits() throws IOException {
        this.inputSplits = getNewSplits(getYarnConf());
        setContainersToLaunch(this.inputSplits.size());
        LOG.info("Input split size including master: {}", Integer.valueOf(this.inputSplits.size()));
    }

    public List<InputSplit> getNewSplits(Configuration configuration) throws IOException {
        int i = getYarnConf().getInt("guagua.worker.number", 0) + configuration.getInt("guagua.master.number", 1);
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 1; i2 <= i; i2++) {
            arrayList.add(GsonUtils.fromJson(getYarnConf().get(GuaguaYarnConstants.GUAGUA_YARN_INPUT_SPLIT_PREFIX + i2), ml.shifu.guagua.hadoop.io.GuaguaInputSplit.class));
            this.partitionProgress.put(Integer.valueOf(i2), new GuaguaIterationStatus());
        }
        return arrayList;
    }

    private void getAllTokens() throws IOException {
        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dataOutputBuffer);
        Iterator it = credentials.getAllTokens().iterator();
        while (it.hasNext()) {
            Token token = (Token) it.next();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Token type : {}", token.getKind());
            }
            if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                it.remove();
            }
        }
        this.allTokens = ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
    }

    private void registerRMCallBackHandler() {
        setAmRMClient(AMRMClientAsync.createAMRMClientAsync(1000, new RMCallbackHandler()));
        getAmRMClient().init(getYarnConf());
        getAmRMClient().start();
    }

    private void registerNMCallbackHandler() {
        setContainerListener(new NMCallbackHandler());
        setNmClientAsync(new NMClientAsyncImpl(getContainerListener()));
        getNmClientAsync().init(getYarnConf());
        getNmClientAsync().start();
    }

    private RegisterApplicationMasterResponse registerAMToRM() throws YarnException {
        try {
            if (UserGroupInformation.isSecurityEnabled()) {
                LOG.info("SECURITY ENABLED ");
            }
            return getAmRMClient().registerApplicationMaster(this.appMasterHostname, this.appMasterRpcPort, this.appMasterTrackingUrl);
        } catch (IOException e) {
            throw new IllegalStateException("GuaguaAppMaster failed to register with RM.", e);
        }
    }

    private void madeAllContainerRequestToRM() {
        for (int i = 0; i < getContainersToLaunch(); i++) {
            getAmRMClient().addContainerRequest(setupContainerAskForRM());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void madeOneContainerRequestToRM() {
        getAmRMClient().addContainerRequest(setupContainerAskForRM());
    }

    private AMRMClient.ContainerRequest setupContainerAskForRM() {
        Priority priority = (Priority) Records.newRecord(Priority.class);
        priority.setPriority(0);
        Resource resource = (Resource) Records.newRecord(Resource.class);
        resource.setMemory(getHeapPerContainer());
        resource.setVirtualCores(getYarnConf().getInt(GuaguaYarnConstants.GUAGUA_YARN_TASK_VCORES, 1));
        AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(resource, (String[]) null, (String[]) null, priority);
        LOG.info("Requested container ask: {}", containerRequest.toString());
        return containerRequest;
    }

    private boolean finish() {
        FinalApplicationStatus finalApplicationStatus;
        LOG.info("Application completed. Stopping running containers");
        getNmClientAsync().stop();
        LOG.info("Application completed. Signalling finish to RM");
        String str = null;
        boolean z = true;
        if (getSuccessfulCount().get() == getContainersToLaunch()) {
            finalApplicationStatus = FinalApplicationStatus.SUCCEEDED;
        } else {
            finalApplicationStatus = FinalApplicationStatus.FAILED;
            str = String.format("Diagnostics total=%s, completed=%s, failed=%s.", Integer.valueOf(getContainersToLaunch()), Integer.valueOf(getCompletedCount().get()), Integer.valueOf(getFailedCount().get()));
            z = false;
        }
        try {
            getAmRMClient().unregisterApplicationMaster(finalApplicationStatus, str, this.appMasterTrackingUrl);
        } catch (IOException e) {
            LOG.error("Failed to unregister application", e);
        } catch (YarnException e2) {
            LOG.error("Failed to unregister application", e2);
        }
        getAmRMClient().stop();
        return z;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startContainerLaunchingThreads(List<Container> list) {
        Map<String, List<Container>> hostContainersMap = getHostContainersMap(list);
        int size = list.size();
        while (size > 0) {
            int currentPartition = getCurrentPartition();
            if (currentPartition == -1) {
                LOG.warn("Request too many resources. TODO, remove containers no needed.");
                Iterator<Container> it = list.iterator();
                while (it.hasNext()) {
                    getAmRMClient().releaseAssignedContainer(it.next().getId());
                }
                return;
            }
            Container dataLocalityContainer = getDataLocalityContainer(hostContainersMap, currentPartition);
            if (dataLocalityContainer == null) {
                dataLocalityContainer = list.get(0);
            }
            list.remove(dataLocalityContainer);
            LOG.info("Launching command on a new container., containerId={}, containerNode={}, containerPort={}, containerNodeURI={}, containerResourceMemory={}", new Object[]{dataLocalityContainer.getId(), dataLocalityContainer.getNodeId().getHost(), Integer.valueOf(dataLocalityContainer.getNodeId().getPort()), dataLocalityContainer.getNodeHttpAddress(), Integer.valueOf(dataLocalityContainer.getResource().getMemory())});
            List<Container> list2 = this.partitionContainerMap.get(Integer.valueOf(currentPartition));
            if (list2 == null) {
                list2 = new ArrayList();
            }
            list2.add(dataLocalityContainer);
            this.partitionContainerMap.put(Integer.valueOf(currentPartition), list2);
            this.containerPartitionMap.put(dataLocalityContainer.getId().toString(), Integer.valueOf(currentPartition));
            this.partitionStatusMap.put(Integer.valueOf(currentPartition), PartitionStatus.INIT);
            getExecutor().execute(new LaunchContainerRunnable(dataLocalityContainer, getContainerListener(), currentPartition));
            size = list.size();
        }
    }

    private Map<String, List<Container>> getHostContainersMap(List<Container> list) {
        HashMap hashMap = new HashMap();
        for (Container container : list) {
            String host = container.getNodeId().getHost();
            List list2 = (List) hashMap.get(host);
            if (list2 == null) {
                list2 = new ArrayList();
            }
            list2.add(container);
            hashMap.put(host, list2);
        }
        return hashMap;
    }

    private Container getDataLocalityContainer(Map<String, List<Container>> map, int i) {
        ml.shifu.guagua.hadoop.io.GuaguaInputSplit guaguaInputSplit = (ml.shifu.guagua.hadoop.io.GuaguaInputSplit) this.inputSplits.get(i - 1);
        String str = null;
        FileSplit[] fileSplits = guaguaInputSplit.getFileSplits();
        if (fileSplits != null) {
            try {
                str = fileSplits[0].getLocations()[0];
            } catch (Exception e) {
                str = null;
            }
        }
        List<Container> list = map.get(str);
        Container container = null;
        if (list != null && !list.isEmpty()) {
            Container remove = list.remove(0);
            map.put(str, list);
            LOG.info("find a container {} with host {} for partition {} and split {}.", new Object[]{remove, str, Integer.valueOf(i), guaguaInputSplit});
            return remove;
        }
        String str2 = null;
        List<Container> list2 = null;
        Iterator<Map.Entry<String, List<Container>>> it = map.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<String, List<Container>> next = it.next();
            str2 = next.getKey();
            list2 = next.getValue();
            if (list2 != null && !list2.isEmpty()) {
                container = list2.remove(0);
                break;
            }
        }
        map.put(str2, list2);
        LOG.info("find a container {} with host {} for partition {} and split {}.", new Object[]{container, str, Integer.valueOf(i), guaguaInputSplit});
        return container;
    }

    private int getCurrentPartition() {
        int addAndGet;
        LOG.info("failed container request size:{} {}", Integer.valueOf(this.failedPartitions.size()), this.failedPartitions);
        Iterator<Integer> it = this.failedPartitions.iterator();
        if (it.hasNext()) {
            addAndGet = it.next().intValue();
            this.failedPartitions.remove(Integer.valueOf(addAndGet));
            LOG.info("failed container request size after remove:{} {}", Integer.valueOf(this.failedPartitions.size()), this.failedPartitions);
        } else {
            LOG.info("partitionIndex{} containersToLaunch {}", Integer.valueOf(this.partitionIndex.get()), Integer.valueOf(this.containersToLaunch));
            if (this.partitionIndex.get() >= this.containersToLaunch) {
                return -1;
            }
            addAndGet = this.partitionIndex.addAndGet(1);
        }
        return addAndGet;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized Map<String, LocalResource> getTaskResourceMap() {
        if (null == localResources) {
            localResources = Maps.newHashMap();
            try {
                localResources = YarnUtils.getLocalResourceMap(getYarnConf(), getAppId());
            } catch (IOException e) {
                throw new IllegalStateException("Could not configure the container launch context for GuaguaYarnTask.", e);
            }
        }
        return localResources;
    }

    public ContainerId getContainerId() {
        return this.masterContainerId;
    }

    public void setContainerId(ContainerId containerId) {
        this.masterContainerId = containerId;
    }

    public ApplicationAttemptId getAppAttemptId() {
        return this.appAttemptId;
    }

    public void setAppAttemptId(ApplicationAttemptId applicationAttemptId) {
        this.appAttemptId = applicationAttemptId;
    }

    public boolean isDone() {
        return this.done;
    }

    public void setDone(boolean z) {
        this.done = z;
    }

    public Configuration getYarnConf() {
        return this.yarnConf;
    }

    public void setYarnConf(YarnConfiguration yarnConfiguration) {
        this.yarnConf = yarnConfiguration;
    }

    public AtomicInteger getCompletedCount() {
        return this.completedCount;
    }

    public void setCompletedCount(AtomicInteger atomicInteger) {
        this.completedCount = atomicInteger;
    }

    public AtomicInteger getFailedCount() {
        return this.failedCount;
    }

    public void setFailedCount(AtomicInteger atomicInteger) {
        this.failedCount = atomicInteger;
    }

    public AtomicInteger getAllocatedCount() {
        return this.allocatedCount;
    }

    public void setAllocatedCount(AtomicInteger atomicInteger) {
        this.allocatedCount = atomicInteger;
    }

    public AtomicInteger getSuccessfulCount() {
        return this.successfulCount;
    }

    public void setSuccessfulCount(AtomicInteger atomicInteger) {
        this.successfulCount = atomicInteger;
    }

    public int getContainersToLaunch() {
        return this.containersToLaunch;
    }

    public void setContainersToLaunch(int i) {
        this.containersToLaunch = i;
    }

    public ExecutorService getExecutor() {
        return this.executor;
    }

    public void setExecutor(ExecutorService executorService) {
        this.executor = executorService;
    }

    public int getHeapPerContainer() {
        return this.heapPerContainer;
    }

    public void setHeapPerContainer(int i) {
        this.heapPerContainer = i;
    }

    public AMRMClientAsync<AMRMClient.ContainerRequest> getAmRMClient() {
        return this.amRmClient;
    }

    public void setAmRMClient(AMRMClientAsync<AMRMClient.ContainerRequest> aMRMClientAsync) {
        this.amRmClient = aMRMClientAsync;
    }

    public NMClientAsync getNmClientAsync() {
        return this.nmClientAsync;
    }

    public void setNmClientAsync(NMClientAsync nMClientAsync) {
        this.nmClientAsync = nMClientAsync;
    }

    public NMCallbackHandler getContainerListener() {
        return this.containerListener;
    }

    public void setContainerListener(NMCallbackHandler nMCallbackHandler) {
        this.containerListener = nMCallbackHandler;
    }

    public String getContainerArgs() {
        return this.containerArgs;
    }

    public void setContainerArgs(String str) {
        this.containerArgs = str;
    }

    public ApplicationId getAppId() {
        return this.appId;
    }

    public void setAppId(ApplicationId applicationId) {
        this.appId = applicationId;
    }

    public static void main(String[] strArr) {
        LOG.info("Starting GuaguaAppMaster. ");
        String str = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
        if (str == null) {
            throw new IllegalArgumentException("ContainerId not found in env vars.");
        }
        ContainerId containerId = ConverterUtils.toContainerId(str);
        ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        String str2 = System.getenv(ApplicationConstants.Environment.USER.name());
        yarnConfiguration.set("mapreduce.job.user.name", str2);
        try {
            UserGroupInformation.setConfiguration(yarnConfiguration);
            Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
            LOG.info("Executing with tokens:");
            Iterator it = credentials.getAllTokens().iterator();
            while (it.hasNext()) {
                LOG.info(((Token) it.next()).toString());
            }
            UserGroupInformation createRemoteUser = UserGroupInformation.createRemoteUser(str2);
            createRemoteUser.addCredentials(credentials);
            Iterator it2 = credentials.getAllTokens().iterator();
            while (it2.hasNext()) {
                if (((Token) it2.next()).getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                    it2.remove();
                }
            }
            createRemoteUser.doAs(new PrivilegedAction<Void>() { // from class: ml.shifu.guagua.yarn.GuaguaAppMaster.3
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.security.PrivilegedAction
                public Void run() {
                    boolean z = false;
                    try {
                        z = GuaguaAppMaster.this.run();
                    } catch (Throwable th) {
                        GuaguaAppMaster.LOG.error("GuaguaAppMaster caught a top-level exception in main.", th);
                        System.exit(1);
                    }
                    if (z) {
                        GuaguaAppMaster.LOG.info("Guagua Application Master completed successfully. exiting");
                        System.exit(0);
                        return null;
                    }
                    GuaguaAppMaster.LOG.info("Guagua Application Master failed. exiting");
                    System.exit(2);
                    return null;
                }
            });
        } catch (Throwable th) {
            LOG.error("GuaguaAppMaster caught a top-level exception in main.", th);
            System.exit(1);
        }
    }

    static {
        Configuration.addDefaultResource(GuaguaYarnConstants.GUAGUA_CONF_FILE);
    }
}
