package org.apache.tez.dag.app.dag;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/app/dag/RootInputInitializerManager.class */
public class RootInputInitializerManager {
    private static final Logger LOG = LoggerFactory.getLogger(RootInputInitializerManager.class);

    @VisibleForTesting
    protected ListeningExecutorService executor;
    private final EventHandler eventHandler;
    private final UserGroupInformation dagUgi;
    private final StateChangeNotifier entityStateTracker;
    private final Vertex vertex;
    private final AppContext appContext;
    private volatile boolean isStopped = false;

    @VisibleForTesting
    final Map<String, InitializerWrapper> initializerMap = new ConcurrentHashMap();
    private final ListMultimap<String, VertexUpdateRegistrationHolder> pendingVertexRegistrations = LinkedListMultimap.create();

    @InterfaceAudience.Private
    @VisibleForTesting
    /* loaded from: input_file:org/apache/tez/dag/app/dag/RootInputInitializerManager$InitializerWrapper.class */
    public static class InitializerWrapper implements VertexStateUpdateListener, TaskStateUpdateListener {
        private final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
        private final InputInitializer initializer;
        private final InputInitializerContext context;
        private final String vertexLogIdentifier;
        private final TezVertexID vertexId;
        private final StateChangeNotifier stateChangeNotifier;
        private final AppContext appContext;
        private final AtomicBoolean isComplete = new AtomicBoolean(false);
        private final List<String> notificationRegisteredVertices = Lists.newArrayList();
        private final Map<String, Map<Integer, Integer>> firstSuccessfulAttemptMap = new HashMap();
        private final ListMultimap<String, TezEvent> pendingEvents = LinkedListMultimap.create();
        private final List<String> taskNotificationRegisteredVertices = Lists.newLinkedList();

        InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInputLeafOutput, InputInitializer inputInitializer, InputInitializerContext inputInitializerContext, Vertex vertex, StateChangeNotifier stateChangeNotifier, AppContext appContext) {
            this.input = rootInputLeafOutput;
            this.initializer = inputInitializer;
            this.context = inputInitializerContext;
            this.vertexLogIdentifier = vertex.getLogIdentifier();
            this.vertexId = vertex.getVertexId();
            this.stateChangeNotifier = stateChangeNotifier;
            this.appContext = appContext;
        }

        public RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> getInput() {
            return this.input;
        }

        public InputInitializer getInitializer() {
            return this.initializer;
        }

        public String getVertexLogIdentifier() {
            return this.vertexLogIdentifier;
        }

        public boolean isComplete() {
            return this.isComplete.get();
        }

        public void setComplete() {
            this.isComplete.set(true);
            unregisterForVertexStatusUpdates();
            unregisterForTaskStatusUpdates();
        }

        public void registerForVertexStateUpdates(String str, Set<org.apache.tez.dag.api.event.VertexState> set) {
            synchronized (this.notificationRegisteredVertices) {
                this.notificationRegisteredVertices.add(str);
            }
            this.stateChangeNotifier.registerForVertexUpdates(str, set, this);
        }

        private void unregisterForVertexStatusUpdates() {
            synchronized (this.notificationRegisteredVertices) {
                Iterator<String> it = this.notificationRegisteredVertices.iterator();
                while (it.hasNext()) {
                    this.stateChangeNotifier.unregisterForVertexUpdates(it.next(), this);
                }
            }
        }

        @Override // org.apache.tez.dag.app.dag.VertexStateUpdateListener
        public void onStateUpdated(VertexStateUpdate vertexStateUpdate) {
            if (isComplete()) {
                if (RootInputInitializerManager.LOG.isDebugEnabled()) {
                    RootInputInitializerManager.LOG.debug("Dropping state update for vertex=" + vertexStateUpdate.getVertexName() + ", state=" + vertexStateUpdate.getVertexState() + " since initializer " + this.input.getName() + " is already complete.");
                }
            } else {
                try {
                    this.initializer.onVertexStateUpdated(vertexStateUpdate);
                } catch (Exception e) {
                    this.appContext.getEventHandler().handle(new VertexEventRootInputFailed(this.vertexId, this.input.getName(), new AMUserCodeException(AMUserCodeException.Source.InputInitializer, e)));
                }
            }
        }

        @InterfaceAudience.Private
        @VisibleForTesting
        public Map<String, Map<Integer, Integer>> getFirstSuccessfulAttemptMap() {
            return this.firstSuccessfulAttemptMap;
        }

        @InterfaceAudience.Private
        @VisibleForTesting
        public ListMultimap<String, TezEvent> getPendingEvents() {
            return this.pendingEvents;
        }

        @Override // org.apache.tez.dag.app.dag.TaskStateUpdateListener
        public void onTaskSucceeded(String str, TezTaskID tezTaskID, int i) {
            if (i == -1) {
                throw new TezUncheckedException("AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used");
            }
            Map<Integer, Integer> map = this.firstSuccessfulAttemptMap.get(str);
            Integer num = map.get(Integer.valueOf(tezTaskID.getId()));
            if (num == null) {
                num = Integer.valueOf(i);
                map.put(Integer.valueOf(tezTaskID.getId()), num);
            }
            List list = this.pendingEvents.get(str);
            if (list == null || list.isEmpty()) {
                return;
            }
            LinkedList linkedList = new LinkedList();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                TezEvent tezEvent = (TezEvent) it.next();
                int id = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
                int id2 = tezEvent.getSourceInfo().getTaskAttemptID().getId();
                if (id == tezTaskID.getId()) {
                    if (id2 == num.intValue()) {
                        linkedList.add((InputInitializerEvent) tezEvent.getEvent());
                    }
                    it.remove();
                }
            }
            sendEvents(linkedList);
        }

        public void handleInputInitializerEvents(Collection<TezEvent> collection) {
            LinkedList linkedList = new LinkedList();
            for (TezEvent tezEvent : collection) {
                String taskVertexName = tezEvent.getSourceInfo().getTaskVertexName();
                int id = tezEvent.getSourceInfo().getTaskAttemptID().getTaskID().getId();
                int id2 = tezEvent.getSourceInfo().getTaskAttemptID().getId();
                Map<Integer, Integer> map = this.firstSuccessfulAttemptMap.get(taskVertexName);
                if (map == null) {
                    map = new HashMap();
                    this.firstSuccessfulAttemptMap.put(taskVertexName, map);
                    this.stateChangeNotifier.registerForTaskSuccessUpdates(taskVertexName, this);
                    this.taskNotificationRegisteredVertices.add(taskVertexName);
                }
                Integer num = map.get(Integer.valueOf(id));
                if (num == null) {
                    Task task = this.appContext.getCurrentDAG().getVertex(taskVertexName).getTask(id);
                    if (task.getState() == TaskState.SUCCEEDED) {
                        num = Integer.valueOf(task.getSuccessfulAttempt().getID().getId());
                        map.put(Integer.valueOf(id), num);
                    }
                }
                if (num == null) {
                    this.pendingEvents.put(taskVertexName, tezEvent);
                } else if (id2 == num.intValue()) {
                    linkedList.add((InputInitializerEvent) tezEvent.getEvent());
                }
            }
            sendEvents(linkedList);
        }

        private void sendEvents(List<InputInitializerEvent> list) {
            if (list == null || list.isEmpty()) {
                return;
            }
            try {
                this.initializer.handleInputInitializerEvent(list);
            } catch (Exception e) {
                this.appContext.getEventHandler().handle(new VertexEventRootInputFailed(this.vertexId, this.input.getName(), new AMUserCodeException(AMUserCodeException.Source.InputInitializer, e)));
            }
        }

        private void unregisterForTaskStatusUpdates() {
            Iterator<String> it = this.taskNotificationRegisteredVertices.iterator();
            while (it.hasNext()) {
                this.stateChangeNotifier.unregisterForTaskSuccessUpdates(it.next(), this);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/RootInputInitializerManager$VertexUpdateRegistrationHolder.class */
    public static class VertexUpdateRegistrationHolder {
        private final String vertexName;
        private final Set<org.apache.tez.dag.api.event.VertexState> stateSet;

        private VertexUpdateRegistrationHolder(String str, Set<org.apache.tez.dag.api.event.VertexState> set) {
            this.vertexName = str;
            this.stateSet = set;
        }
    }

    public RootInputInitializerManager(Vertex vertex, AppContext appContext, UserGroupInformation userGroupInformation, StateChangeNotifier stateChangeNotifier) {
        this.appContext = appContext;
        this.vertex = vertex;
        this.eventHandler = appContext.getEventHandler();
        this.executor = appContext.getExecService();
        this.dagUgi = userGroupInformation;
        this.entityStateTracker = stateChangeNotifier;
    }

    public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> list, List<TezEvent> list2) {
        this.executor.submit(() -> {
            createAndStartInitializing(list, list2);
        });
    }

    private void createAndStartInitializing(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> list, List<TezEvent> list2) {
        String str = null;
        try {
            ArrayList<InitializerWrapper> arrayList = new ArrayList();
            for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInputLeafOutput : list) {
                str = rootInputLeafOutput.getName();
                InitializerWrapper createInitializerWrapper = createInitializerWrapper(rootInputLeafOutput);
                this.initializerMap.put(rootInputLeafOutput.getName(), createInitializerWrapper);
                registerPendingVertex(rootInputLeafOutput, createInitializerWrapper);
                arrayList.add(createInitializerWrapper);
            }
            handleInitializerEvents(list2);
            list2.clear();
            for (InitializerWrapper initializerWrapper : arrayList) {
                this.executor.submit(() -> {
                    runInitializerAndProcessResult(initializerWrapper);
                });
            }
        } catch (Throwable th) {
            VertexImpl vertexImpl = (VertexImpl) this.vertex;
            String str2 = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(th);
            LOG.info(str2);
            vertexImpl.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, str2);
            this.eventHandler.handle(new VertexEventRootInputFailed(this.vertex.getVertexId(), str, new AMUserCodeException(AMUserCodeException.Source.InputInitializer, th)));
        }
    }

    private void runInitializerAndProcessResult(InitializerWrapper initializerWrapper) {
        try {
            List<Event> runInitializer = runInitializer(initializerWrapper);
            LOG.info("Succeeded InputInitializer for Input: " + initializerWrapper.getInput().getName() + " on vertex " + initializerWrapper.getVertexLogIdentifier());
            this.eventHandler.handle(new VertexEventRootInputInitialized(this.vertex.getVertexId(), initializerWrapper.getInput().getName(), runInitializer));
        } catch (Throwable th) {
            th = th;
            if (th instanceof UndeclaredThrowableException) {
                th = th.getCause();
            }
            LOG.info("Failed InputInitializer for Input: " + initializerWrapper.getInput().getName() + " on vertex " + initializerWrapper.getVertexLogIdentifier());
            this.eventHandler.handle(new VertexEventRootInputFailed(this.vertex.getVertexId(), initializerWrapper.getInput().getName(), new AMUserCodeException(AMUserCodeException.Source.InputInitializer, th)));
        } finally {
            initializerWrapper.setComplete();
        }
    }

    private List<Event> runInitializer(InitializerWrapper initializerWrapper) throws IOException, InterruptedException {
        return (List) this.dagUgi.doAs(() -> {
            LOG.info("Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() + " on vertex " + initializerWrapper.getVertexLogIdentifier());
            try {
                TezUtilsInternal.setHadoopCallerContext(this.appContext.getHadoopShim(), initializerWrapper.vertexId);
                return initializerWrapper.getInitializer().initialize();
            } finally {
                this.appContext.getHadoopShim().clearHadoopCallerContext();
            }
        });
    }

    private InitializerWrapper createInitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInputLeafOutput) throws TezException {
        TezRootInputInitializerContextImpl tezRootInputInitializerContextImpl = new TezRootInputInitializerContextImpl(rootInputLeafOutput, this.vertex, this.appContext, this);
        try {
            TezUtilsInternal.setHadoopCallerContext(this.appContext.getHadoopShim(), this.vertex.getVertexId());
            InitializerWrapper initializerWrapper = new InitializerWrapper(rootInputLeafOutput, createInitializer(rootInputLeafOutput, tezRootInputInitializerContextImpl), tezRootInputInitializerContextImpl, this.vertex, this.entityStateTracker, this.appContext);
            this.appContext.getHadoopShim().clearHadoopCallerContext();
            return initializerWrapper;
        } catch (Throwable th) {
            this.appContext.getHadoopShim().clearHadoopCallerContext();
            throw th;
        }
    }

    private void registerPendingVertex(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInputLeafOutput, InitializerWrapper initializerWrapper) {
        List<VertexUpdateRegistrationHolder> removeAll = this.pendingVertexRegistrations.removeAll(rootInputLeafOutput.getName());
        if (removeAll != null) {
            for (VertexUpdateRegistrationHolder vertexUpdateRegistrationHolder : removeAll) {
                initializerWrapper.registerForVertexStateUpdates(vertexUpdateRegistrationHolder.vertexName, vertexUpdateRegistrationHolder.stateSet);
            }
        }
    }

    @VisibleForTesting
    protected InputInitializer createInitializer(final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInputLeafOutput, final InputInitializerContext inputInitializerContext) throws TezException {
        try {
            return (InputInitializer) this.dagUgi.doAs(new PrivilegedExceptionAction<InputInitializer>() { // from class: org.apache.tez.dag.app.dag.RootInputInitializerManager.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.security.PrivilegedExceptionAction
                public InputInitializer run() throws Exception {
                    return (InputInitializer) ReflectionUtils.createClazzInstance(rootInputLeafOutput.getControllerDescriptor().getClassName(), new Class[]{InputInitializerContext.class}, new Object[]{inputInitializerContext});
                }
            });
        } catch (IOException e) {
            throw new TezException(e);
        } catch (InterruptedException e2) {
            throw new TezException(e2);
        } catch (UndeclaredThrowableException e3) {
            if (e3.getCause() instanceof TezException) {
                throw e3.getCause();
            }
            throw e3;
        }
    }

    public void handleInitializerEvents(List<TezEvent> list) {
        LinkedListMultimap create = LinkedListMultimap.create();
        for (TezEvent tezEvent : list) {
            Preconditions.checkState(tezEvent.getEvent() instanceof InputInitializerEvent);
            InputInitializerEvent event = tezEvent.getEvent();
            Preconditions.checkState(this.vertex.getName().equals(event.getTargetVertexName()), "Received event for incorrect vertex");
            Objects.requireNonNull(event.getTargetInputName(), "target input name must be set");
            InitializerWrapper initializerWrapper = this.initializerMap.get(event.getTargetInputName());
            Preconditions.checkState(initializerWrapper != null, "Received event for unknown input : " + event.getTargetInputName());
            create.put(initializerWrapper, tezEvent);
        }
        if (this.isStopped) {
            LOG.warn("InitializerManager already stopped for " + this.vertex.getLogIdentifier() + " Dropping " + list.size() + " events");
        }
        for (Map.Entry entry : create.asMap().entrySet()) {
            InitializerWrapper initializerWrapper2 = (InitializerWrapper) entry.getKey();
            if (initializerWrapper2.isComplete()) {
                LOG.warn(((Collection) entry.getValue()).size() + " events targeted at vertex " + this.vertex.getLogIdentifier() + ", initializerWrapper for Input: " + initializerWrapper2.getInput().getName() + " will be dropped, since Input has already been initialized.");
            } else {
                initializerWrapper2.handleInputInitializerEvents((Collection) entry.getValue());
            }
        }
    }

    public void registerForVertexUpdates(String str, String str2, @Nullable Set<org.apache.tez.dag.api.event.VertexState> set) {
        Objects.requireNonNull(str, "VertexName cannot be null: " + str);
        Objects.requireNonNull(str2, "InputName cannot be null");
        InitializerWrapper initializerWrapper = this.initializerMap.get(str2);
        if (initializerWrapper == null) {
            this.pendingVertexRegistrations.put(str2, new VertexUpdateRegistrationHolder(str, set));
        } else {
            initializerWrapper.registerForVertexStateUpdates(str, set);
        }
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public InitializerWrapper getInitializerWrapper(String str) {
        return this.initializerMap.get(str);
    }

    public void shutdown() {
        this.isStopped = true;
    }
}
