package org.apache.hudi.streamer;

import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;

/**
 * Command-line entry point that assembles and executes a Flink streaming job
 * which consumes JSON records from a Kafka topic and pushes them through the
 * Hudi write pipeline: optional index bootstrap, bucket assignment, stream
 * write, and finally either the compaction operators or a clean sink.
 *
 * <p>The key selectors used below are serializable lambdas; the compiler
 * generates the matching {@code $deserializeLambda$} method on its own, so it
 * must not be declared by hand in this class.
 */
public class HoodieFlinkStreamer {

    /**
     * Parses the command line into a {@link FlinkStreamerConfig}, builds the
     * streaming topology and submits it under the configured target table name.
     *
     * <p>When the help flag is set or no arguments are given, the usage text is
     * printed and the JVM exits with status {@code 1}.
     *
     * @param strArr command-line arguments handed to JCommander
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] strArr) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkStreamerConfig cfg = new FlinkStreamerConfig();
        JCommander cmd = new JCommander(cfg, null, strArr);
        if (cfg.help || strArr.length == 0) {
            cmd.usage();
            System.exit(1);
        }

        env.enableCheckpointing(cfg.checkpointInterval);
        env.getConfig().setGlobalJobParameters(cfg);
        // At most one checkpoint may be in flight at any time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        if (cfg.flinkCheckPointPath != null) {
            env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
        }

        TypedProperties kafkaProps = StreamerUtil.appendKafkaProps(cfg);

        // getLogicalType() is declared to return the general LogicalType, so the
        // row type derived from the source schema has to be narrowed explicitly.
        RowType rowType = (RowType) AvroSchemaConverter
            .convertToDataType(StreamerUtil.getSourceSchema(cfg))
            .getLogicalType();

        Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
        long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
        int defaultParallelism = env.getParallelism();
        // The write commit-ack timeout is aligned with the checkpoint timeout.
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

        StreamWriteOperatorFactory<HoodieRecord> writeOperatorFactory =
            new StreamWriteOperatorFactory<>(conf);

        // Kafka source producing RowData decoded from JSON.
        JsonRowDataDeserializationSchema jsonSchema = new JsonRowDataDeserializationSchema(
            rowType,
            InternalTypeInfo.of(rowType),
            false,
            true,
            TimestampFormat.ISO_8601);
        DataStream<RowData> rows = env
            .addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, jsonSchema, kafkaProps))
            .name("kafka_source")
            .uid("uid_kafka_source");

        // Apply the user-configured transformer chain, if any was created.
        if (cfg.transformerClassNames != null && !cfg.transformerClassNames.isEmpty()) {
            Option<Transformer> transformer = StreamerUtil.createTransformer(cfg.transformerClassNames);
            if (transformer.isPresent()) {
                rows = transformer.get().apply(rows);
            }
        }

        DataStream<HoodieRecord> records = rows.map(
            RowDataToHoodieFunctions.create(rowType, conf),
            TypeInformation.of(HoodieRecord.class));

        if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
            records = records.rebalance()
                .transform(
                    "index_bootstrap",
                    TypeInformation.of(HoodieRecord.class),
                    new ProcessOperator<>(new BootstrapFunction<>(conf)))
                .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
                .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
        }

        // NOTE(review): the uid strings below have no separator before the table
        // name, unlike "uid_index_bootstrap_". They are kept verbatim because
        // operator uids presumably need to stay stable across restores.
        DataStream<Object> pipeline = records
            // First shuffle: key by record key before bucket assignment.
            .keyBy(HoodieRecord::getRecordKey)
            .transform(
                "bucket_assigner",
                TypeInformation.of(HoodieRecord.class),
                new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
            .uid("uid_bucket_assigner" + conf.getString(FlinkOptions.TABLE_NAME))
            .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
            // Second shuffle: key by the file id of the assigned location.
            .keyBy(record -> record.getCurrentLocation().getFileId())
            .transform("hoodie_stream_write", TypeInformation.of(Object.class), writeOperatorFactory)
            .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));

        if (StreamerUtil.needsAsyncCompaction(conf)) {
            pipeline
                .transform(
                    "compact_plan_generate",
                    TypeInformation.of(CompactionPlanEvent.class),
                    new CompactionPlanOperator(conf))
                .uid("uid_compact_plan_generate")
                .setParallelism(1)
                .rebalance()
                .transform(
                    "compact_task",
                    TypeInformation.of(CompactionCommitEvent.class),
                    new ProcessOperator<>(new CompactFunction(conf)))
                .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
                .addSink(new CompactionCommitSink(conf))
                .name("compact_commit")
                .setParallelism(1);
        } else {
            pipeline
                .addSink(new CleanFunction<>(conf))
                .setParallelism(1)
                .name("clean_commits")
                .uid("uid_clean_commits");
        }

        env.execute(cfg.targetTableName);
    }
}
