package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mortbay.util.MultiException;

/* loaded from: input_file:test-classes/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.class */
public class TestLogAggregationService extends BaseContainerManagerTest {
    private static RecordFactory recordFactory;
    private Map<ApplicationAccessType, String> acls = createAppAcls();
    private File remoteRootLogDir = new File("target", getClass().getName() + "-remoteLogDir");

    public TestLogAggregationService() throws UnsupportedFileSystemException {
        this.remoteRootLogDir.mkdir();
    }

    @Override // org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest
    public void tearDown() throws IOException, InterruptedException {
        super.tearDown();
        createContainerExecutor().deleteAsUser(this.user, new Path(this.remoteRootLogDir.getAbsolutePath()), new Path[0]);
    }

    @Test
    public void testLocalFileDeletionAfterUpload() throws Exception {
        this.delSrvc = new DeletionService(createContainerExecutor());
        this.delSrvc.init(this.conf);
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        DrainDispatcher createDispatcher = createDispatcher();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        createDispatcher.register(ApplicationEventType.class, eventHandler);
        LogAggregationService logAggregationService = (LogAggregationService) Mockito.spy(new LogAggregationService(createDispatcher, this.context, this.delSrvc, this.dirsHandler));
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        File file = new File(localLogDir, ConverterUtils.toString(newApplicationId));
        file.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        ApplicationAttemptId newApplicationAttemptId = BuilderUtils.newApplicationAttemptId(newApplicationId, 1);
        ContainerId newContainerId = BuilderUtils.newContainerId(newApplicationAttemptId, 1);
        writeContainerLogs(file, newContainerId);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId, 0));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        logAggregationService.stop();
        Assert.assertEquals(0, logAggregationService.getNumAggregators());
        ((LogAggregationService) Mockito.verify(logAggregationService)).closeFileSystems((UserGroupInformation) Mockito.any(UserGroupInformation.class));
        this.delSrvc.stop();
        File file2 = new File(file, ConverterUtils.toString(newContainerId));
        for (String str : new String[]{"stdout", "stderr", "syslog"}) {
            File file3 = new File(file2, str);
            Assert.assertFalse("check " + file3, file3.exists());
        }
        Assert.assertFalse(file.exists());
        Path remoteNodeLogFileForApp = logAggregationService.getRemoteNodeLogFileForApp(newApplicationId, this.user);
        Assert.assertTrue("Log file [" + remoteNodeLogFileForApp + "] not found", new File(remoteNodeLogFileForApp.toUri().getPath()).exists());
        createDispatcher.await();
        checkEvents(eventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationAttemptId.getApplicationId(), ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationAttemptId.getApplicationId(), ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}, true, "getType", "getApplicationID");
        createDispatcher.stop();
    }

    @Test
    public void testNoContainerOnNode() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        DrainDispatcher createDispatcher = createDispatcher();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        createDispatcher.register(ApplicationEventType.class, eventHandler);
        LogAggregationService logAggregationService = new LogAggregationService(createDispatcher, this.context, this.delSrvc, this.dirsHandler);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        new File(localLogDir, ConverterUtils.toString(newApplicationId)).mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        logAggregationService.stop();
        Assert.assertEquals(0, logAggregationService.getNumAggregators());
        Assert.assertFalse(new File(logAggregationService.getRemoteNodeLogFileForApp(newApplicationId, this.user).toUri().getPath()).exists());
        createDispatcher.await();
        checkEvents(eventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}, true, "getType", "getApplicationID");
        createDispatcher.stop();
    }

    @Test
    public void testMultipleAppsLogAggregation() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        DrainDispatcher createDispatcher = createDispatcher();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        createDispatcher.register(ApplicationEventType.class, eventHandler);
        LogAggregationService logAggregationService = new LogAggregationService(createDispatcher, this.context, this.delSrvc, this.dirsHandler);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        File file = new File(localLogDir, ConverterUtils.toString(newApplicationId));
        file.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        ApplicationAttemptId newApplicationAttemptId = BuilderUtils.newApplicationAttemptId(newApplicationId, 1);
        ContainerId newContainerId = BuilderUtils.newContainerId(newApplicationAttemptId, 1);
        writeContainerLogs(file, newContainerId);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId, 0));
        ApplicationId newApplicationId2 = BuilderUtils.newApplicationId(1234L, 2);
        ApplicationAttemptId newApplicationAttemptId2 = BuilderUtils.newApplicationAttemptId(newApplicationId2, 1);
        File file2 = new File(localLogDir, ConverterUtils.toString(newApplicationId2));
        file2.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId2, this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
        ContainerId newContainerId2 = BuilderUtils.newContainerId(newApplicationAttemptId2, 1);
        writeContainerLogs(file2, newContainerId2);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId2, 0));
        ContainerId newContainerId3 = BuilderUtils.newContainerId(newApplicationAttemptId, 2);
        writeContainerLogs(file, newContainerId3);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId3, 0));
        ApplicationId newApplicationId3 = BuilderUtils.newApplicationId(1234L, 3);
        ApplicationAttemptId newApplicationAttemptId3 = BuilderUtils.newApplicationAttemptId(newApplicationId3, 1);
        File file3 = new File(localLogDir, ConverterUtils.toString(newApplicationId3));
        file3.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId3, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
        createDispatcher.await();
        checkEvents(eventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationId2, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationId3, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)}, false, "getType", "getApplicationID");
        Mockito.reset(new EventHandler[]{eventHandler});
        ContainerId newContainerId4 = BuilderUtils.newContainerId(newApplicationAttemptId3, 1);
        writeContainerLogs(file3, newContainerId4);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId4, 0));
        ContainerId newContainerId5 = BuilderUtils.newContainerId(newApplicationAttemptId3, 2);
        writeContainerLogs(file3, newContainerId5);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId5, 1));
        ContainerId newContainerId6 = BuilderUtils.newContainerId(newApplicationAttemptId2, 2);
        writeContainerLogs(file2, newContainerId6);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId6, 0));
        ContainerId newContainerId7 = BuilderUtils.newContainerId(newApplicationAttemptId3, 3);
        writeContainerLogs(file3, newContainerId7);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId7, 0));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId2));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId3));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        logAggregationService.stop();
        Assert.assertEquals(0, logAggregationService.getNumAggregators());
        verifyContainerLogs(logAggregationService, newApplicationId, new ContainerId[]{newContainerId, newContainerId3});
        verifyContainerLogs(logAggregationService, newApplicationId2, new ContainerId[]{newContainerId2});
        verifyContainerLogs(logAggregationService, newApplicationId3, new ContainerId[]{newContainerId4, newContainerId5});
        createDispatcher.await();
        checkEvents(eventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), new ApplicationEvent(newApplicationId2, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), new ApplicationEvent(newApplicationId3, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}, false, "getType", "getApplicationID");
        createDispatcher.stop();
    }

    @Test
    public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        DrainDispatcher createDispatcher = createDispatcher();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        createDispatcher.register(ApplicationEventType.class, eventHandler);
        LogAggregationService logAggregationService = (LogAggregationService) Mockito.spy(new LogAggregationService(createDispatcher, this.context, this.delSrvc, this.dirsHandler));
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) Math.random());
        ((LogAggregationService) Mockito.doThrow(new YarnException("KABOOM!")).when(logAggregationService)).initAppAggregator((ApplicationId) Mockito.eq(newApplicationId), (String) Mockito.eq(this.user), (Credentials) Mockito.any(Credentials.class), (ContainerLogsRetentionPolicy) Mockito.any(ContainerLogsRetentionPolicy.class), Mockito.anyMap());
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
        createDispatcher.await();
        checkEvents(eventHandler, new ApplicationEvent[]{new ApplicationFinishEvent(newApplicationId, "Application failed to init aggregation: KABOOM!")}, false, "getType", "getApplicationID", "getDiagnostic");
        ((LogAggregationService) Mockito.verify(logAggregationService, Mockito.never())).closeFileSystems((UserGroupInformation) Mockito.any(UserGroupInformation.class));
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(BuilderUtils.newContainerId(4, 1, 1L, 1), 0));
        createDispatcher.await();
        logAggregationService.handle(new LogHandlerAppFinishedEvent(BuilderUtils.newApplicationId(1L, 5)));
        createDispatcher.await();
    }

    @Test
    public void testLogAggregationCreateDirsFailsWithoutKillingNM() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        DrainDispatcher createDispatcher = createDispatcher();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        createDispatcher.register(ApplicationEventType.class, eventHandler);
        LogAggregationService logAggregationService = (LogAggregationService) Mockito.spy(new LogAggregationService(createDispatcher, this.context, this.delSrvc, this.dirsHandler));
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) Math.random());
        RuntimeException runtimeException = new RuntimeException("KABOOM!");
        ((LogAggregationService) Mockito.doThrow(runtimeException).when(logAggregationService)).createAppDir((String) Mockito.any(String.class), (ApplicationId) Mockito.any(ApplicationId.class), (UserGroupInformation) Mockito.any(UserGroupInformation.class));
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
        createDispatcher.await();
        checkEvents(eventHandler, new ApplicationEvent[]{new ApplicationFinishEvent(newApplicationId, "Application failed to init aggregation: " + runtimeException)}, false, "getType", "getApplicationID", "getDiagnostic");
        ((LogAggregationService) Mockito.verify(logAggregationService)).closeFileSystems((UserGroupInformation) Mockito.any(UserGroupInformation.class));
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(BuilderUtils.newContainerId(4, 1, 1L, 1), 0));
        createDispatcher.await();
        logAggregationService.handle(new LogHandlerAppFinishedEvent(BuilderUtils.newApplicationId(1L, 5)));
        createDispatcher.await();
        logAggregationService.stop();
        Assert.assertEquals(0, logAggregationService.getNumAggregators());
    }

    private void writeContainerLogs(File file, ContainerId containerId) throws IOException {
        String converterUtils = ConverterUtils.toString(containerId);
        File file2 = new File(file, converterUtils);
        file2.mkdir();
        for (String str : new String[]{"stdout", "stderr", "syslog"}) {
            FileWriter fileWriter = new FileWriter(new File(file2, str));
            fileWriter.write(converterUtils + " Hello " + str + "!");
            fileWriter.close();
        }
    }

    private void verifyContainerLogs(LogAggregationService logAggregationService, ApplicationId applicationId, ContainerId[] containerIdArr) throws IOException {
        AggregatedLogFormat.LogReader logReader = new AggregatedLogFormat.LogReader(this.conf, logAggregationService.getRemoteNodeLogFileForApp(applicationId, this.user));
        Assert.assertEquals(this.user, logReader.getApplicationOwner());
        verifyAcls(logReader.getApplicationAcls());
        try {
            HashMap hashMap = new HashMap();
            AggregatedLogFormat.LogKey logKey = new AggregatedLogFormat.LogKey();
            DataInputStream next = logReader.next(logKey);
            while (next != null) {
                LOG.info("Found container " + logKey.toString());
                HashMap hashMap2 = new HashMap();
                hashMap.put(logKey.toString(), hashMap2);
                while (true) {
                    try {
                        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
                        AggregatedLogFormat.LogReader.readAContainerLogsForALogType(next, dataOutputBuffer);
                        DataInputBuffer dataInputBuffer = new DataInputBuffer();
                        dataInputBuffer.reset(dataOutputBuffer.getData(), dataOutputBuffer.getLength());
                        Assert.assertEquals("\nLogType:", dataInputBuffer.readUTF());
                        String readUTF = dataInputBuffer.readUTF();
                        Assert.assertEquals("\nLogLength:", dataInputBuffer.readUTF());
                        long parseLong = Long.parseLong(dataInputBuffer.readUTF());
                        Assert.assertEquals("\nLog Contents:\n", dataInputBuffer.readUTF());
                        byte[] bArr = new byte[(int) parseLong];
                        dataInputBuffer.read(bArr, 0, (int) parseLong);
                        hashMap2.put(readUTF, new String(bArr));
                        LOG.info("LogType:" + readUTF);
                        LOG.info("LogType:" + parseLong);
                        LOG.info("Log Contents:\n" + ((String) hashMap2.get(readUTF)));
                    } catch (EOFException e) {
                        logKey = new AggregatedLogFormat.LogKey();
                        next = logReader.next(logKey);
                    }
                }
            }
            Assert.assertEquals(containerIdArr.length, hashMap.size());
            for (ContainerId containerId : containerIdArr) {
                String converterUtils = ConverterUtils.toString(containerId);
                Map map = (Map) hashMap.remove(converterUtils);
                Assert.assertEquals(3, map.size());
                for (String str : new String[]{"stdout", "stderr", "syslog"}) {
                    String str2 = converterUtils + " Hello " + str + "!";
                    LOG.info("Expected log-content : " + new String(str2));
                    String str3 = (String) map.remove(str);
                    Assert.assertNotNull(containerId + " " + str + " not present in aggregated log-file!", str3);
                    Assert.assertEquals(str2, str3);
                }
                Assert.assertEquals(0, map.size());
            }
            Assert.assertEquals(0, hashMap.size());
            logReader.close();
        } catch (Throwable th) {
            logReader.close();
            throw th;
        }
    }

    @Test
    public void testLogAggregationForRealContainerLaunch() throws IOException, InterruptedException {
        this.containerManager.start();
        File file = new File(tmpDir, "scriptFile.sh");
        PrintWriter printWriter = new PrintWriter(file);
        printWriter.write("\necho Hello World! Stdout! > " + new File(localLogDir, "stdout"));
        printWriter.write("\necho Hello World! Stderr! > " + new File(localLogDir, "stderr"));
        printWriter.write("\necho Hello World! Syslog! > " + new File(localLogDir, "syslog"));
        printWriter.close();
        ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) recordFactory.newRecordInstance(ContainerLaunchContext.class);
        ApplicationId applicationId = (ApplicationId) recordFactory.newRecordInstance(ApplicationId.class);
        applicationId.setClusterTimestamp(0L);
        applicationId.setId(0);
        ContainerId newContainerId = BuilderUtils.newContainerId(BuilderUtils.newApplicationAttemptId(applicationId, 1), 0);
        containerLaunchContext.setContainerId(newContainerId);
        containerLaunchContext.setUser(this.user);
        URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path(file.getAbsolutePath())));
        LocalResource localResource = (LocalResource) recordFactory.newRecordInstance(LocalResource.class);
        localResource.setResource(yarnUrlFromPath);
        localResource.setSize(-1L);
        localResource.setVisibility(LocalResourceVisibility.APPLICATION);
        localResource.setType(LocalResourceType.FILE);
        localResource.setTimestamp(file.lastModified());
        HashMap hashMap = new HashMap();
        hashMap.put("dest_file", localResource);
        containerLaunchContext.setLocalResources(hashMap);
        containerLaunchContext.setUser(containerLaunchContext.getUser());
        ArrayList arrayList = new ArrayList();
        arrayList.add("/bin/bash");
        arrayList.add(file.getAbsolutePath());
        containerLaunchContext.setCommands(arrayList);
        containerLaunchContext.setResource((Resource) recordFactory.newRecordInstance(Resource.class));
        containerLaunchContext.getResource().setMemory(104857600);
        StartContainerRequest startContainerRequest = (StartContainerRequest) recordFactory.newRecordInstance(StartContainerRequest.class);
        startContainerRequest.setContainerLaunchContext(containerLaunchContext);
        this.containerManager.startContainer(startContainerRequest);
        BaseContainerManagerTest.waitForContainerState(this.containerManager, newContainerId, ContainerState.COMPLETE);
        this.containerManager.handle(new CMgrCompletedAppsEvent(Arrays.asList(applicationId)));
        this.containerManager.stop();
    }

    private void verifyAcls(Map<ApplicationAccessType, String> map) {
        Assert.assertEquals(this.acls.size(), map.size());
        for (ApplicationAccessType applicationAccessType : this.acls.keySet()) {
            Assert.assertEquals(this.acls.get(applicationAccessType), map.get(applicationAccessType));
        }
    }

    private DrainDispatcher createDispatcher() {
        DrainDispatcher drainDispatcher = new DrainDispatcher();
        drainDispatcher.init(this.conf);
        drainDispatcher.start();
        return drainDispatcher;
    }

    private Map<ApplicationAccessType, String> createAppAcls() {
        HashMap hashMap = new HashMap();
        hashMap.put(ApplicationAccessType.MODIFY_APP, "user group");
        hashMap.put(ApplicationAccessType.VIEW_APP, "*");
        return hashMap;
    }

    @Test(timeout = 20000)
    public void testStopAfterError() throws Exception {
        DeletionService deletionService = (DeletionService) Mockito.mock(DeletionService.class);
        LocalDirsHandlerService localDirsHandlerService = (LocalDirsHandlerService) Mockito.mock(LocalDirsHandlerService.class);
        Mockito.when(localDirsHandlerService.getLogDirs()).thenThrow(new Throwable[]{new RuntimeException()});
        DrainDispatcher createDispatcher = createDispatcher();
        createDispatcher.register(ApplicationEventType.class, (EventHandler) Mockito.mock(EventHandler.class));
        LogAggregationService logAggregationService = new LogAggregationService(createDispatcher, this.context, deletionService, localDirsHandlerService);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        logAggregationService.handle(new LogHandlerAppStartedEvent(BuilderUtils.newApplicationId(1234L, 1), this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        logAggregationService.stop();
        Assert.assertEquals(0, logAggregationService.getNumAggregators());
    }

    @Test
    public void testLogAggregatorCleanup() throws Exception {
        DeletionService deletionService = (DeletionService) Mockito.mock(DeletionService.class);
        LocalDirsHandlerService localDirsHandlerService = (LocalDirsHandlerService) Mockito.mock(LocalDirsHandlerService.class);
        DrainDispatcher createDispatcher = createDispatcher();
        createDispatcher.register(ApplicationEventType.class, (EventHandler) Mockito.mock(EventHandler.class));
        LogAggregationService logAggregationService = new LogAggregationService(createDispatcher, this.context, deletionService, localDirsHandlerService);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        createDispatcher.await();
        for (int i = 20000; i > 0 && logAggregationService.getNumAggregators() > 0; i -= 100) {
            Thread.sleep(100L);
        }
        Assert.assertEquals("Log aggregator failed to cleanup!", 0, logAggregationService.getNumAggregators());
    }

    private static <T extends Event<?>> void checkEvents(EventHandler<T> eventHandler, T[] tArr, boolean z, String... strArr) throws Exception {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(tArr.getClass().getComponentType());
        ((EventHandler) Mockito.verify(eventHandler, Mockito.atLeast(0))).handle((Event) forClass.capture());
        List allValues = forClass.getAllValues();
        MultiException multiException = new MultiException();
        try {
            Assert.assertEquals("expected events", tArr.length, allValues.size());
        } catch (Throwable th) {
            multiException.add(th);
        }
        if (z) {
            int max = Math.max(tArr.length, allValues.size());
            int i = 0;
            while (i < max) {
                try {
                    Assert.assertEquals("event#" + i, i < tArr.length ? eventToString(tArr[i], strArr) : null, i < allValues.size() ? eventToString((Event) allValues.get(i), strArr) : null);
                } catch (Throwable th2) {
                    multiException.add(th2);
                }
                i++;
            }
        } else {
            HashSet hashSet = new HashSet();
            for (T t : tArr) {
                hashSet.add(eventToString(t, strArr));
            }
            Iterator it = allValues.iterator();
            while (it.hasNext()) {
                try {
                    String eventToString = eventToString((Event) it.next(), strArr);
                    Assert.assertTrue("unexpected event: " + eventToString, hashSet.remove(eventToString));
                } catch (Throwable th3) {
                    multiException.add(th3);
                }
            }
            Iterator it2 = hashSet.iterator();
            while (it2.hasNext()) {
                try {
                    Assert.fail("missing event: " + ((String) it2.next()));
                } catch (Throwable th4) {
                    multiException.add(th4);
                }
            }
        }
        multiException.ifExceptionThrow();
    }

    private static String eventToString(Event<?> event, String[] strArr) throws Exception {
        StringBuilder sb = new StringBuilder("[ ");
        for (String str : strArr) {
            try {
                Method method = event.getClass().getMethod(str, new Class[0]);
                sb.append(method.getName()).append("=").append(method.invoke(event, new Object[0]).toString()).append(" ");
            } catch (Exception e) {
            }
        }
        sb.append("]");
        return sb.toString();
    }

    static {
        LOG = LogFactory.getLog(TestLogAggregationService.class);
        recordFactory = RecordFactoryProvider.getRecordFactory((Configuration) null);
    }
}
