package ml.shifu.guagua.yarn;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import ml.shifu.guagua.GuaguaRuntimeException;
import ml.shifu.guagua.GuaguaService;
import ml.shifu.guagua.io.Bytable;
import ml.shifu.guagua.io.GuaguaFileSplit;
import ml.shifu.guagua.master.GuaguaMasterService;
import ml.shifu.guagua.util.Progressable;
import ml.shifu.guagua.worker.GuaguaWorkerService;
import ml.shifu.guagua.yarn.util.GsonUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.common.IOUtils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.serialization.ClassResolvers;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ml/shifu/guagua/yarn/GuaguaYarnTask.class */
public class GuaguaYarnTask<MASTER_RESULT extends Bytable, WORKER_RESULT extends Bytable> {
    private static final Logger LOG = LoggerFactory.getLogger(GuaguaYarnTask.class);
    private int partition;
    private ApplicationAttemptId appAttemptId;
    private ContainerId containerId;
    private ApplicationId appId;
    private Configuration yarnConf;
    private boolean isMaster;
    private GuaguaService guaguaService;
    private ml.shifu.guagua.hadoop.io.GuaguaInputSplit inputSplit;
    private int rpcPort;
    private String rpcHostName;
    private Channel rpcClientChannel;
    private ClientBootstrap rpcClient;

    /* loaded from: input_file:ml/shifu/guagua/yarn/GuaguaYarnTask$ClientHandler.class */
    public static class ClientHandler extends SimpleChannelUpstreamHandler {
        public void handleUpstream(ChannelHandlerContext channelHandlerContext, ChannelEvent channelEvent) throws Exception {
            super.handleUpstream(channelHandlerContext, channelEvent);
        }

        public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) {
            GuaguaYarnTask.LOG.info("Channel connected:{}", channelStateEvent.getValue());
        }

        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
            GuaguaYarnTask.LOG.info("Receive status:{}", messageEvent.getMessage());
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
            exceptionEvent.getChannel().close();
        }
    }

    public GuaguaYarnTask(ApplicationAttemptId applicationAttemptId, ContainerId containerId, int i, String str, String str2, Configuration configuration) {
        this.rpcPort = GuaguaYarnConstants.DEFAULT_STATUS_RPC_PORT;
        this.appAttemptId = applicationAttemptId;
        this.containerId = containerId;
        this.partition = i;
        this.rpcHostName = str;
        this.rpcPort = Integer.parseInt(str2);
        LOG.info("current partition:{}", Integer.valueOf(getPartition()));
        this.appId = getAppAttemptId().getApplicationId();
        this.yarnConf = configuration;
        this.inputSplit = (ml.shifu.guagua.hadoop.io.GuaguaInputSplit) GsonUtils.fromJson(getYarnConf().get(GuaguaYarnConstants.GUAGUA_YARN_INPUT_SPLIT_PREFIX + i), ml.shifu.guagua.hadoop.io.GuaguaInputSplit.class);
        LOG.info("current input split:{}", getInputSplit());
    }

    protected void setup() {
        setMaster(getInputSplit().isMaster());
        if (isMaster()) {
            setGuaguaService(new GuaguaMasterService());
        } else {
            setGuaguaService(new GuaguaWorkerService());
            LinkedList linkedList = new LinkedList();
            for (FileSplit fileSplit : getInputSplit().getFileSplits()) {
                linkedList.add(new GuaguaFileSplit(fileSplit.getPath().toString(), fileSplit.getStart(), fileSplit.getLength()));
            }
            getGuaguaService().setSplits(linkedList);
        }
        Properties replaceConfToProps = replaceConfToProps();
        getGuaguaService().setAppId(getAppId().toString());
        getGuaguaService().setContainerId(getPartition() + "");
        getGuaguaService().init(replaceConfToProps);
        getGuaguaService().start();
        initRPCClient();
    }

    private void initRPCClient() {
        this.rpcClient = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor()));
        this.rpcClient.setPipelineFactory(new ChannelPipelineFactory() { // from class: ml.shifu.guagua.yarn.GuaguaYarnTask.1
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new ChannelHandler[]{new ObjectEncoder(), new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())), new ClientHandler()});
            }
        });
        ChannelFuture connect = this.rpcClient.connect(new InetSocketAddress(this.rpcHostName, this.rpcPort));
        LOG.info("Connect to {}:{}", this.rpcHostName, Integer.valueOf(this.rpcPort));
        this.rpcClientChannel = connect.awaitUninterruptibly().getChannel();
    }

    private Properties replaceConfToProps() {
        Properties properties = new Properties();
        Iterator it = getYarnConf().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            properties.put(entry.getKey(), entry.getValue());
            if (LOG.isInfoEnabled() && ((String) entry.getKey()).toString().startsWith(GuaguaYarnConstants.GUAGUA_HDFS_DIR)) {
                LOG.debug("{}:{}", entry.getKey(), entry.getValue());
            }
        }
        return properties;
    }

    private <T> T getSplitDetails(Path path, long j) throws IOException {
        FSDataInputStream fSDataInputStream = null;
        try {
            fSDataInputStream = path.getFileSystem(getYarnConf()).open(path);
            fSDataInputStream.seek(j);
            String readString = Text.readString(fSDataInputStream);
            try {
                Deserializer deserializer = new SerializationFactory(getYarnConf()).getDeserializer(getYarnConf().getClassByName(readString));
                deserializer.open(fSDataInputStream);
                T t = (T) deserializer.deserialize((Object) null);
                IOUtils.closeStream(fSDataInputStream);
                return t;
            } catch (ClassNotFoundException e) {
                IOException iOException = new IOException(String.format("Split class %s not found", readString));
                iOException.initCause(e);
                throw iOException;
            }
        } catch (Throwable th) {
            IOUtils.closeStream(fSDataInputStream);
            throw th;
        }
    }

    public void run() {
        try {
            try {
                setup();
                getGuaguaService().run(new Progressable() { // from class: ml.shifu.guagua.yarn.GuaguaYarnTask.2
                    public void progress(int i, int i2, String str, boolean z, boolean z2) {
                        if (z) {
                            GuaguaYarnTask.LOG.info("Application progress: {}%.", Integer.valueOf((i * 100) / i2));
                            GuaguaIterationStatus guaguaIterationStatus = new GuaguaIterationStatus(GuaguaYarnTask.this.partition, i, i2);
                            guaguaIterationStatus.setKillContainer(z2);
                            GuaguaYarnTask.LOG.info("Send GuaguaIterationStatus: {}.", guaguaIterationStatus);
                            try {
                                GuaguaYarnTask.this.rpcClientChannel.write(GsonUtils.toJson(guaguaIterationStatus)).await(10L, TimeUnit.SECONDS);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                });
                cleanup();
            } catch (Exception e) {
                LOG.error("Error in guagua main run method.", e);
                throw new GuaguaRuntimeException(e);
            }
        } catch (Throwable th) {
            cleanup();
            throw th;
        }
    }

    protected void cleanup() {
        if (this.rpcClient != null) {
            this.rpcClient.shutdown();
            this.rpcClient.releaseExternalResources();
        }
        if (this.rpcClientChannel != null) {
            this.rpcClientChannel.close();
        }
        getGuaguaService().stop();
    }

    public GuaguaService getGuaguaService() {
        return this.guaguaService;
    }

    public void setGuaguaService(GuaguaService guaguaService) {
        this.guaguaService = guaguaService;
    }

    public int getPartition() {
        return this.partition;
    }

    public void setPartition(int i) {
        this.partition = i;
    }

    public ApplicationAttemptId getAppAttemptId() {
        return this.appAttemptId;
    }

    public void setAppAttemptId(ApplicationAttemptId applicationAttemptId) {
        this.appAttemptId = applicationAttemptId;
    }

    public ContainerId getContainerId() {
        return this.containerId;
    }

    public void setContainerId(ContainerId containerId) {
        this.containerId = containerId;
    }

    public boolean isMaster() {
        return this.isMaster;
    }

    public void setMaster(boolean z) {
        this.isMaster = z;
    }

    public Configuration getYarnConf() {
        return this.yarnConf;
    }

    public void setYarnConf(YarnConfiguration yarnConfiguration) {
        this.yarnConf = yarnConfiguration;
    }

    public ApplicationId getAppId() {
        return this.appId;
    }

    public void setAppId(ApplicationId applicationId) {
        this.appId = applicationId;
    }

    public ml.shifu.guagua.hadoop.io.GuaguaInputSplit getInputSplit() {
        return this.inputSplit;
    }

    public void setInputSplit(ml.shifu.guagua.hadoop.io.GuaguaInputSplit guaguaInputSplit) {
        this.inputSplit = guaguaInputSplit;
    }

    public static void main(String[] strArr) {
        LOG.info("args:{}", Arrays.toString(strArr));
        if (strArr.length != 7) {
            throw new IllegalStateException(String.format("GuaguaYarnTask could not construct a TaskAttemptID for the Guagua job from args: %s", Arrays.toString(strArr)));
        }
        String str = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
        if (str == null) {
            throw new IllegalArgumentException("ContainerId not found in env vars.");
        }
        ContainerId containerId = ConverterUtils.toContainerId(str);
        ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
        try {
            YarnConfiguration yarnConfiguration = new YarnConfiguration();
            String str2 = System.getenv(ApplicationConstants.Environment.USER.name());
            yarnConfiguration.set("mapreduce.job.user.name", str2);
            UserGroupInformation.setConfiguration(yarnConfiguration);
            Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
            LOG.info("Executing with tokens:");
            Iterator it = credentials.getAllTokens().iterator();
            while (it.hasNext()) {
                LOG.info(((Token) it.next()).toString());
            }
            UserGroupInformation createRemoteUser = UserGroupInformation.createRemoteUser(str2);
            createRemoteUser.addCredentials(credentials);
            createRemoteUser.doAs(new PrivilegedAction<Void>() { // from class: ml.shifu.guagua.yarn.GuaguaYarnTask.3
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.security.PrivilegedAction
                public Void run() {
                    GuaguaYarnTask.this.run();
                    return null;
                }
            });
        } catch (Throwable th) {
            LOG.error("GuaguaYarnTask threw a top-level exception, failing task", th);
            System.exit(2);
        }
        System.exit(0);
    }

    static {
        Configuration.addDefaultResource(GuaguaYarnConstants.GUAGUA_CONF_FILE);
    }
}
