package org.apache.beam.fn.harness.logging;

import com.beust.jcommander.Parameters;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testng.remote.RemoteTestNG;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.class */
public class BeamFnLoggingClientTest {
    private static final LogRecord FILTERED_RECORD = new LogRecord(Level.SEVERE, "FilteredMessage");
    private static final LogRecord TEST_RECORD = new LogRecord(Level.FINE, "Message");
    private static final LogRecord TEST_RECORD_WITH_EXCEPTION;
    private static final BeamFnApi.LogEntry TEST_ENTRY;
    private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_EXCEPTION;

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void testLogging() throws Exception {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        final AtomicReference atomicReference = new AtomicReference();
        final CallStreamObserver build = TestStreams.withOnNext(list -> {
            concurrentLinkedQueue.addAll(list.getLogEntriesList());
        }).withOnCompleted(() -> {
            atomicBoolean.set(true);
            ((StreamObserver) atomicReference.get()).onCompleted();
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + Parameters.DEFAULT_OPTION_PREFIXES + UUID.randomUUID().toString()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnLoggingGrpc.BeamFnLoggingImplBase() { // from class: org.apache.beam.fn.harness.logging.BeamFnLoggingClientTest.1
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc.BeamFnLoggingImplBase
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> streamObserver) {
                atomicReference.set(streamObserver);
                return build;
            }
        }).build();
        build3.start();
        ManagedChannel build4 = InProcessChannelBuilder.forName(build2.getUrl()).build();
        try {
            BeamFnLoggingClient beamFnLoggingClient = new BeamFnLoggingClient(PipelineOptionsFactory.fromArgs("--defaultSdkHarnessLogLevel=OFF", "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}").create(), build2, apiServiceDescriptor -> {
                return build4;
            });
            Assert.assertEquals(Level.OFF, LogManager.getLogManager().getLogger("").getLevel());
            Assert.assertEquals(Level.FINE, LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
            LogManager.getLogManager().getLogger("").log(FILTERED_RECORD);
            LogManager.getLogManager().getLogger("ConfiguredLogger").log(TEST_RECORD);
            LogManager.getLogManager().getLogger("ConfiguredLogger").log(TEST_RECORD_WITH_EXCEPTION);
            beamFnLoggingClient.close();
            Assert.assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
            Assert.assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
            Assert.assertTrue(atomicBoolean.get());
            Assert.assertTrue(build4.isShutdown());
            Assert.assertThat(concurrentLinkedQueue, Matchers.contains(TEST_ENTRY, TEST_ENTRY_WITH_EXCEPTION));
            build3.shutdownNow();
        } catch (Throwable th) {
            build3.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        final AtomicReference atomicReference = new AtomicReference();
        final CallStreamObserver build = TestStreams.withOnNext(list -> {
            concurrentLinkedQueue.addAll(list.getLogEntriesList());
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + Parameters.DEFAULT_OPTION_PREFIXES + UUID.randomUUID().toString()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnLoggingGrpc.BeamFnLoggingImplBase() { // from class: org.apache.beam.fn.harness.logging.BeamFnLoggingClientTest.2
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc.BeamFnLoggingImplBase
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> streamObserver) {
                atomicReference.set(streamObserver);
                streamObserver.onError(Status.INTERNAL.withDescription("TEST ERROR").asException());
                return build;
            }
        }).build();
        build3.start();
        ManagedChannel build4 = InProcessChannelBuilder.forName(build2.getUrl()).build();
        try {
            BeamFnLoggingClient beamFnLoggingClient = new BeamFnLoggingClient(PipelineOptionsFactory.fromArgs("--defaultSdkHarnessLogLevel=OFF", "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}").create(), build2, apiServiceDescriptor -> {
                return build4;
            });
            this.thrown.expectMessage("TEST ERROR");
            beamFnLoggingClient.close();
            Assert.assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
            Assert.assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
            Assert.assertTrue(build4.isShutdown());
            build3.shutdownNow();
        } catch (Throwable th) {
            Assert.assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
            Assert.assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
            Assert.assertTrue(build4.isShutdown());
            build3.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testWhenServerHangsUpEarlyThatClientIsAbleCleanup() throws Exception {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        final AtomicReference atomicReference = new AtomicReference();
        final CallStreamObserver build = TestStreams.withOnNext(list -> {
            concurrentLinkedQueue.addAll(list.getLogEntriesList());
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + Parameters.DEFAULT_OPTION_PREFIXES + UUID.randomUUID().toString()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnLoggingGrpc.BeamFnLoggingImplBase() { // from class: org.apache.beam.fn.harness.logging.BeamFnLoggingClientTest.3
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc.BeamFnLoggingImplBase
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> streamObserver) {
                atomicReference.set(streamObserver);
                streamObserver.onCompleted();
                return build;
            }
        }).build();
        build3.start();
        ManagedChannel build4 = InProcessChannelBuilder.forName(build2.getUrl()).build();
        try {
            new BeamFnLoggingClient(PipelineOptionsFactory.fromArgs("--defaultSdkHarnessLogLevel=OFF", "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}").create(), build2, apiServiceDescriptor -> {
                return build4;
            }).close();
            Assert.assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
            Assert.assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
            Assert.assertTrue(build4.isShutdown());
            build3.shutdownNow();
        } catch (Throwable th) {
            Assert.assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
            Assert.assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
            Assert.assertTrue(build4.isShutdown());
            build3.shutdownNow();
            throw th;
        }
    }

    static {
        TEST_RECORD.setLoggerName("LoggerName");
        TEST_RECORD.setMillis(1234567890L);
        TEST_RECORD.setThreadID(12345);
        TEST_RECORD_WITH_EXCEPTION = new LogRecord(Level.WARNING, "MessageWithException");
        TEST_RECORD_WITH_EXCEPTION.setLoggerName("LoggerName");
        TEST_RECORD_WITH_EXCEPTION.setMillis(1234567890L);
        TEST_RECORD_WITH_EXCEPTION.setThreadID(12345);
        TEST_RECORD_WITH_EXCEPTION.setThrown(new RuntimeException("ExceptionMessage"));
        TEST_ENTRY = BeamFnApi.LogEntry.newBuilder().setSeverity(BeamFnApi.LogEntry.Severity.Enum.DEBUG).setMessage("Message").setThread(RemoteTestNG.DEBUG_PORT).setTimestamp(Timestamp.newBuilder().setSeconds(1234567L).setNanos(890000000).build()).setLogLocation("LoggerName").build();
        TEST_ENTRY_WITH_EXCEPTION = BeamFnApi.LogEntry.newBuilder().setSeverity(BeamFnApi.LogEntry.Severity.Enum.WARN).setMessage("MessageWithException").setTrace(Throwables.getStackTraceAsString(TEST_RECORD_WITH_EXCEPTION.getThrown())).setThread(RemoteTestNG.DEBUG_PORT).setTimestamp(Timestamp.newBuilder().setSeconds(1234567L).setNanos(890000000).build()).setLogLocation("LoggerName").build();
    }
}
