package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.util.List;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.class */
public class BeamKafkaProtoTable extends BeamKafkaTable {
    /** Class handed to {@link ProtoMessageSchema} for schema inference and bytes/row conversion. */
    private final Class<?> protoClass;

    /**
     * Builds the table after checking {@code schema} against the schema that
     * {@link ProtoMessageSchema} infers from {@code cls}. The inferred schema, not the supplied
     * one, is what gets passed on to the {@link BeamKafkaTable} constructor.
     *
     * @param schema schema supplied by the caller; must be equivalent to the inferred schema
     * @param str forwarded unchanged to the {@link BeamKafkaTable} constructor
     * @param list forwarded unchanged to the {@link BeamKafkaTable} constructor
     * @param cls class used for schema inference and for encoding/decoding record values
     * @throws IllegalArgumentException if {@code schema} is not equivalent to the inferred schema
     */
    public BeamKafkaProtoTable(Schema schema, String str, List<String> list, Class<?> cls) {
        super(inferAndVerifySchema(cls, schema), str, list);
        this.protoClass = cls;
    }

    /**
     * Returns a transform that turns key/value byte pairs into {@link Row}s by decoding the value.
     */
    @Override
    public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
        return new ProtoRecorderDecoder(this.schema, this.protoClass);
    }

    /**
     * Returns a transform that encodes {@link Row}s to bytes and wraps them as key/value pairs.
     */
    @Override
    public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
        return new ProtoRecorderEncoder(this.protoClass);
    }

    /**
     * Infers the schema of {@code cls} through {@link ProtoMessageSchema} and verifies that the
     * supplied {@code schema} is equivalent to it.
     *
     * @return the inferred schema (not the supplied one)
     * @throws IllegalArgumentException if the two schemas are not equivalent
     */
    private static Schema inferAndVerifySchema(Class<?> cls, Schema schema) {
        Schema inferred = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(cls));
        if (!schema.equivalent(inferred)) {
            throw new IllegalArgumentException(String.format("Given message schema: '%s'%ndoes not match schema inferred from protobuf class.%nProtobuf class: '%s'%nInferred schema: '%s'", schema, cls.getName(), inferred));
        }
        return inferred;
    }

    /** Decodes the value of each key/value pair into a {@link Row}; the key is dropped. */
    private static class ProtoRecorderDecoder extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> {
        private final Schema schema;
        private final Class<?> clazz;

        ProtoRecorderDecoder(Schema rowSchema, Class<?> protoType) {
            this.schema = rowSchema;
            this.clazz = protoType;
        }

        /**
         * Two steps: extract the value bytes, then map them to rows with the function obtained
         * from {@link ProtoMessageSchema#getProtoBytesToRowFn}. The row schema is set on the
         * resulting collection.
         */
        @Override
        public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
            PCollection<byte[]> payloads = input.apply("decodeProtoRecord", MapElements.via(new KvToBytes()));
            SimpleFunction<byte[], Row> bytesToRow = ProtoMessageSchema.getProtoBytesToRowFn(this.clazz);
            PCollection<Row> rows = payloads.apply("Map bytes to rows", MapElements.via(bytesToRow));
            return rows.setRowSchema(this.schema);
        }

        /** Projects a key/value pair onto its value. */
        public static class KvToBytes extends SimpleFunction<KV<byte[], byte[]>, byte[]> {
            private KvToBytes() {
            }

            @Override
            public byte[] apply(KV<byte[], byte[]> record) {
                return record.getValue();
            }
        }
    }

    /** Encodes each {@link Row} to bytes and pairs the result with an empty key. */
    private static class ProtoRecorderEncoder extends PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> {
        private final Class<?> clazz;

        public ProtoRecorderEncoder(Class<?> protoType) {
            this.clazz = protoType;
        }

        /**
         * Two steps: map rows to bytes with the function obtained from
         * {@link ProtoMessageSchema#getRowToProtoBytesFn}, then wrap each byte array in a
         * key/value pair.
         */
        @Override
        public PCollection<KV<byte[], byte[]>> expand(PCollection<Row> input) {
            SimpleFunction<Row, byte[]> rowToBytes = ProtoMessageSchema.getRowToProtoBytesFn(this.clazz);
            PCollection<byte[]> payloads = input.apply("Encode proto bytes to row", MapElements.via(rowToBytes));
            return payloads.apply("Bytes to KV", MapElements.via(new BytesToKV()));
        }

        /** Wraps a byte array as the value of a pair whose key is a zero-length byte array. */
        public static class BytesToKV extends SimpleFunction<byte[], KV<byte[], byte[]>> {
            private BytesToKV() {
            }

            @Override
            public KV<byte[], byte[]> apply(byte[] payload) {
                return KV.of(new byte[0], payload);
            }
        }
    }
}
