package org.kitesdk.tools;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.io.Closeables;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import org.apache.avro.generic.GenericRecord;
import org.apache.crunch.DoFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hive.conf.HiveConf;
import org.kitesdk.compat.DynMethods;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.View;
import org.kitesdk.data.crunch.CrunchDatasets;

/* loaded from: input_file:org/kitesdk/tools/TransformTask.class */
public class TransformTask<S, T> extends Configured {
    private static DynMethods.StaticMethod getEnumByName = new DynMethods.Builder("valueOf").impl("org.apache.hadoop.mapred.Task$Counter", new Class[]{String.class}).impl("org.apache.hadoop.mapreduce.TaskCounter", new Class[]{String.class}).defaultNoop().buildStatic();
    private static final Enum<?> MAP_INPUT_RECORDS = (Enum) getEnumByName.invoke(new Object[]{"MAP_INPUT_RECORDS"});
    private static final String LOCAL_FS_SCHEME = "file";
    private final View<S> from;
    private final View<T> to;
    private final DoFn<S, T> transform;
    private boolean compact = true;
    private int numWriters = -1;
    private long count = 0;

    @SuppressWarnings(value = {"SE_NO_SERIALVERSIONID"}, justification = "Purposely not supported across versions")
    /* loaded from: input_file:org/kitesdk/tools/TransformTask$CheckEntityClass.class */
    public static class CheckEntityClass<E> extends MapFn<E, E> {
        private final Class<?> entityClass;

        public CheckEntityClass(Class<?> cls) {
            this.entityClass = cls;
        }

        public E map(E e) {
            if (e == null || !this.entityClass.isAssignableFrom(e.getClass())) {
                throw new DatasetException("Object does not match expected type " + this.entityClass + ": " + String.valueOf(e));
            }
            return e;
        }
    }

    public TransformTask(View<S> view, View<T> view2, DoFn<S, T> doFn) {
        this.from = view;
        this.to = view2;
        this.transform = doFn;
    }

    public long getCount() {
        return this.count;
    }

    public TransformTask noCompaction() {
        this.compact = false;
        this.numWriters = 0;
        return this;
    }

    public TransformTask setNumWriters(int i) {
        Preconditions.checkArgument(i >= 0, "Invalid number of reducers: " + i);
        if (i == 0) {
            noCompaction();
        } else {
            this.numWriters = i;
        }
        return this;
    }

    public PipelineResult run() throws IOException {
        boolean z = true;
        if (isLocal(this.from.getDataset()) || isLocal(this.to.getDataset())) {
            z = false;
        }
        AvroType ptype = ptype(this.to);
        CheckEntityClass checkEntityClass = new CheckEntityClass(this.to.getType());
        if (z) {
            TaskUtil.configure(getConf()).addJarPathForClass(HiveConf.class);
            MRPipeline mRPipeline = new MRPipeline(getClass(), getConf());
            PCollection parallelDo = mRPipeline.read(CrunchDatasets.asSource(this.from)).parallelDo(this.transform, ptype).parallelDo(checkEntityClass, ptype);
            if (this.compact) {
                parallelDo = CrunchDatasets.partition(parallelDo, this.to, this.numWriters);
            }
            mRPipeline.write(parallelDo, CrunchDatasets.asTarget(this.to), Target.WriteMode.APPEND);
            PipelineResult done = mRPipeline.done();
            PipelineResult.StageResult stageResult = (PipelineResult.StageResult) Iterables.getFirst(done.getStageResults(), (Object) null);
            if (stageResult != null && MAP_INPUT_RECORDS != null) {
                this.count = stageResult.getCounterValue(MAP_INPUT_RECORDS);
            }
            return done;
        }
        Pipeline memPipeline = MemPipeline.getInstance();
        PCollection parallelDo2 = memPipeline.read(CrunchDatasets.asSource(this.from)).parallelDo(this.transform, ptype).parallelDo(checkEntityClass, ptype);
        DatasetWriter datasetWriter = null;
        try {
            datasetWriter = this.to.newWriter();
            Iterator<T> it = parallelDo2.materialize().iterator();
            while (it.hasNext()) {
                datasetWriter.write(it.next());
                this.count++;
            }
            Closeables.close(datasetWriter, false);
            return memPipeline.done();
        } catch (Throwable th) {
            Closeables.close(datasetWriter, true);
            throw th;
        }
    }

    private static boolean isLocal(Dataset<?> dataset) {
        URI location = dataset.getDescriptor().getLocation();
        return location != null && LOCAL_FS_SCHEME.equals(location.getScheme());
    }

    private static <T> AvroType<T> ptype(View<T> view) {
        Class type = view.getType();
        return GenericRecord.class.isAssignableFrom(type) ? Avros.generics(view.getDataset().getDescriptor().getSchema()) : Avros.records(type);
    }
}
