package org.apache.gobblin.iceberg.writer;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
import org.apache.gobblin.hive.AutoCloseableHiveLock;
import org.apache.gobblin.hive.HiveLock;
import org.apache.gobblin.hive.HivePartition;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.time.TimeIterator;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.shaded.org.apache.avro.Schema;
import org.apache.iceberg.types.Types;
import org.joda.time.DateTime;
import org.joda.time.format.PeriodFormatterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.class */
public class IcebergMetadataWriter implements MetadataWriter {
    public static final String USE_DATA_PATH_AS_TABLE_LOCATION = "use.data.path.as.table.location";
    public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata/%s";
    public static final String GMCE_HIGH_WATERMARK_KEY = "gmce.high.watermark.%s";
    public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s";
    private static final String EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "gobblin.iceberg.dataset.expire.snapshots.lookBackTime";
    private static final String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d";
    private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
    private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
    private static final String ICEBERG_REGISTRATION_AUDIT_COUNT_BLACKLIST = "iceberg.registration.audit.count.blacklist";
    private static final String ICEBERG_REGISTRATION_AUDIT_COUNT_WHITELIST = "iceberg.registration.audit.count.whitelist";
    private static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
    private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
    private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
    private static final String ADDED_FILES_CACHE_EXPIRING_TIME = "added.files.cache.expiring.time";
    private static final int DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME = 1;
    private static final String OFFSET_RANGE_KEY_PREFIX = "offset.range.";
    private static final String OFFSET_RANGE_KEY_FORMAT = "offset.range.%s";
    private static final String DEFAULT_CREATION_TIME = "0";
    private static final String SNAPSHOT_EXPIRE_THREADS = "snapshot.expire.threads";
    private static final long DEFAULT_WATERMARK = -1;
    private final boolean completenessEnabled;
    private final WhitelistBlacklist completenessWhitelistBlacklist;
    private final String timeZone;
    private final DateTimeFormatter HOURLY_DATEPARTITION_FORMAT;
    private final String newPartitionColumn;
    private final String newPartitionColumnType;
    private final boolean newPartitionEnabled;
    private final WhitelistBlacklist newPartitionTableWhitelistBlacklist;
    private Optional<KafkaAuditCountVerifier> auditCountVerifier;
    private final String auditCheckGranularity;
    protected final MetricContext metricContext;
    protected EventSubmitter eventSubmitter;
    private final WhitelistBlacklist whitelistBlacklist;
    private final WhitelistBlacklist auditWhitelistBlacklist;
    private final Closer closer = Closer.create();
    private final Map<TableIdentifier, Long> tableCurrentWatermarkMap;
    private final Map<TableIdentifier, String> tableTopicPartitionMap;
    private final KafkaSchemaRegistry schemaRegistry;
    protected final Map<TableIdentifier, TableMetadata> tableMetadataMap;
    protected Catalog catalog;
    protected final Configuration conf;
    protected final ReadWriteLock readWriteLock;
    private final HiveLock locks;
    private final boolean useDataLocationAsTableLocation;
    private final ParallelRunner parallelRunner;
    private FsPermission permission;
    protected State state;
    private static final Logger log = LoggerFactory.getLogger(IcebergMetadataWriter.class);
    private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.gobblin.iceberg.writer.IcebergMetadataWriter$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/gobblin/iceberg/writer/IcebergMetadataWriter$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$gobblin$metadata$OperationType;
        static final /* synthetic */ int[] $SwitchMap$org$apache$gobblin$metadata$SchemaSource = new int[SchemaSource.values().length];

        static {
            try {
                $SwitchMap$org$apache$gobblin$metadata$SchemaSource[SchemaSource.SCHEMAREGISTRY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$gobblin$metadata$SchemaSource[SchemaSource.EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$gobblin$metadata$SchemaSource[SchemaSource.NONE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$gobblin$metadata$OperationType = new int[OperationType.values().length];
            try {
                $SwitchMap$org$apache$gobblin$metadata$OperationType[OperationType.add_files.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$gobblin$metadata$OperationType[OperationType.rewrite_files.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$gobblin$metadata$OperationType[OperationType.drop_files.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$gobblin$metadata$OperationType[OperationType.change_property.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* loaded from: input_file:org/apache/gobblin/iceberg/writer/IcebergMetadataWriter$TableMetadata.class */
    public class TableMetadata {
        String datasetName;
        boolean completenessEnabled;
        boolean newPartitionColumnEnabled;
        Cache<CharSequence, String> addedFiles;
        Optional<Table> table = Optional.absent();
        Optional<Transaction> transaction = Optional.absent();
        private Optional<AppendFiles> appendFiles = Optional.absent();
        private Optional<DeleteFiles> deleteFiles = Optional.absent();
        public Optional<Map<String, String>> newProperties = Optional.absent();
        Optional<Map<String, String>> lastProperties = Optional.absent();
        Optional<Cache<String, Pair<Schema, String>>> candidateSchemas = Optional.absent();
        Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
        Optional<String> lastSchemaVersion = Optional.absent();
        Optional<Long> lowWatermark = Optional.absent();
        long completionWatermark = -1;
        SortedSet<ZonedDateTime> datePartitions = new TreeSet(Collections.reverseOrder());
        List<String> serializedAuditCountMaps = new ArrayList();
        long lowestGMCEEmittedTime = Long.MAX_VALUE;

        public TableMetadata() {
            this.addedFiles = CacheBuilder.newBuilder().expireAfterAccess(IcebergMetadataWriter.this.conf.getInt(IcebergMetadataWriter.ADDED_FILES_CACHE_EXPIRING_TIME, 1), TimeUnit.HOURS).build();
        }

        AppendFiles getOrInitAppendFiles() {
            ensureTxnInit();
            if (!this.appendFiles.isPresent()) {
                this.appendFiles = Optional.of(((Transaction) this.transaction.get()).newAppend());
            }
            return (AppendFiles) this.appendFiles.get();
        }

        DeleteFiles getOrInitDeleteFiles() {
            ensureTxnInit();
            if (!this.deleteFiles.isPresent()) {
                this.deleteFiles = Optional.of(((Transaction) this.transaction.get()).newDelete());
            }
            return (DeleteFiles) this.deleteFiles.get();
        }

        void ensureTxnInit() {
            if (this.transaction.isPresent()) {
                return;
            }
            this.transaction = Optional.of(((Table) this.table.get()).newTransaction());
        }

        void reset(Map<String, String> map, Long l) {
            this.lastProperties = Optional.of(map);
            this.lastSchemaVersion = Optional.of(map.get(IcebergMetadataWriter.SCHEMA_CREATION_TIME_KEY));
            this.transaction = Optional.absent();
            this.deleteFiles = Optional.absent();
            this.appendFiles = Optional.absent();
            if (this.candidateSchemas.isPresent()) {
                ((Cache) this.candidateSchemas.get()).cleanUp();
            }
            this.candidateSchemas = Optional.absent();
            this.dataOffsetRange = Optional.absent();
            this.newProperties = Optional.absent();
            this.lowestGMCEEmittedTime = Long.MAX_VALUE;
            this.lowWatermark = Optional.of(l);
            this.datePartitions.clear();
            this.serializedAuditCountMaps.clear();
        }

        public void setDatasetName(String str) {
            this.datasetName = str;
        }
    }

    public IcebergMetadataWriter(State state) throws IOException {
        this.state = state;
        this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
        this.conf = HadoopUtils.getConfFromState(state);
        initializeCatalog();
        this.tableTopicPartitionMap = new HashMap();
        this.tableMetadataMap = new HashMap();
        this.tableCurrentWatermarkMap = new HashMap();
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.add(new Tag("clusterIdentifier", ClustersNames.getInstance().getClusterName()));
        this.metricContext = this.closer.register(GobblinMetricsRegistry.getInstance().getMetricContext(state, IcebergMetadataWriter.class, newArrayList));
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "IcebergWriter").build();
        this.whitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""), state.getProp(ICEBERG_REGISTRATION_BLACKLIST, ""));
        this.auditWhitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_AUDIT_COUNT_WHITELIST, ""), state.getProp(ICEBERG_REGISTRATION_AUDIT_COUNT_BLACKLIST, ""));
        this.readWriteLock = new ReentrantReadWriteLock();
        this.locks = new HiveLock(state.getProperties());
        this.parallelRunner = this.closer.register(new ParallelRunner(state.getPropAsInt(SNAPSHOT_EXPIRE_THREADS, 20), FileSystem.get(HadoopUtils.getConfFromState(state))));
        this.useDataLocationAsTableLocation = state.getPropAsBoolean(USE_DATA_PATH_AS_TABLE_LOCATION, false);
        if (this.useDataLocationAsTableLocation) {
            this.permission = HadoopUtils.deserializeFsPermission(state, ICEBERG_METADATA_FILE_PERMISSION, FsPermission.getDefault());
        }
        this.completenessEnabled = state.getPropAsBoolean(IcebergMetadataWriterConfigKeys.ICEBERG_COMPLETENESS_ENABLED, false);
        this.completenessWhitelistBlacklist = new WhitelistBlacklist(state.getProp(IcebergMetadataWriterConfigKeys.ICEBERG_COMPLETENESS_WHITELIST, ""), state.getProp(IcebergMetadataWriterConfigKeys.ICEBERG_COMPLETENESS_BLACKLIST, ""));
        this.timeZone = state.getProp(IcebergMetadataWriterConfigKeys.TIME_ZONE_KEY, IcebergMetadataWriterConfigKeys.DEFAULT_TIME_ZONE);
        this.HOURLY_DATEPARTITION_FORMAT = DateTimeFormatter.ofPattern(IcebergMetadataWriterConfigKeys.DATEPARTITION_FORMAT).withZone(ZoneId.of(this.timeZone));
        this.auditCountVerifier = Optional.fromNullable(this.completenessEnabled ? new KafkaAuditCountVerifier(state) : null);
        this.newPartitionColumn = state.getProp(IcebergMetadataWriterConfigKeys.NEW_PARTITION_KEY, IcebergMetadataWriterConfigKeys.DEFAULT_NEW_PARTITION);
        this.newPartitionColumnType = state.getProp(IcebergMetadataWriterConfigKeys.NEW_PARTITION_TYPE_KEY, IcebergMetadataWriterConfigKeys.DEFAULT_PARTITION_COLUMN_TYPE);
        this.newPartitionEnabled = state.getPropAsBoolean(IcebergMetadataWriterConfigKeys.ICEBERG_NEW_PARTITION_ENABLED, false);
        this.newPartitionTableWhitelistBlacklist = new WhitelistBlacklist(state.getProp(IcebergMetadataWriterConfigKeys.ICEBERG_NEW_PARTITION_WHITELIST, ""), state.getProp(IcebergMetadataWriterConfigKeys.ICEBERG_NEW_PARTITION_BLACKLIST, ""));
        this.auditCheckGranularity = state.getProp(IcebergMetadataWriterConfigKeys.AUDIT_CHECK_GRANULARITY, IcebergMetadataWriterConfigKeys.DEFAULT_AUDIT_CHECK_GRANULARITY);
    }

    @VisibleForTesting
    protected void setAuditCountVerifier(KafkaAuditCountVerifier kafkaAuditCountVerifier) {
        this.auditCountVerifier = Optional.of(kafkaAuditCountVerifier);
    }

    protected void initializeCatalog() {
        this.catalog = HiveCatalogs.loadCatalog(this.conf);
    }

    private Table getIcebergTable(TableIdentifier tableIdentifier) throws NoSuchTableException {
        TableMetadata computeIfAbsent = this.tableMetadataMap.computeIfAbsent(tableIdentifier, tableIdentifier2 -> {
            return new TableMetadata();
        });
        if (!computeIfAbsent.table.isPresent()) {
            computeIfAbsent.table = Optional.of(this.catalog.loadTable(tableIdentifier));
        }
        return (Table) computeIfAbsent.table.get();
    }

    private Long getAndPersistCurrentWatermark(TableIdentifier tableIdentifier, String str) {
        if (this.tableCurrentWatermarkMap.containsKey(tableIdentifier)) {
            return this.tableCurrentWatermarkMap.get(tableIdentifier);
        }
        try {
            Table icebergTable = getIcebergTable(tableIdentifier);
            return Long.valueOf(icebergTable.properties().containsKey(String.format(GMCE_HIGH_WATERMARK_KEY, str)) ? Long.parseLong((String) icebergTable.properties().get(String.format(GMCE_HIGH_WATERMARK_KEY, str))) : -1L);
        } catch (NoSuchTableException e) {
            return -1L;
        }
    }

    public void write(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, Map<String, Collection<HiveSpec>> map, Map<String, Collection<HiveSpec>> map2, HiveSpec hiveSpec) throws IOException {
        Table createTable;
        TableIdentifier of = TableIdentifier.of(new String[]{hiveSpec.getTable().getDbName(), hiveSpec.getTable().getTableName()});
        TableMetadata computeIfAbsent = this.tableMetadataMap.computeIfAbsent(of, tableIdentifier -> {
            return new TableMetadata();
        });
        try {
            createTable = getIcebergTable(of);
        } catch (NoSuchTableException e) {
            try {
                if (gobblinMetadataChangeEvent.getOperationType() == OperationType.drop_files || gobblinMetadataChangeEvent.getOperationType() == OperationType.change_property) {
                    log.warn("Table {} does not exist, skip processing this {} event", of.toString(), gobblinMetadataChangeEvent.getOperationType());
                    return;
                } else {
                    createTable = createTable(gobblinMetadataChangeEvent, hiveSpec);
                    computeIfAbsent.table = Optional.of(createTable);
                }
            } catch (Exception e2) {
                log.error("skip processing {} for table {}.{} due to error when creating table", new Object[]{gobblinMetadataChangeEvent.toString(), hiveSpec.getTable().getDbName(), hiveSpec.getTable().getTableName()});
                log.debug(e2.toString());
                return;
            }
        }
        if (computeIfAbsent.completenessEnabled) {
            computeIfAbsent.completionWatermark = Long.parseLong((String) createTable.properties().getOrDefault(IcebergMetadataWriterConfigKeys.COMPLETION_WATERMARK_KEY, String.valueOf(-1L)));
        }
        computeCandidateSchema(gobblinMetadataChangeEvent, of, hiveSpec);
        computeIfAbsent.ensureTxnInit();
        computeIfAbsent.lowestGMCEEmittedTime = Long.min(computeIfAbsent.lowestGMCEEmittedTime, gobblinMetadataChangeEvent.getGMCEmittedTime().longValue());
        switch (AnonymousClass3.$SwitchMap$org$apache$gobblin$metadata$OperationType[gobblinMetadataChangeEvent.getOperationType().ordinal()]) {
            case 1:
                updateTableProperty(hiveSpec, of, gobblinMetadataChangeEvent);
                addFiles(gobblinMetadataChangeEvent, map, createTable, computeIfAbsent);
                if (gobblinMetadataChangeEvent.getAuditCountMap() != null && this.auditWhitelistBlacklist.acceptTable(hiveSpec.getTable().getDbName(), hiveSpec.getTable().getTableName())) {
                    computeIfAbsent.serializedAuditCountMaps.add(gobblinMetadataChangeEvent.getAuditCountMap());
                }
                if (gobblinMetadataChangeEvent.getTopicPartitionOffsetsRange() != null) {
                    mergeOffsets(gobblinMetadataChangeEvent, of);
                    return;
                }
                return;
            case 2:
                updateTableProperty(hiveSpec, of, gobblinMetadataChangeEvent);
                rewriteFiles(gobblinMetadataChangeEvent, map, map2, createTable, computeIfAbsent);
                return;
            case 3:
                dropFiles(gobblinMetadataChangeEvent, map2, createTable, computeIfAbsent, of);
                return;
            case 4:
                updateTableProperty(hiveSpec, of, gobblinMetadataChangeEvent);
                if (gobblinMetadataChangeEvent.getTopicPartitionOffsetsRange() != null) {
                    mergeOffsets(gobblinMetadataChangeEvent, of);
                }
                log.info("No file operation need to be performed by Iceberg Metadata Writer at this point.");
                return;
            default:
                log.error("unsupported operation {}", gobblinMetadataChangeEvent.getOperationType().toString());
                return;
        }
    }

    private HashMap<String, List<Range>> getLastOffset(TableMetadata tableMetadata) {
        HashMap<String, List<Range>> hashMap = new HashMap<>();
        if (tableMetadata.lastProperties.isPresent()) {
            for (Map.Entry entry : ((Map) tableMetadata.lastProperties.get()).entrySet()) {
                if (((String) entry.getKey()).startsWith(OFFSET_RANGE_KEY_PREFIX)) {
                    hashMap.put(((String) entry.getKey()).substring(OFFSET_RANGE_KEY_PREFIX.length()), (List) Arrays.asList(((String) entry.getValue()).split(",")).stream().map(str -> {
                        List splitToList = Splitter.on("-").splitToList(str);
                        return Range.openClosed(Long.valueOf(Long.parseLong((String) splitToList.get(0))), Long.valueOf(Long.parseLong((String) splitToList.get(1))));
                    }).collect(Collectors.toList()));
                }
            }
        }
        return hashMap;
    }

    private void mergeOffsets(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, TableIdentifier tableIdentifier) {
        TableMetadata computeIfAbsent = this.tableMetadataMap.computeIfAbsent(tableIdentifier, tableIdentifier2 -> {
            return new TableMetadata();
        });
        computeIfAbsent.dataOffsetRange = Optional.of(computeIfAbsent.dataOffsetRange.or(() -> {
            return getLastOffset(computeIfAbsent);
        }));
        Map map = (Map) computeIfAbsent.dataOffsetRange.get();
        for (Map.Entry entry : gobblinMetadataChangeEvent.getTopicPartitionOffsetsRange().entrySet()) {
            List splitToList = Splitter.on("-").splitToList((CharSequence) entry.getValue());
            Range openClosed = Range.openClosed(Long.valueOf(Long.parseLong((String) splitToList.get(0))), Long.valueOf(Long.parseLong((String) splitToList.get(1))));
            if (!openClosed.lowerEndpoint().equals(openClosed.upperEndpoint())) {
                List<Range> list = (List) map.getOrDefault(entry.getKey(), new ArrayList());
                ArrayList arrayList = new ArrayList();
                for (Range range : list) {
                    if (openClosed.isConnected(range)) {
                        openClosed = openClosed.span(range);
                    } else {
                        arrayList.add(range);
                    }
                }
                arrayList.add(openClosed);
                Collections.sort(arrayList, new Comparator<Range>() { // from class: org.apache.gobblin.iceberg.writer.IcebergMetadataWriter.1
                    @Override // java.util.Comparator
                    public int compare(Range range2, Range range3) {
                        return range2.lowerEndpoint().compareTo(range3.lowerEndpoint());
                    }
                });
                map.put(entry.getKey(), arrayList);
            }
        }
    }

    protected void updateTableProperty(HiveSpec hiveSpec, TableIdentifier tableIdentifier, GobblinMetadataChangeEvent gobblinMetadataChangeEvent) {
        org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(hiveSpec.getTable());
        TableMetadata computeIfAbsent = this.tableMetadataMap.computeIfAbsent(tableIdentifier, tableIdentifier2 -> {
            return new TableMetadata();
        });
        computeIfAbsent.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
        String str = computeIfAbsent.datasetName;
        ((Map) computeIfAbsent.newProperties.get()).put(IcebergMetadataWriterConfigKeys.TOPIC_NAME_KEY, str.substring(str.lastIndexOf("/") + 1));
    }

    private void computeCandidateSchema(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, TableIdentifier tableIdentifier, HiveSpec hiveSpec) {
        Table icebergTable = getIcebergTable(tableIdentifier);
        TableMetadata computeIfAbsent = this.tableMetadataMap.computeIfAbsent(tableIdentifier, tableIdentifier2 -> {
            return new TableMetadata();
        });
        org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(hiveSpec.getTable());
        computeIfAbsent.lastProperties = Optional.of(computeIfAbsent.lastProperties.or(() -> {
            return icebergTable.properties();
        }));
        Map map = (Map) computeIfAbsent.lastProperties.get();
        computeIfAbsent.lastSchemaVersion = Optional.of(computeIfAbsent.lastSchemaVersion.or(() -> {
            return (String) map.getOrDefault(SCHEMA_CREATION_TIME_KEY, DEFAULT_CREATION_TIME);
        }));
        String str = (String) computeIfAbsent.lastSchemaVersion.get();
        computeIfAbsent.candidateSchemas = Optional.of(computeIfAbsent.candidateSchemas.or(() -> {
            return CacheBuilder.newBuilder().expireAfterAccess(this.conf.getInt("GMCEWriter.cache.expiring.time.hours", 1), TimeUnit.HOURS).build();
        }));
        Cache cache = (Cache) computeIfAbsent.candidateSchemas.get();
        try {
            switch (AnonymousClass3.$SwitchMap$org$apache$gobblin$metadata$SchemaSource[gobblinMetadataChangeEvent.getSchemaSource().ordinal()]) {
                case 1:
                    String schemaCreationTime = AvroUtils.getSchemaCreationTime(new Schema.Parser().parse(gobblinMetadataChangeEvent.getTableSchema()));
                    if (schemaCreationTime != null) {
                        if (!schemaCreationTime.equals(str)) {
                            cache.put(schemaCreationTime, Pair.of(IcebergUtils.getIcebergSchema(gobblinMetadataChangeEvent.getTableSchema(), table).tableSchema, gobblinMetadataChangeEvent.getTableSchema()));
                            break;
                        }
                    } else {
                        cache.put(DEFAULT_CREATION_TIME, Pair.of(IcebergUtils.getIcebergSchema(gobblinMetadataChangeEvent.getTableSchema(), table).tableSchema, gobblinMetadataChangeEvent.getTableSchema()));
                        break;
                    }
                    break;
                case 2:
                    cache.put(DEFAULT_CREATION_TIME, Pair.of(IcebergUtils.getIcebergSchema(gobblinMetadataChangeEvent.getTableSchema(), table).tableSchema, gobblinMetadataChangeEvent.getTableSchema()));
                    break;
                case 3:
                    log.debug("Schema source set to be none, will ignore the schema");
                    break;
                default:
                    throw new IOException(String.format("unsupported schema source %s", gobblinMetadataChangeEvent.getSchemaSource()));
            }
        } catch (Exception e) {
            log.error("Cannot get candidate schema from event due to", e);
        }
    }

    private Table addPartitionToIcebergTable(Table table, String str, String str2) {
        if (!table.schema().columns().stream().anyMatch(nestedField -> {
            return nestedField.name().equalsIgnoreCase(str);
        })) {
            table.updateSchema().addColumn(str, Types.fromPrimitiveString(str2)).commit();
        }
        if (!table.spec().fields().stream().anyMatch(partitionField -> {
            return partitionField.name().equalsIgnoreCase(str);
        })) {
            table.updateSpec().addField(str).commit();
        }
        table.refresh();
        return table;
    }

    protected Table createTable(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, HiveSpec hiveSpec) throws IOException {
        Timer.Context time;
        Throwable th;
        String tableSchema = gobblinMetadataChangeEvent.getTableSchema();
        org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(hiveSpec.getTable());
        IcebergUtils.IcebergDataAndPartitionSchema icebergSchema = IcebergUtils.getIcebergSchema(tableSchema, table);
        TableIdentifier of = TableIdentifier.of(new String[]{table.getDbName(), table.getTableName()});
        org.apache.iceberg.Schema schema = icebergSchema.tableSchema;
        Preconditions.checkState(schema != null, "Table schema cannot be null when creating a table");
        PartitionSpec partitionSpec = IcebergUtils.getPartitionSpec(schema, icebergSchema.partitionSchema);
        Table table2 = null;
        String str = null;
        if (this.useDataLocationAsTableLocation) {
            str = gobblinMetadataChangeEvent.getDatasetIdentifier().getNativeName() + String.format(TABLE_LOCATION_SUFFIX, table.getDbName());
            Path path = new Path(str);
            WriterUtils.mkdirsWithRecursivePermission(path.getFileSystem(this.conf), path, this.permission);
        }
        try {
            time = this.metricContext.timer(CREATE_TABLE_TIME).time();
            th = null;
        } catch (AlreadyExistsException e) {
            log.warn("table {} already exist, there may be some other process try to create table concurrently", of);
        }
        try {
            try {
                table2 = this.catalog.createTable(of, schema, partitionSpec, str, IcebergUtils.getTableProperties(table));
                table2.updateProperties().set(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), tableSchema).commit();
                log.info("Created table {}, schema: {} partition spec: {}", new Object[]{of, schema, partitionSpec});
                if (time != null) {
                    if (0 != 0) {
                        try {
                            time.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        time.close();
                    }
                }
                return table2;
            } finally {
            }
        } finally {
        }
    }

    protected void rewriteFiles(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, Map<String, Collection<HiveSpec>> map, Map<String, Collection<HiveSpec>> map2, Table table, TableMetadata tableMetadata) throws IOException {
        PartitionSpec spec = table.spec();
        tableMetadata.ensureTxnInit();
        HashSet hashSet = new HashSet();
        getIcebergDataFilesToBeAddedHelper(gobblinMetadataChangeEvent, table, map, tableMetadata).forEach(dataFile -> {
            hashSet.add(dataFile);
            tableMetadata.addedFiles.put(dataFile.path(), "");
        });
        Set<DataFile> icebergDataFilesToBeDeleted = getIcebergDataFilesToBeDeleted(gobblinMetadataChangeEvent, table, map, map2, spec);
        if (!icebergDataFilesToBeDeleted.isEmpty() || hashSet.isEmpty()) {
            ((Transaction) tableMetadata.transaction.get()).newRewrite().rewriteFiles(icebergDataFilesToBeDeleted, hashSet).commit();
            return;
        }
        if (FindFiles.in(table).withMetadataMatching(Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, ((DataFile) hashSet.iterator().next()).path().toString())).collect().iterator().hasNext()) {
            return;
        }
        AppendFiles orInitAppendFiles = tableMetadata.getOrInitAppendFiles();
        orInitAppendFiles.getClass();
        hashSet.forEach(orInitAppendFiles::appendFile);
    }

    private org.apache.iceberg.Schema getSchemaWithOriginId(GobblinMetadataChangeEvent gobblinMetadataChangeEvent) {
        org.apache.iceberg.Schema schema = null;
        if (gobblinMetadataChangeEvent.getAvroSchemaWithIcebergSchemaID() != null) {
            schema = AvroSchemaUtil.toIceberg(new Schema.Parser().parse(gobblinMetadataChangeEvent.getAvroSchemaWithIcebergSchemaID()));
        }
        return schema;
    }

    protected void dropFiles(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, Map<String, Collection<HiveSpec>> map, final Table table, TableMetadata tableMetadata, final TableIdentifier tableIdentifier) throws IOException {
        PartitionSpec spec = table.spec();
        DeleteFiles orInitDeleteFiles = tableMetadata.getOrInitDeleteFiles();
        Set<DataFile> icebergDataFilesToBeDeleted = getIcebergDataFilesToBeDeleted(gobblinMetadataChangeEvent, table, new HashMap(), map, spec);
        orInitDeleteFiles.getClass();
        icebergDataFilesToBeDeleted.forEach(orInitDeleteFiles::deleteFile);
        this.parallelRunner.submitCallable(new Callable<Void>() { // from class: org.apache.gobblin.iceberg.writer.IcebergMetadataWriter.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                try {
                    long expireSnapshotTime = IcebergMetadataWriter.this.getExpireSnapshotTime();
                    long currentTimeMillis = System.currentTimeMillis();
                    ExpireSnapshots expireSnapshots = table.expireSnapshots();
                    final Table table2 = table;
                    expireSnapshots.deleteWith(new Consumer<String>() { // from class: org.apache.gobblin.iceberg.writer.IcebergMetadataWriter.2.1
                        @Override // java.util.function.Consumer
                        public void accept(String str) {
                            if (str.startsWith(table2.location())) {
                                table2.io().deleteFile(str);
                            }
                        }
                    }).expireOlderThan(expireSnapshotTime).commit();
                    IcebergMetadataWriter.log.info("Spent {} ms to expire snapshots older than {} ({}) in table {}", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis), new DateTime(expireSnapshotTime).toString(), Long.valueOf(expireSnapshotTime), tableIdentifier.toString()});
                    return null;
                } catch (Exception e) {
                    IcebergMetadataWriter.log.error(String.format("Fail to expire snapshots for table %s due to exception ", tableIdentifier.toString()), e);
                    return null;
                }
            }
        }, tableIdentifier.toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getExpireSnapshotTime() {
        return DateTime.now().minus(new PeriodFormatterBuilder().appendYears().appendSuffix("y").appendMonths().appendSuffix("M").appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").toFormatter().parsePeriod(this.conf.get(EXPIRE_SNAPSHOTS_LOOKBACK_TIME, DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME))).getMillis();
    }

    protected void addFiles(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, Map<String, Collection<HiveSpec>> map, Table table, TableMetadata tableMetadata) {
        AppendFiles orInitAppendFiles = tableMetadata.getOrInitAppendFiles();
        getIcebergDataFilesToBeAddedHelper(gobblinMetadataChangeEvent, table, map, tableMetadata).forEach(dataFile -> {
            orInitAppendFiles.appendFile(dataFile);
            tableMetadata.addedFiles.put(dataFile.path(), "");
        });
    }

    private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, Table table, Map<String, Collection<HiveSpec>> map, TableMetadata tableMetadata) {
        return getIcebergDataFilesToBeAdded(table, tableMetadata, gobblinMetadataChangeEvent, gobblinMetadataChangeEvent.getNewFiles(), table.spec(), map, IcebergUtils.getSchemaIdMap(getSchemaWithOriginId(gobblinMetadataChangeEvent), table.schema())).stream().filter(dataFile -> {
            return tableMetadata.addedFiles.getIfPresent(dataFile.path()) == null;
        });
    }

    private Set<DataFile> getIcebergDataFilesToBeDeleted(GobblinMetadataChangeEvent gobblinMetadataChangeEvent, Table table, Map<String, Collection<HiveSpec>> map, Map<String, Collection<HiveSpec>> map2, PartitionSpec partitionSpec) throws IOException {
        HashSet hashSet = new HashSet();
        if (gobblinMetadataChangeEvent.getOldFilePrefixes() != null) {
            Expression alwaysFalse = Expressions.alwaysFalse();
            for (String str : gobblinMetadataChangeEvent.getOldFilePrefixes()) {
                alwaysFalse = Expressions.or(Expressions.or(alwaysFalse, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, str)), Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, new Path(str).toUri().getRawPath()));
            }
            long currentTimeMillis = System.currentTimeMillis();
            hashSet.addAll(Sets.newHashSet(FindFiles.in(table).withMetadataMatching(alwaysFalse).collect().iterator()));
            log.info("Spent {}ms to query all old files in iceberg.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        } else {
            for (String str2 : gobblinMetadataChangeEvent.getOldFiles()) {
                String path = new Path(str2).getParent().toString();
                hashSet.add(IcebergUtils.getIcebergDataFileWithoutMetric(str2, partitionSpec, getIcebergPartitionVal(map2.containsKey(path) ? map2.get(path) : map.get(path), str2, partitionSpec)));
            }
        }
        return hashSet;
    }

    private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata tableMetadata, GobblinMetadataChangeEvent gobblinMetadataChangeEvent, List<org.apache.gobblin.metadata.DataFile> list, PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> map, Map<Integer, Integer> map2) {
        HashSet hashSet = new HashSet();
        for (org.apache.gobblin.metadata.DataFile dataFile : list) {
            try {
                Collection<HiveSpec> collection = map.get(new Path(dataFile.getFilePath()).getParent().toString());
                StructLike icebergPartitionVal = getIcebergPartitionVal(collection, dataFile.getFilePath(), partitionSpec);
                if (tableMetadata.newPartitionColumnEnabled && gobblinMetadataChangeEvent.getOperationType() == OperationType.add_files) {
                    String str = (String) icebergPartitionVal.get(0, (Class) null);
                    icebergPartitionVal = addLatePartitionValueToIcebergTable(table, tableMetadata, (HivePartition) collection.iterator().next().getPartition().get(), str);
                    tableMetadata.datePartitions.add(getDateTimeFromDatepartitionString(str));
                }
                hashSet.add(IcebergUtils.getIcebergDataFileWithMetric(dataFile, table.spec(), icebergPartitionVal, this.conf, map2));
            } catch (Exception e) {
                log.warn("Cannot get DataFile for {} dur to {}", dataFile.getFilePath(), e);
            }
        }
        return hashSet;
    }

    private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadata tableMetadata, HivePartition hivePartition, String str) {
        PartitionSpec spec = addPartitionToIcebergTable(table, this.newPartitionColumn, this.newPartitionColumnType).spec();
        int isLate = !tableMetadata.completenessEnabled ? 0 : isLate(str, tableMetadata.completionWatermark);
        ArrayList arrayList = new ArrayList(hivePartition.getValues());
        arrayList.add(String.valueOf(isLate));
        return IcebergUtils.getPartition(spec.partitionType(), arrayList);
    }

    private int isLate(String str, long j) {
        ZonedDateTime parse = ZonedDateTime.parse(str, this.HOURLY_DATEPARTITION_FORMAT);
        long epochMilli = parse.toInstant().toEpochMilli();
        if (epochMilli >= j) {
            return 0;
        }
        return (epochMilli >= j || !parse.toLocalDate().equals(getDateFromEpochMillis(j))) ? 2 : 1;
    }

    private LocalDate getDateFromEpochMillis(long j) {
        return ZonedDateTime.ofInstant(Instant.ofEpochMilli(j), ZoneId.of(this.timeZone)).toLocalDate();
    }

    private ZonedDateTime getDateTimeFromDatepartitionString(String str) {
        return ZonedDateTime.parse(str, this.HOURLY_DATEPARTITION_FORMAT);
    }

    private StructLike getIcebergPartitionVal(Collection<HiveSpec> collection, String str, PartitionSpec partitionSpec) throws IOException {
        if (collection == null || collection.isEmpty()) {
            throw new IOException("Cannot get hive spec for " + str);
        }
        HivePartition hivePartition = (HivePartition) collection.iterator().next().getPartition().orNull();
        return hivePartition == null ? null : IcebergUtils.getPartition(partitionSpec.partitionType(), hivePartition.getValues());
    }

    protected String getTopicName(TableIdentifier tableIdentifier, TableMetadata tableMetadata) {
        if (!tableMetadata.dataOffsetRange.isPresent() || ((Map) tableMetadata.dataOffsetRange.get()).size() == 0) {
            return (String) ((Map) tableMetadata.newProperties.or(Maps.newHashMap((Map) tableMetadata.lastProperties.or(getIcebergTable(tableIdentifier).properties())))).get(IcebergMetadataWriterConfigKeys.TOPIC_NAME_KEY);
        }
        String str = (String) ((Map) tableMetadata.dataOffsetRange.get()).keySet().iterator().next();
        return str.substring(0, str.lastIndexOf(45));
    }

    public void flush(String str, String str2) throws IOException {
        Lock writeLock = this.readWriteLock.writeLock();
        writeLock.lock();
        try {
            try {
                TableIdentifier of = TableIdentifier.of(new String[]{str, str2});
                TableMetadata orDefault = this.tableMetadataMap.getOrDefault(of, new TableMetadata());
                if (orDefault.transaction.isPresent()) {
                    Transaction transaction = (Transaction) orDefault.transaction.get();
                    Map<String, String> map = (Map) orDefault.newProperties.or(Maps.newHashMap((Map) orDefault.lastProperties.or(getIcebergTable(of).properties())));
                    setDatasetOffsetRange(orDefault, map);
                    String topicName = getTopicName(of, orDefault);
                    if (orDefault.appendFiles.isPresent()) {
                        ((AppendFiles) orDefault.appendFiles.get()).commit();
                        sendAuditCounts(topicName, orDefault.serializedAuditCountMaps);
                        if (orDefault.completenessEnabled) {
                            checkAndUpdateCompletenessWatermark(orDefault, topicName, orDefault.datePartitions, map);
                        }
                    }
                    if (orDefault.deleteFiles.isPresent()) {
                        ((DeleteFiles) orDefault.deleteFiles.get()).commit();
                    }
                    if (!orDefault.appendFiles.isPresent() && !orDefault.deleteFiles.isPresent() && orDefault.completenessEnabled) {
                        if (orDefault.completionWatermark > -1) {
                            log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
                            TreeSet treeSet = new TreeSet();
                            treeSet.add(ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS));
                            checkAndUpdateCompletenessWatermark(orDefault, topicName, treeSet, map);
                        } else {
                            log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s", Long.valueOf(orDefault.completionWatermark), topicName));
                        }
                    }
                    Long l = this.tableCurrentWatermarkMap.get(of);
                    map.put(String.format(GMCE_HIGH_WATERMARK_KEY, this.tableTopicPartitionMap.get(of)), l.toString());
                    map.put(String.format(GMCE_LOW_WATERMARK_KEY, this.tableTopicPartitionMap.get(of)), ((Long) orDefault.lowWatermark.get()).toString());
                    if (this.conf.getBoolean(IcebergMetadataWriterConfigKeys.ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, true)) {
                        map.put("write.metadata.delete-after-commit.enabled", Boolean.toString(this.conf.getBoolean("write.metadata.delete-after-commit.enabled", false)));
                        map.put("write.metadata.previous-versions-max", Integer.toString(this.conf.getInt("write.metadata.previous-versions-max", 100)));
                    }
                    updateSchema(orDefault, map, topicName);
                    UpdateProperties updateProperties = transaction.updateProperties();
                    updateProperties.getClass();
                    map.forEach(updateProperties::set);
                    updateProperties.commit();
                    AutoCloseableHiveLock tableLock = this.locks.getTableLock(str, str2);
                    Throwable th = null;
                    try {
                        try {
                            transaction.commitTransaction();
                            if (tableLock != null) {
                                if (0 != 0) {
                                    try {
                                        tableLock.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    tableLock.close();
                                }
                            }
                            Snapshot currentSnapshot = ((Table) orDefault.table.get()).currentSnapshot();
                            Map<String, String> properties = ((Table) orDefault.table.get()).properties();
                            submitSnapshotCommitEvent(currentSnapshot, orDefault, str, str2, properties, l);
                            orDefault.reset(properties, l);
                            log.info(String.format("Finish commit of new snapshot %s for table %s", Long.valueOf(currentSnapshot.snapshotId()), of.toString()));
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (tableLock != null) {
                            if (th != null) {
                                try {
                                    tableLock.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                tableLock.close();
                            }
                        }
                        throw th3;
                    }
                } else {
                    log.info("There's no transaction initiated for the table {}", of.toString());
                }
            } catch (RuntimeException e) {
                throw new IOException(String.format("Fail to flush table %s %s", str, str2), e);
            } catch (Exception e2) {
                throw new IOException(String.format("Fail to flush table %s %s", str, str2), e2);
            }
        } finally {
            writeLock.unlock();
        }
    }

    public void reset(String str, String str2) throws IOException {
        this.tableMetadataMap.remove(TableIdentifier.of(new String[]{str, str2}));
    }

    private void checkAndUpdateCompletenessWatermark(TableMetadata tableMetadata, String str, SortedSet<ZonedDateTime> sortedSet, Map<String, String> map) {
        String name = ((Table) tableMetadata.table.get()).name();
        if (str == null) {
            log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s", IcebergMetadataWriterConfigKeys.TOPIC_NAME_KEY, name));
        }
        long computeCompletenessWatermark = computeCompletenessWatermark(name, str, sortedSet, tableMetadata.completionWatermark);
        if (computeCompletenessWatermark > tableMetadata.completionWatermark) {
            log.info(String.format("Updating %s for %s to %s", IcebergMetadataWriterConfigKeys.COMPLETION_WATERMARK_KEY, ((Table) tableMetadata.table.get()).name(), Long.valueOf(computeCompletenessWatermark)));
            map.put(IcebergMetadataWriterConfigKeys.COMPLETION_WATERMARK_KEY, String.valueOf(computeCompletenessWatermark));
            map.put(IcebergMetadataWriterConfigKeys.COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
            tableMetadata.completionWatermark = computeCompletenessWatermark;
        }
    }

    private long computeCompletenessWatermark(String str, String str2, SortedSet<ZonedDateTime> sortedSet, long j) {
        log.info(String.format("Compute completion watermark for %s and timestamps %s with previous watermark %s", str2, sortedSet, Long.valueOf(j)));
        long j2 = j;
        ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
        if (sortedSet != null) {
            try {
            } catch (IOException e) {
                log.warn("Exception during audit count check: ", e);
            }
            if (sortedSet.size() > 0) {
                TimeIterator.Granularity valueOf = TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
                ZonedDateTime atZone = Instant.ofEpochMilli(j).atZone(ZoneId.of(this.timeZone));
                TimeIterator timeIterator = new TimeIterator(sortedSet.first(), sortedSet.last(), valueOf, true);
                while (true) {
                    if (!timeIterator.hasNext()) {
                        break;
                    }
                    ZonedDateTime next = timeIterator.next();
                    if (!next.isAfter(atZone) || TimeIterator.durationBetween(atZone, now, valueOf) <= 0) {
                        break;
                    }
                    long epochMilli = next.toInstant().toEpochMilli();
                    if (((KafkaAuditCountVerifier) this.auditCountVerifier.get()).isComplete(str2, TimeIterator.dec(next, valueOf, 1L).toInstant().toEpochMilli(), epochMilli)) {
                        j2 = epochMilli;
                        this.state.setProp(String.format(IcebergMetadataWriterConfigKeys.STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, str.toLowerCase(Locale.ROOT)), Long.valueOf(j2));
                        break;
                    }
                }
                return j2;
            }
        }
        log.error("Cannot create time iterator. Empty for null timestamps");
        return j;
    }

    private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata tableMetadata, String str, String str2, Map<String, String> map, Long l) {
        GobblinEventBuilder gobblinEventBuilder = new GobblinEventBuilder("IcebergMetadataCommitEvent");
        long snapshotId = snapshot.snapshotId();
        long currentTimeMillis = System.currentTimeMillis() - tableMetadata.lowestGMCEEmittedTime;
        String str3 = this.tableTopicPartitionMap.get(TableIdentifier.of(new String[]{str, str2}));
        gobblinEventBuilder.addMetadata("gmceTopicName", str3.split("-")[0]);
        gobblinEventBuilder.addMetadata("gmceTopicPartition", str3.split("-")[1]);
        gobblinEventBuilder.addMetadata("gmceHighWatermark", l.toString());
        gobblinEventBuilder.addMetadata("gmceLowWatermark", ((Long) tableMetadata.lowWatermark.get()).toString());
        gobblinEventBuilder.addMetadata("endToEndLag", Long.toString(currentTimeMillis));
        gobblinEventBuilder.addMetadata("currentSnapshotId", Long.toString(snapshotId));
        gobblinEventBuilder.addMetadata("currentManifestLocation", snapshot.manifestListLocation());
        gobblinEventBuilder.addMetadata("currentSnapshotDetailedInformation", Joiner.on(",").withKeyValueSeparator("=").join(snapshot.summary()));
        gobblinEventBuilder.addMetadata("icebergTableName", str2);
        gobblinEventBuilder.addMetadata("icebergDatabaseName", str);
        gobblinEventBuilder.addMetadata("datasetHdfsPath", tableMetadata.datasetName);
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (entry.getKey().startsWith(OFFSET_RANGE_KEY_PREFIX)) {
                gobblinEventBuilder.addMetadata(entry.getKey(), entry.getValue());
            }
        }
        if (tableMetadata.completenessEnabled) {
            gobblinEventBuilder.addMetadata(IcebergMetadataWriterConfigKeys.COMPLETION_WATERMARK_KEY, Long.toString(tableMetadata.completionWatermark));
        }
        this.eventSubmitter.submit(gobblinEventBuilder);
    }

    private boolean setDatasetOffsetRange(TableMetadata tableMetadata, Map<String, String> map) {
        if (!tableMetadata.dataOffsetRange.isPresent() || ((Map) tableMetadata.dataOffsetRange.get()).isEmpty()) {
            return false;
        }
        for (Map.Entry entry : ((Map) tableMetadata.dataOffsetRange.get()).entrySet()) {
            map.put(String.format(OFFSET_RANGE_KEY_FORMAT, entry.getKey()), (String) ((List) entry.getValue()).stream().map(range -> {
                return Joiner.on("-").join(range.lowerEndpoint(), range.upperEndpoint(), new Object[0]);
            }).collect(Collectors.joining(",")));
        }
        return true;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void updateSchema(TableMetadata tableMetadata, Map<String, String> map, String str) {
        map.put(SCHEMA_CREATION_TIME_KEY, tableMetadata.lastSchemaVersion.or(DEFAULT_CREATION_TIME));
        try {
            if (tableMetadata.candidateSchemas.isPresent() && ((Cache) tableMetadata.candidateSchemas.get()).size() > 0) {
                Cache cache = (Cache) tableMetadata.candidateSchemas.get();
                if (cache.size() != 1 || cache.getIfPresent(DEFAULT_CREATION_TIME) == null) {
                    String schemaCreationTime = AvroUtils.getSchemaCreationTime((org.apache.avro.Schema) this.schemaRegistry.getLatestSchemaByTopic(str));
                    if (schemaCreationTime == null) {
                        log.warn("Schema from schema registry does not contain creation time, check config for schema registry class");
                    } else if (cache.getIfPresent(schemaCreationTime) != null) {
                        updateSchemaHelper(schemaCreationTime, (Pair) cache.getIfPresent(schemaCreationTime), map, (Table) tableMetadata.table.get());
                    }
                } else {
                    updateSchemaHelper(DEFAULT_CREATION_TIME, (Pair) cache.getIfPresent(DEFAULT_CREATION_TIME), map, (Table) tableMetadata.table.get());
                }
            }
        } catch (SchemaRegistryException e) {
            log.error("Cannot get schema form schema registry, will not update this schema", e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void updateSchemaHelper(String str, Pair<org.apache.iceberg.Schema, String> pair, Map<String, String> map, Table table) {
        try {
            table.updateSchema().unionByNameWith((org.apache.iceberg.Schema) pair.getLeft()).commit();
            map.put(SCHEMA_CREATION_TIME_KEY, str);
            map.put(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), pair.getRight());
        } catch (Exception e) {
            log.error("Cannot update schema to " + pair.toString() + "for table " + table.location(), e);
        }
    }

    public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<String, Collection<HiveSpec>> map, Map<String, Collection<HiveSpec>> map2, HiveSpec hiveSpec) throws IOException {
        Lock readLock = this.readWriteLock.readLock();
        readLock.lock();
        try {
            GenericRecord genericRecord = (GenericRecord) recordEnvelope.getRecord();
            GobblinMetadataChangeEvent gobblinMetadataChangeEvent = (GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
            String dbName = hiveSpec.getTable().getDbName();
            String tableName = hiveSpec.getTable().getTableName();
            if (this.whitelistBlacklist.acceptTable(dbName, tableName)) {
                TableIdentifier of = TableIdentifier.of(new String[]{dbName, tableName});
                String computeIfAbsent = this.tableTopicPartitionMap.computeIfAbsent(of, tableIdentifier -> {
                    return recordEnvelope.getWatermark().getSource();
                });
                Long andPersistCurrentWatermark = getAndPersistCurrentWatermark(of, computeIfAbsent);
                Long valueOf = Long.valueOf(recordEnvelope.getWatermark().getWatermark().getValue());
                if (valueOf.longValue() > andPersistCurrentWatermark.longValue()) {
                    if (!this.tableMetadataMap.computeIfAbsent(of, tableIdentifier2 -> {
                        return new TableMetadata();
                    }).lowWatermark.isPresent()) {
                        this.tableMetadataMap.get(of).lowWatermark = Optional.of(Long.valueOf(valueOf.longValue() - 1));
                        this.tableMetadataMap.get(of).setDatasetName(gobblinMetadataChangeEvent.getDatasetIdentifier().getNativeName());
                        if (this.newPartitionEnabled && this.newPartitionTableWhitelistBlacklist.acceptTable(dbName, tableName)) {
                            this.tableMetadataMap.get(of).newPartitionColumnEnabled = true;
                            if (this.completenessEnabled && this.completenessWhitelistBlacklist.acceptTable(dbName, tableName)) {
                                this.tableMetadataMap.get(of).completenessEnabled = true;
                            }
                        }
                    }
                    write(gobblinMetadataChangeEvent, map, map2, hiveSpec);
                    this.tableCurrentWatermarkMap.put(of, valueOf);
                } else {
                    log.warn(String.format("Skip processing record for table: %s.%s, GMCE offset: %d, GMCE partition: %s since it has lower watermark", dbName, tableName, valueOf, computeIfAbsent));
                }
            } else {
                log.info(String.format("Skip table %s.%s since it's not selected", hiveSpec.getTable().getDbName(), hiveSpec.getTable().getTableName()));
            }
        } finally {
            readLock.unlock();
        }
    }

    public void sendAuditCounts(String str, Collection<String> collection) {
    }

    public void close() throws IOException {
        this.closer.close();
    }

    public KafkaSchemaRegistry getSchemaRegistry() {
        return this.schemaRegistry;
    }

    public void setCatalog(Catalog catalog) {
        this.catalog = catalog;
    }
}
