package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.class */
public class TestPubsub implements TestRule {
    private static final String EVENTS_TOPIC_NAME = "events";
    private static final String TOPIC_PREFIX = "integ-test-";
    private final TestPubsubOptions pipelineOptions;

    @Nullable
    private PubsubClient pubsub = null;

    @Nullable
    private PubsubClient.TopicPath eventsTopicPath = null;

    @Nullable
    private PubsubClient.SubscriptionPath subscriptionPath = null;
    private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
    private static final String NO_ID_ATTRIBUTE = null;
    private static final String NO_TIMESTAMP_ATTRIBUTE = null;
    private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/TestPubsub$PollingAssertion.class */
    public interface PollingAssertion {
        void waitForUpTo(Duration duration) throws IOException, InterruptedException;
    }

    public static TestPubsub create() {
        return new TestPubsub((TestPubsubOptions) TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class));
    }

    private TestPubsub(TestPubsubOptions testPubsubOptions) {
        this.pipelineOptions = testPubsubOptions;
    }

    public Statement apply(final Statement statement, final Description description) {
        return new Statement() { // from class: org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.1
            public void evaluate() throws Throwable {
                if (TestPubsub.this.pubsub != null) {
                    throw new AssertionError("Pubsub client was not shutdown in previous test. Topic path is'" + TestPubsub.this.eventsTopicPath + "'. Current test: " + description.getDisplayName());
                }
                try {
                    TestPubsub.this.initializePubsub(description);
                    statement.evaluate();
                } finally {
                    TestPubsub.this.tearDown();
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initializePubsub(Description description) throws IOException {
        this.pubsub = PubsubGrpcClient.FACTORY.newClient(NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, this.pipelineOptions);
        PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromName(this.pipelineOptions.getProject(), createTopicName(description, EVENTS_TOPIC_NAME));
        this.pubsub.createTopic(topicPath);
        this.eventsTopicPath = topicPath;
        this.subscriptionPath = this.pubsub.createRandomSubscription(PubsubClient.projectPathFromPath(String.format("projects/%s", this.pipelineOptions.getProject())), topicPath(), DEFAULT_ACK_DEADLINE_SECONDS.intValue());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void tearDown() throws IOException {
        if (this.pubsub == null) {
            return;
        }
        try {
            if (this.subscriptionPath != null) {
                this.pubsub.deleteSubscription(this.subscriptionPath);
            }
            if (this.eventsTopicPath != null) {
                this.pubsub.deleteTopic(this.eventsTopicPath);
            }
        } finally {
            this.pubsub.close();
            this.pubsub = null;
            this.eventsTopicPath = null;
            this.subscriptionPath = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String createTopicName(Description description, String str) throws IOException {
        StringBuilder sb = new StringBuilder(TOPIC_PREFIX);
        if (description.getClassName() != null) {
            try {
                sb.append(Class.forName(description.getClassName()).getSimpleName()).append("-");
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
        if (description.getMethodName() != null) {
            sb.append(description.getMethodName().replaceAll("[\\[\\]]", "")).append("-");
        }
        DATETIME_FORMAT.printTo(sb, Instant.now());
        return sb.toString() + "-" + str + "-" + String.valueOf(ThreadLocalRandom.current().nextLong());
    }

    public PubsubClient.TopicPath topicPath() {
        return this.eventsTopicPath;
    }

    public PubsubClient.SubscriptionPath subscriptionPath() {
        return this.subscriptionPath;
    }

    private List<PubsubClient.SubscriptionPath> listSubscriptions(PubsubClient.ProjectPath projectPath, PubsubClient.TopicPath topicPath) throws IOException {
        return this.pubsub.listSubscriptions(projectPath, topicPath);
    }

    public void publish(List<PubsubMessage> list) throws IOException {
        this.pubsub.publish(this.eventsTopicPath, (List) list.stream().map(this::toOutgoingMessage).collect(Collectors.toList()));
    }

    public List<PubsubMessage> pull() throws IOException {
        return pull(100);
    }

    public List<PubsubMessage> pull(int i) throws IOException {
        List<PubsubClient.IncomingMessage> pull = this.pubsub.pull(0L, this.subscriptionPath, i, true);
        if (!pull.isEmpty()) {
            this.pubsub.acknowledge(this.subscriptionPath, (List) pull.stream().map((v0) -> {
                return v0.ackId();
            }).collect(ImmutableList.toImmutableList()));
        }
        return (List) pull.stream().map(incomingMessage -> {
            return new PubsubMessage(incomingMessage.message().getData().toByteArray(), incomingMessage.message().getAttributesMap(), incomingMessage.recordId());
        }).collect(ImmutableList.toImmutableList());
    }

    public List<PubsubMessage> waitForNMessages(int i, Duration duration) throws IOException, InterruptedException {
        ArrayList arrayList = new ArrayList(i);
        DateTime dateTime = new DateTime();
        int seconds = duration.toStandardSeconds().getSeconds();
        arrayList.addAll(pull(i - arrayList.size()));
        while (arrayList.size() < i && Seconds.secondsBetween(dateTime, new DateTime()).getSeconds() < seconds) {
            Thread.sleep(1000L);
            arrayList.addAll(pull(i - arrayList.size()));
        }
        return arrayList;
    }

    public PollingAssertion assertThatTopicEventuallyReceives(Matcher<PubsubMessage>... matcherArr) {
        return duration -> {
            MatcherAssert.assertThat(waitForNMessages(matcherArr.length, duration), Matchers.containsInAnyOrder(matcherArr));
        };
    }

    public void checkIfAnySubscriptionExists(String str, Duration duration) throws InterruptedException, IllegalArgumentException, IOException, TimeoutException {
        int i;
        if (duration.getMillis() <= 0) {
            throw new IllegalArgumentException(String.format("timeoutDuration should be greater than 0", new Object[0]));
        }
        DateTime dateTime = new DateTime();
        int i2 = 0;
        while (true) {
            i = i2;
            if (i != 0 || Seconds.secondsBetween(new DateTime(), dateTime).getSeconds() >= duration.toStandardSeconds().getSeconds()) {
                break;
            }
            Thread.sleep(1000L);
            i2 = listSubscriptions(PubsubClient.projectPathFromPath(String.format("projects/%s", str)), topicPath()).size();
        }
        if (i <= 0) {
            throw new TimeoutException("Timed out when checking if topics exist for " + topicPath());
        }
    }

    private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage pubsubMessage) {
        return PubsubClient.OutgoingMessage.of(com.google.pubsub.v1.PubsubMessage.newBuilder().setData(ByteString.copyFrom(pubsubMessage.getPayload())).putAllAttributes(pubsubMessage.getAttributeMap()).build(), DateTime.now().getMillis(), (String) null);
    }
}
