package uk.camsw.rx.test.kafka.dsl;

import java.util.UUID;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import uk.camsw.rx.test.dsl.given.BaseGiven;
import uk.camsw.rx.test.dsl.scenario.ExecutionContext;
import uk.camsw.rx.test.dsl.when.BaseWhen;
import uk.camsw.rx.test.kafka.KafkaEnv;
import uk.camsw.rx.test.kafka.Topic;
import uk.camsw.rx.test.kafka.TopicBuilder;

/* loaded from: input_file:uk/camsw/rx/test/kafka/dsl/KafkaSourceScenario.class */
/**
 * Entry point of a given/when test scenario whose stream under test is built from a
 * {@link Topic}.
 *
 * <p>A single {@link ExecutionContext} is created per scenario and shared by every
 * {@link Given} and {@link When} step handed out. The context also carries the
 * {@link KafkaEnv} and the current {@link Topic}, stored under private keys.
 *
 * @param <K> key type of the topic
 * @param <V> value type of the topic
 * @param <U> element type emitted by the stream under test
 */
public class KafkaSourceScenario<K, V, U> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSourceScenario.class);

    /** Context key under which the current {@link Topic} is stored. */
    private static final String KEY_TOPIC = KafkaSourceScenario.class.getSimpleName() + "_topic";

    /** Context key under which the {@link KafkaEnv} is stored. */
    private static final String KEY_ENV = KafkaSourceScenario.class.getSimpleName() + "_env";

    /** Shared state for all steps of this scenario. */
    private final ExecutionContext<MessageAndMetadata<byte[], byte[]>, ?, U, Given<K, V, U>, When<K, V, U>> context = new ExecutionContext<>();

    /* loaded from: input_file:uk/camsw/rx/test/kafka/dsl/KafkaSourceScenario$Given.class */
    /**
     * The "given" step: configures the Kafka environment, the topic and the stream under test.
     * All configuration methods return {@code this} so calls can be chained.
     */
    public static class Given<K, V, U> extends BaseGiven<U, Given<K, V, U>, When<K, V, U>> {
        private final ExecutionContext<MessageAndMetadata<byte[], byte[]>, ?, U, Given<K, V, U>, When<K, V, U>> context;

        /**
         * @param executionContext the scenario context this step reads from and writes to
         */
        public Given(ExecutionContext<MessageAndMetadata<byte[], byte[]>, ?, U, Given<K, V, U>, When<K, V, U>> executionContext) {
            super(executionContext);
            this.context = executionContext;
        }

        /**
         * Builds the stream under test from the topic currently stored in the context and
         * registers it with the context.
         *
         * <p>{@code func1} is invoked immediately with whatever the context holds under the
         * topic key, so a topic should be registered first (see {@link #theTopic(Func0)} and
         * {@link #aNewTopic()}).
         *
         * @param func1 factory that turns the topic into the observable to test
         * @return this step, for chaining
         */
        public Given<K, V, U> theStreamUnderTest(Func1<Topic<K, V>, Observable<U>> func1) {
            // NOTE(review): raw cast retained because the signature of setStreamUnderTest is not visible here.
            this.context.setStreamUnderTest((Observable) func1.call(this.context.get(KafkaSourceScenario.KEY_TOPIC)));
            return this;
        }

        /* renamed from: when, reason: merged with bridge method [inline-methods] */
        /**
         * Moves on to the "when" step. Per the decompiler note above, this method was
         * originally named {@code when}.
         *
         * @return a new {@link When} bound to the same context
         */
        public When<K, V, U> m4when() {
            return new When<>(this.context);
        }

        /**
         * Stores the topic produced by {@code func0} in the context. The supplier is invoked
         * immediately, exactly once.
         *
         * @param func0 supplier of the topic to use
         * @return this step, for chaining
         */
        public Given<K, V, U> theTopic(Func0<Topic<K, V>> func0) {
            this.context.put(KafkaSourceScenario.KEY_TOPIC, (Topic) func0.call());
            return this;
        }

        /**
         * Alias for {@link #aNewTopic()}.
         *
         * @return this step, for chaining
         */
        public Given<K, V, U> newTopic() {
            return aNewTopic();
        }

        /**
         * Creates a topic named {@code "topic-" + random UUID} against the {@link KafkaEnv}
         * stored in the context, registers it as the current topic, and registers a
         * "finally" action that closes it.
         *
         * @param action1 optional customisation applied to the builder before the topic is
         *                built; may be {@code null}
         * @return this step, for chaining
         */
        public Given<K, V, U> aNewTopic(Action1<TopicBuilder> action1) {
            KafkaEnv kafkaEnv = (KafkaEnv) this.context.get(KafkaSourceScenario.KEY_ENV);
            theTopic(() -> createTopic(kafkaEnv, action1));
            return this;
        }

        /**
         * Same as {@link #aNewTopic(Action1)} without any builder customisation.
         *
         * @return this step, for chaining
         */
        public Given<K, V, U> aNewTopic() {
            return aNewTopic(null);
        }

        /**
         * Stores the Kafka environment in the context for later use by {@link #aNewTopic(Action1)}.
         *
         * @param kafkaEnv the environment to store
         * @return this step, for chaining
         */
        public Given<K, V, U> kafkaEnvironment(KafkaEnv kafkaEnv) {
            this.context.put(KafkaSourceScenario.KEY_ENV, kafkaEnv);
            return this;
        }

        /** Builds a uniquely named topic and schedules its cleanup with the context. */
        private Topic<K, V> createTopic(KafkaEnv kafkaEnv, Action1<TopicBuilder> customizer) {
            TopicBuilder<K, V> builder = TopicBuilder.newBuilder(kafkaEnv).forTopic("topic-" + UUID.randomUUID());
            if (customizer != null) {
                customizer.call(builder);
            }
            Topic<K, V> topic = builder.build();
            // Cleanup is registered only after a successful build, so there is always something to close.
            this.context.addFinally(ignoredContext -> closeQuietly(topic));
            return topic;
        }

        /** Closes the topic, logging rather than propagating any failure. */
        private void closeQuietly(Topic<K, V> topic) {
            try {
                topic.close();
            } catch (Exception e) {
                // Best-effort cleanup: a failure here is logged and not rethrown.
                KafkaSourceScenario.logger.error("Failed to cleanup topic", e);
            }
        }
    }

    /* loaded from: input_file:uk/camsw/rx/test/kafka/dsl/KafkaSourceScenario$When.class */
    /**
     * The "when" step: gives access to a {@link Producer} for the scenario's current topic.
     */
    public static class When<K, V, U> extends BaseWhen<U, When<K, V, U>> {
        private final ExecutionContext<MessageAndMetadata<byte[], byte[]>, ?, U, Given<K, V, U>, When<K, V, U>> context;

        /**
         * @param executionContext the scenario context this step reads from
         */
        public When(ExecutionContext<MessageAndMetadata<byte[], byte[]>, ?, U, Given<K, V, U>, When<K, V, U>> executionContext) {
            super(executionContext);
            this.context = executionContext;
        }

        /**
         * Creates a producer for the topic currently stored in the context.
         *
         * @return a new {@link Producer} bound to the context and its current topic
         */
        public Producer<K, V, When<K, V, U>> theProducer() {
            return new Producer<>(this.context, (Topic) this.context.get(KafkaSourceScenario.KEY_TOPIC));
        }
    }

    /**
     * Creates a scenario, registers its initial given/when steps with the context and stores
     * {@code kafkaEnv} in the context.
     *
     * @param kafkaEnv the Kafka environment topics will be created against
     */
    public KafkaSourceScenario(KafkaEnv kafkaEnv) {
        // Parameterized (not raw) step types keep generic checking on for the calls below.
        Given<K, V, U> initialGiven = new Given<>(this.context);
        this.context.initSteps(initialGiven, new When<>(this.context));
        initialGiven.kafkaEnvironment(kafkaEnv);
    }

    /**
     * Starts the "given" phase.
     *
     * @return a new {@link Given} bound to this scenario's context
     */
    public Given<K, V, U> given() {
        return new Given<>(this.context);
    }
}
