package com.cloudera.dim.atlas.events;

import com.cloudera.dim.atlas.AtlasPlugin;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.hortonworks.registries.common.QueryParam;
import com.hortonworks.registries.schemaregistry.AtlasEventStorable;
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaMetadataStorable;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionStorable;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.storage.StorageManager;
import com.hortonworks.registries.storage.TransactionManager;
import com.hortonworks.registries.storage.transaction.ManagedTransaction;
import com.hortonworks.registries.storage.transaction.TransactionIsolation;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/dim/atlas/events/AtlasEventsProcessor.class */
public class AtlasEventsProcessor implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasEventsProcessor.class);
    private final AtlasPlugin atlasPlugin;
    private final StorageManager storageManager;
    private final long waitBetweenProcessing;
    private final boolean connectWithKafka;
    private final ManagedTransaction managedTransaction;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.cloudera.dim.atlas.events.AtlasEventsProcessor$1, reason: invalid class name */
    /* loaded from: input_file:com/cloudera/dim/atlas/events/AtlasEventsProcessor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$hortonworks$registries$schemaregistry$AtlasEventStorable$EventType = new int[AtlasEventStorable.EventType.values().length];

        static {
            try {
                $SwitchMap$com$hortonworks$registries$schemaregistry$AtlasEventStorable$EventType[AtlasEventStorable.EventType.CREATE_META.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$hortonworks$registries$schemaregistry$AtlasEventStorable$EventType[AtlasEventStorable.EventType.UPDATE_META.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$hortonworks$registries$schemaregistry$AtlasEventStorable$EventType[AtlasEventStorable.EventType.CREATE_VERSION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public AtlasEventsProcessor(AtlasPlugin atlasPlugin, StorageManager storageManager, TransactionManager transactionManager, long j, boolean z) {
        this.atlasPlugin = (AtlasPlugin) Preconditions.checkNotNull(atlasPlugin, "atlasPlugin");
        this.storageManager = (StorageManager) Preconditions.checkNotNull(storageManager, "storageManager");
        Preconditions.checkNotNull(transactionManager, "transactionManager");
        this.connectWithKafka = z;
        this.waitBetweenProcessing = j;
        Preconditions.checkState(j > 0, "Wait period must be greater than 0");
        this.managedTransaction = new ManagedTransaction(transactionManager, TransactionIsolation.SERIALIZABLE);
        LOG.info("Connecting schemas with kafka topics is {}", z ? "ENABLED" : "DISABLED");
    }

    @Override // java.lang.Runnable
    public synchronized void run() {
        LOG.trace("Pre-sleep of AtlasEvents processor");
        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while performing initial sleep.", e);
        }
        LOG.debug("Starting Atlas events processor.");
        while (!Thread.interrupted()) {
            try {
                processAtlasEvents();
                wait(this.waitBetweenProcessing);
            } catch (InterruptedException e2) {
                LOG.warn("Atlas events processor was interrupted.", e2);
                return;
            } catch (Exception e3) {
                LOG.error("An error occurred while processing Atlas events. The AtlasEventsProcessor thread will terminate.", e3);
                throw new UndeclaredThrowableException(e3);
            }
        }
    }

    @VisibleForTesting
    void processAtlasEvents() throws Exception {
        List asList = Arrays.asList(new QueryParam("processed", "false"), new QueryParam("failed", "false"));
        this.managedTransaction.executeFunction(() -> {
            Collection<AtlasEventStorable> find = this.storageManager.find("atlas_events", asList);
            if (find == null || find.isEmpty()) {
                return false;
            }
            LOG.info("Processing {} Atlas events.", Integer.valueOf(find.size()));
            for (AtlasEventStorable atlasEventStorable : find) {
                try {
                    processAtlasEvent(atlasEventStorable);
                } catch (Exception e) {
                    LOG.error("Could not process Atlas event. Setting it to failed state: {}", atlasEventStorable, e);
                    setAtlasEntryToFailed(atlasEventStorable);
                }
            }
            return true;
        });
    }

    @VisibleForTesting
    void processAtlasEvent(AtlasEventStorable atlasEventStorable) throws Exception {
        Preconditions.checkNotNull(atlasEventStorable.getProcessedId(), "Atlas event ID is null.");
        Preconditions.checkNotNull(atlasEventStorable.getType(), "Atlas event type is null.");
        switch (AnonymousClass1.$SwitchMap$com$hortonworks$registries$schemaregistry$AtlasEventStorable$EventType[AtlasEventStorable.EventType.forNumValue(atlasEventStorable.getType().intValue()).ordinal()]) {
            case 1:
                createMeta(atlasEventStorable.getProcessedId());
                break;
            case 2:
                updateMeta(atlasEventStorable.getProcessedId());
                break;
            case 3:
                createVersion(atlasEventStorable.getProcessedId());
                break;
            default:
                throw new IllegalArgumentException("Unsupported event type: " + atlasEventStorable);
        }
        atlasEventStorable.setProcessed(true);
        this.storageManager.update(atlasEventStorable);
    }

    private void createMeta(Long l) throws SchemaNotFoundException {
        SchemaMetadataInfo metaById = getMetaById(l);
        if (metaById == null) {
            throw new SchemaNotFoundException("Did not find schema with ID " + l, String.valueOf(l));
        }
        String createMeta = this.atlasPlugin.createMeta(metaById);
        if (createMeta == null || !isConnectWithKafka()) {
            return;
        }
        connectSchemaWithTopic(createMeta, metaById);
    }

    private void connectSchemaWithTopic(String str, SchemaMetadataInfo schemaMetadataInfo) {
        LOG.debug("Connect schema with Kafka topic");
        if (this.atlasPlugin.isKafkaSchemaModelInitialized()) {
            this.atlasPlugin.connectSchemaWithTopic(str, schemaMetadataInfo);
        }
    }

    private void updateMeta(Long l) throws SchemaNotFoundException {
        SchemaMetadataInfo metaById = getMetaById(l);
        if (metaById == null) {
            throw new SchemaNotFoundException("Did not find schema with ID " + l, String.valueOf(l));
        }
        this.atlasPlugin.updateMeta(metaById.getSchemaMetadata());
    }

    private void createVersion(Long l) throws SchemaNotFoundException {
        SchemaVersionInfo versionById = getVersionById(l);
        if (versionById == null) {
            throw new SchemaNotFoundException("Did not find schema version with ID " + l, String.valueOf(l));
        }
        SchemaMetadataInfo metaById = getMetaById(versionById.getSchemaMetadataId());
        if (metaById == null) {
            throw new SchemaNotFoundException("Did not find schema with ID " + versionById.getSchemaMetadataId() + " for version with ID " + l, String.valueOf(l));
        }
        this.atlasPlugin.addSchemaVersion(metaById.getSchemaMetadata().getName(), versionById);
    }

    private void setAtlasEntryToFailed(AtlasEventStorable atlasEventStorable) {
        if (atlasEventStorable == null) {
            return;
        }
        try {
            atlasEventStorable.setFailed(true);
            this.storageManager.update(atlasEventStorable);
        } catch (Exception e) {
            LOG.error("Failed to set state to 'failed' for Atlas event entry {}", atlasEventStorable, e);
        }
    }

    @Nullable
    private SchemaMetadataInfo getMetaById(@Nonnull Long l) {
        new SchemaMetadataStorable().setId(l);
        Collection find = this.storageManager.find("schema_metadata_info", Collections.singletonList(new QueryParam("id", l.toString())));
        SchemaMetadataInfo schemaMetadataInfo = null;
        if (find != null && !find.isEmpty()) {
            schemaMetadataInfo = ((SchemaMetadataStorable) find.iterator().next()).toSchemaMetadataInfo();
            if (find.size() > 1) {
                LOG.warn("No unique entry with schemaMetatadataId: [{}]", l);
            }
        }
        return schemaMetadataInfo;
    }

    @Nullable
    private SchemaVersionInfo getVersionById(@Nonnull Long l) {
        SchemaVersionStorable schemaVersionStorable = new SchemaVersionStorable();
        schemaVersionStorable.setId(l);
        SchemaVersionStorable schemaVersionStorable2 = this.storageManager.get(schemaVersionStorable.getStorableKey());
        if (schemaVersionStorable2 != null) {
            return schemaVersionStorable2.toSchemaVersionInfo();
        }
        return null;
    }

    private boolean isConnectWithKafka() {
        return this.connectWithKafka;
    }
}
