package org.kitesdk.data.crunch;

import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.MapReduceTarget;
import org.apache.crunch.io.OutputHandler;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.AvroType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Signalable;
import org.kitesdk.data.View;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
import org.kitesdk.data.spi.LastModifiedAccessor;
import org.kitesdk.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/kitesdk/data/crunch/DatasetTarget.class */
/**
 * A Crunch {@link MapReduceTarget} that writes Avro records into a Kite dataset or view
 * through {@link DatasetKeyOutputFormat}.
 *
 * <p>The destination is given either as a {@link View} or as a dataset/view {@link URI}.
 * When constructed from a URI, the view is only loaded in
 * {@link #handleExisting(Target.WriteMode, long, Configuration)}, and a missing dataset is
 * treated as "writing to a new dataset/view".
 *
 * @param <E> the entity type of the destination view
 */
public class DatasetTarget<E> implements MapReduceTarget {
    private static final Logger LOG = LoggerFactory.getLogger(DatasetTarget.class);

    /** Destination view; {@code null} until resolved when this target was built from a URI. */
    private View<?> view;
    /** URI of the destination; used to load the view lazily and in {@link #toString()}. */
    private final URI uri;
    /** Output-format settings, plus any extra key/value pairs added through {@link #outputConf}. */
    private final FormatBundle<DatasetKeyOutputFormat> formatBundle;

    /**
     * Creates a target that appends to the given view.
     *
     * @param view the destination view; must not be null
     * @throws NullPointerException if {@code view} is null
     */
    public DatasetTarget(View<E> view) {
        Preconditions.checkNotNull(view, "View cannot be null");
        this.view = view;
        Configuration conf = emptyConf();
        DatasetKeyOutputFormat.configure(conf).appendTo(view);
        this.formatBundle = outputBundle(conf);
        this.uri = view.getDataset().getUri();
    }

    /**
     * Creates a target for the dataset/view identified by {@code uri}. The view itself is
     * loaded later, in {@link #handleExisting(Target.WriteMode, long, Configuration)}.
     *
     * @param uri the destination dataset/view URI; must not be null
     * @throws NullPointerException if {@code uri} is null
     */
    public DatasetTarget(URI uri) {
        Preconditions.checkNotNull(uri, "URI cannot be null");
        this.uri = uri;
        Configuration conf = emptyConf();
        DatasetKeyOutputFormat.configure(conf).appendTo(uri);
        this.formatBundle = outputBundle(conf);
    }

    /**
     * Adds a configuration entry that is applied to the job's output format.
     *
     * @return this target
     */
    @Override // org.apache.crunch.Target
    public Target outputConf(String key, String value) {
        formatBundle.set(key, value);
        return this;
    }

    /**
     * Applies the Crunch write mode to the destination.
     *
     * <p>The Kite write mode derived from {@code writeMode} is always recorded in the output
     * configuration first. Then:
     * <ul>
     *   <li>If the dataset does not exist yet, returns {@code true}.</li>
     *   <li>If the view is neither ready nor non-empty, returns {@code false}.</li>
     *   <li>Otherwise the mode decides what happens:
     *     <ul>
     *       <li>DEFAULT fails.</li>
     *       <li>OVERWRITE and APPEND return {@code true}.</li>
     *       <li>CHECKPOINT is handled by {@link #handleExistingCheckpoint(boolean, long)}.</li>
     *     </ul>
     *   </li>
     * </ul>
     *
     * @param writeMode the requested Crunch write mode
     * @param lastModForSource latest modification time of the pipeline's source data
     * @param configuration the job configuration (not used here)
     * @return whether existing output is present and kept (see above for the exact cases)
     * @throws CrunchRuntimeException for DEFAULT mode when data already exists, for an
     *     unknown write mode, or when stale checkpoint data cannot be deleted
     */
    @Override // org.apache.crunch.Target
    public boolean handleExisting(Target.WriteMode writeMode, long lastModForSource,
            Configuration configuration) {
        outputConf(DatasetKeyOutputFormat.KITE_WRITE_MODE, kiteWriteMode(writeMode).toString());

        if (view == null) {
            try {
                view = Datasets.load(uri);
            } catch (DatasetNotFoundException e) {
                LOG.info("Writing to new dataset/view: {}", uri);
                return true;
            }
        }

        // A signalled-ready view counts as existing even if it holds no data.
        boolean ready = (view instanceof Signalable) && ((Signalable) view).isReady();
        boolean exists = ready || !view.isEmpty();
        if (!exists) {
            LOG.info("Writing to empty dataset/view: {}", view);
            return false;
        }

        switch (writeMode) {
            case DEFAULT:
                LOG.error("Dataset/view {} already exists!", view);
                throw new CrunchRuntimeException("Dataset/view already exists: " + view);
            case OVERWRITE:
                LOG.info("Overwriting existing dataset/view: {}", view);
                return true;
            case APPEND:
                LOG.info("Appending to existing dataset/view: {}", view);
                return true;
            case CHECKPOINT:
                return handleExistingCheckpoint(ready, lastModForSource);
            default:
                throw new CrunchRuntimeException("Unknown WriteMode:  " + writeMode);
        }
    }

    /**
     * Decides whether existing data in a checkpoint view can be reused.
     *
     * <p>The checkpoint is reused only when it is ready and was modified after the source
     * data. Otherwise its data is deleted.
     *
     * @param ready whether the view reported itself as ready
     * @param lastModForSource latest modification time of the pipeline's source data
     * @return {@code true} when the checkpoint is reused, {@code false} after deleting it
     */
    private boolean handleExistingCheckpoint(boolean ready, long lastModForSource) {
        // -1 when the view cannot report a modification time.
        long lastModForTarget = -1;
        if (view instanceof LastModifiedAccessor) {
            lastModForTarget = ((LastModifiedAccessor) view).getLastModified();
        }

        if (ready && lastModForTarget > lastModForSource) {
            LOG.info("Re-starting pipeline from checkpoint dataset/view: {}", view);
            return true;
        }

        if (ready) {
            LOG.info("Source data has recent updates. Deleting data from existing checkpoint dataset/view: {}", view);
        } else {
            LOG.info("Checkpoint is not ready. Deleting data from existing checkpoint dataset/view: {}", view);
        }
        delete(view);
        return false;
    }

    /**
     * Maps a Crunch write mode to the mode passed to {@link DatasetKeyOutputFormat}.
     * CHECKPOINT (and any unknown mode) maps to APPEND. For CHECKPOINT, stale data is
     * removed in {@link #handleExistingCheckpoint(boolean, long)} before writing.
     */
    private static DatasetKeyOutputFormat.WriteMode kiteWriteMode(Target.WriteMode writeMode) {
        switch (writeMode) {
            case DEFAULT:
                return DatasetKeyOutputFormat.WriteMode.DEFAULT;
            case OVERWRITE:
                return DatasetKeyOutputFormat.WriteMode.OVERWRITE;
            case APPEND:
            case CHECKPOINT:
            default:
                return DatasetKeyOutputFormat.WriteMode.APPEND;
        }
    }

    /**
     * Deletes all data in {@code target}. Logs a warning if nothing was deleted.
     *
     * @throws CrunchRuntimeException if the view does not support deletion
     */
    private static void delete(View<?> target) {
        try {
            if (!target.deleteAll()) {
                LOG.warn("No data was deleted.");
            }
        } catch (UnsupportedOperationException e) {
            LOG.error("Dataset view {} cannot be deleted!", target);
            throw new CrunchRuntimeException("Dataset view cannot be deleted:" + target, e);
        }
    }

    /**
     * Accepts only Avro-typed collections. When accepted, the handler is configured with
     * this target.
     */
    @Override // org.apache.crunch.Target
    public boolean accept(OutputHandler outputHandler, PType<?> pType) {
        if (!(pType instanceof AvroType)) {
            return false;
        }
        outputHandler.configure(this, pType);
        return true;
    }

    /**
     * Returns a key converter for the given Avro type.
     *
     * @throws DatasetException if {@code pType} is not an {@link AvroType}
     */
    @Override // org.apache.crunch.Target
    @SuppressWarnings({"unchecked", "rawtypes"}) // KeyConverter is built from a wildcard PType
    public Converter<?, ?, ?, ?> getConverter(PType<?> pType) {
        if (pType instanceof AvroType) {
            return new KeyConverter((AvroType) pType);
        }
        throw new DatasetException("Cannot create converter for non-Avro type: " + pType);
    }

    /**
     * Returns a source/target that reads back what this target writes.
     *
     * @return a {@code DatasetSourceTarget} for Avro types, or {@code null} for non-Avro types
     *     or when neither a view nor a URI is available
     */
    @Override // org.apache.crunch.Target
    @SuppressWarnings({"unchecked", "rawtypes"}) // DatasetSourceTarget is created from raw types
    public <T> SourceTarget<T> asSourceTarget(PType<T> pType) {
        if (!(pType instanceof AvroType)) {
            return null;
        }
        if (view != null) {
            return new DatasetSourceTarget(view, (AvroType) pType);
        } else if (uri != null) {
            return new DatasetSourceTarget(uri, (AvroType) pType);
        }
        return null;
    }

    /**
     * Registers this target as the named output {@code outputName} on {@code job}. It also
     * sets the job's output format and copies the bundle's settings into the job
     * configuration.
     *
     * @throws NullPointerException if {@code outputName} is null
     */
    @Override // org.apache.crunch.io.MapReduceTarget
    public void configureForMapReduce(Job job, PType<?> pType, Path outputPath, String outputName) {
        Preconditions.checkNotNull(outputName, "Output name should not be null");
        CrunchOutputs.addNamedOutput(job, outputName, formatBundle,
                getConverter(pType).getKeyClass(), Void.class);
        job.setOutputFormatClass(formatBundle.getFormatClass());
        formatBundle.configure(job.getConfiguration());
    }

    /** Returns a configuration that does not load Hadoop's default resources. */
    private static Configuration emptyConf() {
        return new Configuration(false);
    }

    /**
     * Creates a {@link DatasetKeyOutputFormat} bundle that carries every entry of
     * {@code configuration}.
     */
    private static FormatBundle<DatasetKeyOutputFormat> outputBundle(Configuration configuration) {
        FormatBundle<DatasetKeyOutputFormat> bundle = FormatBundle.forOutput(DatasetKeyOutputFormat.class);
        for (Map.Entry<String, String> entry : configuration) {
            bundle.set(entry.getKey(), entry.getValue());
        }
        return bundle;
    }

    @Override
    public String toString() {
        return "Kite(" + uri + ")";
    }
}
