package pl.touk.nussknacker.engine.javaexample;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import pl.touk.nussknacker.engine.api.CustomStreamTransformer;
import pl.touk.nussknacker.engine.api.ProcessListener;
import pl.touk.nussknacker.engine.api.Service;
import pl.touk.nussknacker.engine.api.exception.ExceptionHandlerFactory;
import pl.touk.nussknacker.engine.api.process.SinkFactory;
import pl.touk.nussknacker.engine.api.process.SourceFactory;
import pl.touk.nussknacker.engine.api.process.WithCategories;
import pl.touk.nussknacker.engine.api.signal.ProcessSignalSender;
import pl.touk.nussknacker.engine.api.test.TestParsingUtils;
import pl.touk.nussknacker.engine.example.LoggingExceptionHandlerFactory;
import pl.touk.nussknacker.engine.javaapi.process.ExpressionConfig;
import pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator;
import pl.touk.nussknacker.engine.kafka.KafkaConfig;
import pl.touk.nussknacker.engine.kafka.KafkaSinkFactory;
import pl.touk.nussknacker.engine.kafka.KafkaSourceFactory;
import scala.Option;
import scala.collection.JavaConversions;

/* loaded from: input_file:pl/touk/nussknacker/engine/javaexample/ExampleProcessConfigCreator.class */
public class ExampleProcessConfigCreator implements ProcessConfigCreator {
    private <T> WithCategories<T> all(T t) {
        ArrayList arrayList = new ArrayList();
        arrayList.add("Recommendations");
        arrayList.add("FraudDetection");
        return new WithCategories<>(t, JavaConversions.collectionAsScalaIterable(arrayList).toList());
    }

    @Override // pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator
    public Map<String, WithCategories<Service>> services(Config config) {
        HashMap hashMap = new HashMap();
        hashMap.put("clientService", all(new ClientService()));
        return hashMap;
    }

    @Override // pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator
    public Map<String, WithCategories<SourceFactory<?>>> sourceFactories(Config config) {
        KafkaSourceFactory<Transaction> transactionKafkaSourceFactory = getTransactionKafkaSourceFactory(getKafkaConfig(config));
        HashMap hashMap = new HashMap();
        hashMap.put("kafka-transaction", all(transactionKafkaSourceFactory));
        return hashMap;
    }

    private KafkaSourceFactory<Transaction> getTransactionKafkaSourceFactory(KafkaConfig kafkaConfig) {
        return new KafkaSourceFactory<>(kafkaConfig, new DeserializationSchema<Transaction>() { // from class: pl.touk.nussknacker.engine.javaexample.ExampleProcessConfigCreator.2
            /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
            public Transaction m1740deserialize(byte[] bArr) throws IOException {
                return (Transaction) new ObjectMapper().readValue(bArr, Transaction.class);
            }

            public boolean isEndOfStream(Transaction transaction) {
                return false;
            }

            public TypeInformation<Transaction> getProducedType() {
                return TypeInformation.of(Transaction.class);
            }
        }, Option.apply(new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.minutes(10L)) { // from class: pl.touk.nussknacker.engine.javaexample.ExampleProcessConfigCreator.1
            public long extractTimestamp(Transaction transaction) {
                return transaction.eventDate;
            }
        }), TestParsingUtils.newLineSplit(), TypeInformation.of(Transaction.class));
    }

    @Override // pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator
    public Map<String, WithCategories<SinkFactory>> sinkFactories(Config config) {
        KafkaSinkFactory kafkaSinkFactory = new KafkaSinkFactory(getKafkaConfig(config), new KeyedSerializationSchema<Object>() { // from class: pl.touk.nussknacker.engine.javaexample.ExampleProcessConfigCreator.3
            @Override // org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
            public byte[] serializeKey(Object obj) {
                return UUID.randomUUID().toString().getBytes();
            }

            @Override // org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
            public byte[] serializeValue(Object obj) {
                if (obj instanceof String) {
                    return ((String) obj).getBytes();
                }
                throw new RuntimeException("Sorry, only strings");
            }

            @Override // org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
            public String getTargetTopic(Object obj) {
                return null;
            }
        });
        HashMap hashMap = new HashMap();
        hashMap.put("kafka-stringSink", all(kafkaSinkFactory));
        return hashMap;
    }

    @Override // pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator
    public Map<String, WithCategories<CustomStreamTransformer>> customStreamTransformers(Config config) {
        HashMap hashMap = new HashMap();
        hashMap.put("eventsCounter", all(new EventsCounter()));
        hashMap.put("transactionAmountAggregator", all(new TransactionAmountAggregator()));
        return hashMap;
    }

    @Override // pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator
    public Map<String, WithCategories<ProcessSignalSender>> signals(Config config) {
        return Collections.emptyMap();
    }

    @Override // pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator
    public Collection<ProcessListener> listeners(Config config) {
        return Collections.emptyList();
    }

    @Override // pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator
    public ExceptionHandlerFactory exceptionHandlerFactory(Config config) {
        return new LoggingExceptionHandlerFactory();
    }

    @Override // pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator
    public ExpressionConfig expressionConfig(Config config) {
        return new ExpressionConfig(Collections.singletonMap("UTIL", all(new UtilProcessHelper())), Collections.emptyList());
    }

    @Override // pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator
    public Map<String, String> buildInfo() {
        return Collections.emptyMap();
    }

    private KafkaConfig getKafkaConfig(Config config) {
        return new KafkaConfig(config.getString("kafka.zkAddress"), config.getString("kafka.kafkaAddress"), Option.empty(), Option.empty());
    }
}
