package eu.dnetlib.iis.wf.export.actionmanager.sequencefile;

import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.iis.common.ClassPathResourceProvider;
import eu.dnetlib.iis.common.OrderedProperties;
import eu.dnetlib.iis.common.java.PortBindings;
import eu.dnetlib.iis.common.java.Process;
import eu.dnetlib.iis.common.java.io.FileSystemPath;
import eu.dnetlib.iis.common.java.io.SequenceFileTextValueReader;
import eu.dnetlib.iis.common.java.porttype.AnyPortType;
import eu.dnetlib.iis.common.java.porttype.PortType;
import eu.dnetlib.iis.wf.export.actionmanager.AtomicActionDeserializationUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

/* loaded from: input_file:eu/dnetlib/iis/wf/export/actionmanager/sequencefile/TestingConsumer.class */
public class TestingConsumer implements Process {
    public static final String EXPECTATION_FILE_PATHS = "expectation_file_paths";
    public static final String PORT_INPUT = "seqfile";
    public static final String NULL_VALUE_INDICATOR = "$NULL$";
    private final FieldAccessor accessor;
    private static final Logger log = Logger.getLogger(TestingConsumer.class);
    private final Map<String, PortType> inputPorts = new HashMap();

    public TestingConsumer() {
        this.inputPorts.put(PORT_INPUT, new AnyPortType());
        this.accessor = new FieldAccessor();
    }

    public Map<String, PortType> getInputPorts() {
        return this.inputPorts;
    }

    public Map<String, PortType> getOutputPorts() {
        return Collections.emptyMap();
    }

    public void run(PortBindings portBindings, Configuration configuration, Map<String, String> map) throws Exception {
        String str = map.get(EXPECTATION_FILE_PATHS);
        if (StringUtils.isEmpty(str)) {
            throw new Exception("no expectation_file_paths property value provided, output requirements were not specified!");
        }
        FileSystem fileSystem = FileSystem.get(configuration);
        Path path = (Path) portBindings.getInput().get(PORT_INPUT);
        if (!fileSystem.exists(path)) {
            throw new Exception(path + " hdfs location does not exist!");
        }
        SequenceFileTextValueReader sequenceFileTextValueReader = new SequenceFileTextValueReader(new FileSystemPath(fileSystem, path));
        Throwable th = null;
        try {
            int i = 0;
            String[] split = StringUtils.split(str, ',');
            while (sequenceFileTextValueReader.hasNext()) {
                String text = sequenceFileTextValueReader.next().toString();
                AtomicAction<? extends Oaf> deserializeAction = AtomicActionDeserializationUtils.deserializeAction(text);
                i++;
                if (i > split.length) {
                    throw new Exception("got more records than expected: unable to verify record no " + i + ", no field specification provided! Record contents: " + text);
                }
                evaluateExpectations(split[i - 1], deserializeAction, text);
            }
            if (i < split.length) {
                throw new Exception("records count mismatch: got: " + i + " expected: " + split.length);
            }
            if (sequenceFileTextValueReader != null) {
                if (0 == 0) {
                    sequenceFileTextValueReader.close();
                    return;
                }
                try {
                    sequenceFileTextValueReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (sequenceFileTextValueReader != null) {
                if (0 != 0) {
                    try {
                        sequenceFileTextValueReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    sequenceFileTextValueReader.close();
                }
            }
            throw th3;
        }
    }

    private void evaluateExpectations(String str, AtomicAction<? extends Oaf> atomicAction, String str2) throws Exception {
        log.info("output specification location: " + str);
        OrderedProperties orderedProperties = new OrderedProperties();
        orderedProperties.load(ClassPathResourceProvider.getResourceInputStream(str.trim()));
        for (Map.Entry entry : orderedProperties.entrySet()) {
            Object value = this.accessor.getValue((String) entry.getKey(), atomicAction);
            if ((value != null && !entry.getValue().equals(value.toString())) || (value == null && !NULL_VALUE_INDICATOR.equals(entry.getValue()))) {
                throw new Exception("invalid field value for path: " + entry.getKey() + ", expected: '" + entry.getValue() + "', got: '" + value + "' Full object content: " + str2);
            }
        }
    }
}
