package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.AutoValue_PubsubMessageToRow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.RowJson;
import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Instant;

@Experimental
@Internal
@AutoValue
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.class */
public abstract class PubsubMessageToRow extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> implements Serializable {
    /** Name of the schema field that both converters fill with the element's timestamp. */
    static final String TIMESTAMP_FIELD = "event_timestamp";
    /** Name of the top-level field that the nested-schema converter fills with the message's attribute map. */
    static final String ATTRIBUTES_FIELD = "attributes";
    /** Name of the top-level field that the nested-schema converter fills with the row parsed from the payload. */
    static final String PAYLOAD_FIELD = "payload";
    /** Output tag that receives the original message when parsing fails and the DLQ is enabled. */
    static final TupleTag<PubsubMessage> DLQ_TAG = new TupleTag<PubsubMessage>() { // from class: org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.1
    };
    /** Main output tag passed to {@code withOutputTags} in {@code expand}; carries the converted rows. */
    static final TupleTag<Row> MAIN_TAG = new TupleTag<Row>() { // from class: org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.2
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * AutoValue builder for {@link PubsubMessageToRow}; obtain an instance through
     * {@link PubsubMessageToRow#builder()}.
     */
    @AutoValue.Builder
    public abstract static class Builder {
        /** Sets the schema of the rows the transform emits. */
        public abstract Builder messageSchema(Schema messageSchema);

        /**
         * Sets whether messages that fail to parse are emitted to the dead-letter output instead of
         * failing the element.
         */
        public abstract Builder useDlq(boolean useDlq);

        /** Sets whether the flat-schema converter is used instead of the nested-schema one. */
        public abstract Builder useFlatSchema(boolean useFlatSchema);

        /** Builds the configured transform. */
        public abstract PubsubMessageToRow build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * {@link DoFn} that converts a {@link PubsubMessage} into a {@link Row} with a flat schema.
     *
     * <p>The payload is decoded as UTF-8 JSON and parsed against the message schema with the
     * {@code event_timestamp} field removed. The output row uses the full message schema:
     * {@code event_timestamp} is taken from the element timestamp, every other field is copied by
     * name from the parsed payload.
     */
    @Internal
    public static class FlatSchemaPubsubMessageToRoW extends DoFn<PubsubMessage, Row> {
        private final Schema messageSchema;
        private final boolean useDlq;

        /** JSON mapper for the payload schema; transient, so it is created on first use. */
        @Nullable
        private transient volatile ObjectMapper objectMapper;

        /**
         * @param schema schema of the rows to emit
         * @param z whether unparseable messages go to {@link PubsubMessageToRow#DLQ_TAG} instead
         *     of failing the element
         */
        protected FlatSchemaPubsubMessageToRoW(Schema schema, boolean z) {
            this.messageSchema = schema;
            this.useDlq = z;
        }

        /**
         * Picks the value for one output field: the element timestamp for {@code event_timestamp},
         * otherwise the payload value with the same field name.
         */
        private Object getValueForFieldFlatSchema(Schema.Field field, Instant instant, Row row) {
            String fieldName = field.getName();
            if (PubsubMessageToRow.TIMESTAMP_FIELD.equals(fieldName)) {
                return instant;
            }
            return row.getValue(fieldName);
        }

        /** Decodes the message payload as UTF-8 and parses it into a row of the payload schema. */
        private Row parsePayload(PubsubMessage pubsubMessage) {
            String json = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
            return RowJsonUtils.jsonToRow(payloadMapper(), json);
        }

        /**
         * Returns the cached mapper, creating it on first use. The payload schema is only derived
         * here, so it is no longer rebuilt for every message.
         */
        private ObjectMapper payloadMapper() {
            // Read the volatile field once and work on the local copy.
            ObjectMapper mapper = this.objectMapper;
            if (mapper == null) {
                // The payload carries every field of the message schema except the timestamp.
                List<Schema.Field> payloadFields = this.messageSchema.getFields().stream()
                        .filter(field -> !field.getName().equals(PubsubMessageToRow.TIMESTAMP_FIELD))
                        .collect(Collectors.toList());
                mapper = RowJsonUtils.newObjectMapperWith(
                        RowJson.RowJsonDeserializer.forSchema(new Schema(payloadFields)));
                this.objectMapper = mapper;
            }
            return mapper;
        }

        /**
         * Converts the current element and emits the resulting row on the main output.
         *
         * <p>If an {@code UnsupportedRowJsonException} is raised, the original message is emitted
         * to {@link PubsubMessageToRow#DLQ_TAG} when the DLQ is enabled; otherwise the exception is
         * rethrown wrapped in a {@link RuntimeException}.
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<PubsubMessage, Row>.ProcessContext processContext) {
            PubsubMessage message = processContext.element();
            try {
                Row payload = parsePayload(message);
                Instant timestamp = processContext.timestamp();
                List<Object> values = this.messageSchema.getFields().stream()
                        .map(field -> getValueForFieldFlatSchema(field, timestamp, payload))
                        .collect(Collectors.toList());
                processContext.output(Row.withSchema(this.messageSchema).addValues(values).build());
            } catch (RowJson.RowJsonDeserializer.UnsupportedRowJsonException e) {
                if (!this.useDlq) {
                    throw new RuntimeException("Error parsing message", e);
                }
                processContext.output(PubsubMessageToRow.DLQ_TAG, message);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow$NestedSchemaPubsubMessageToRow.class */
    public static class NestedSchemaPubsubMessageToRow extends DoFn<PubsubMessage, Row> {
        private final Schema messageSchema;
        private final boolean useDlq;

        @Nullable
        private volatile transient ObjectMapper objectMapper;

        protected NestedSchemaPubsubMessageToRow(Schema schema, boolean z) {
            this.messageSchema = schema;
            this.useDlq = z;
        }

        private Object getValueForFieldNestedSchema(Schema.Field field, Instant instant, Map<String, String> map, Row row) {
            String name = field.getName();
            boolean z = -1;
            switch (name.hashCode()) {
                case -786701938:
                    if (name.equals(PubsubMessageToRow.PAYLOAD_FIELD)) {
                        z = 2;
                        break;
                    }
                    break;
                case 405645655:
                    if (name.equals(PubsubMessageToRow.ATTRIBUTES_FIELD)) {
                        z = true;
                        break;
                    }
                    break;
                case 436051377:
                    if (name.equals(PubsubMessageToRow.TIMESTAMP_FIELD)) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return instant;
                case true:
                    return map;
                case true:
                    return row;
                default:
                    throw new IllegalArgumentException("Unexpected field '" + field.getName() + "' in top level schema for Pubsub message. Top level schema should only contain 'timestamp', 'attributes', and 'payload' fields");
            }
        }

        private Row parsePayload(PubsubMessage pubsubMessage) {
            String str = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
            Schema rowSchema = this.messageSchema.getField(PubsubMessageToRow.PAYLOAD_FIELD).getType().getRowSchema();
            if (this.objectMapper == null) {
                this.objectMapper = RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(rowSchema));
            }
            return RowJsonUtils.jsonToRow(this.objectMapper, str);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<PubsubMessage, Row>.ProcessContext processContext) {
            try {
                Row parsePayload = parsePayload((PubsubMessage) processContext.element());
                processContext.output(Row.withSchema(this.messageSchema).addValues((List) this.messageSchema.getFields().stream().map(field -> {
                    return getValueForFieldNestedSchema(field, processContext.timestamp(), ((PubsubMessage) processContext.element()).getAttributeMap(), parsePayload);
                }).collect(Collectors.toList())).build());
            } catch (RowJson.RowJsonDeserializer.UnsupportedRowJsonException e) {
                if (!this.useDlq) {
                    throw new RuntimeException("Error parsing message", e);
                }
                processContext.output(PubsubMessageToRow.DLQ_TAG, (PubsubMessage) processContext.element());
            }
        }
    }

    /** Returns the schema handed to the converter {@code DoFn} and used to build every output row. */
    public abstract Schema messageSchema();

    /**
     * Returns whether messages that fail to parse are emitted to {@code DLQ_TAG}; when false the
     * converters rethrow the failure as a {@code RuntimeException} and no DLQ output is declared.
     */
    public abstract boolean useDlq();

    /**
     * Returns whether {@code expand} uses {@code FlatSchemaPubsubMessageToRoW} (true) or
     * {@code NestedSchemaPubsubMessageToRow} (false).
     */
    public abstract boolean useFlatSchema();

    /** Creates a new {@link Builder} backed by the AutoValue-generated implementation. */
    public static Builder builder() {
        final Builder generated = new AutoValue_PubsubMessageToRow.Builder();
        return generated;
    }

    /**
     * Applies the flat- or nested-schema converter to the input messages.
     *
     * @param pCollection Pubsub messages to convert
     * @return a tuple whose main output is tagged {@link #MAIN_TAG}; {@link #DLQ_TAG} is declared
     *     as an additional output only when {@link #useDlq()} is true
     */
    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> pCollection) {
        final Schema schema = messageSchema();
        final boolean dlqEnabled = useDlq();

        final DoFn<PubsubMessage, Row> converter;
        if (useFlatSchema()) {
            converter = new FlatSchemaPubsubMessageToRoW(schema, dlqEnabled);
        } else {
            converter = new NestedSchemaPubsubMessageToRow(schema, dlqEnabled);
        }

        // Without a DLQ there are no additional outputs.
        final TupleTagList additionalOutputs = dlqEnabled ? TupleTagList.of(DLQ_TAG) : TupleTagList.empty();

        return pCollection.apply(ParDo.of(converter).withOutputTags(MAIN_TAG, additionalOutputs));
    }
}
