package org.apache.flume.source;

import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/ExecSource.class */
public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);
    private String command;
    private CounterGroup counterGroup;
    private ExecutorService executor;
    private Future<?> runnerFuture;

    /* loaded from: input_file:org/apache/flume/source/ExecSource$ExecRunnable.class */
    private static class ExecRunnable implements Runnable {
        private String command;
        private Channel channel;
        private CounterGroup counterGroup;

        private ExecRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ProcessBuilder(this.command.split("\\s+")).start().getInputStream()));
                while (true) {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        bufferedReader.close();
                        return;
                    }
                    this.counterGroup.incrementAndGet("exec.lines.read");
                    Transaction transaction = this.channel.getTransaction();
                    try {
                        try {
                            transaction.begin();
                            this.channel.put(EventBuilder.withBody(readLine.getBytes()));
                            transaction.commit();
                            transaction.close();
                        } catch (Throwable th) {
                            transaction.close();
                            throw th;
                        }
                    } catch (ChannelException e) {
                        transaction.rollback();
                        throw e;
                    } catch (Exception e2) {
                        transaction.rollback();
                        throw e2;
                    }
                }
            } catch (Exception e3) {
                ExecSource.logger.error("Failed while running command:" + this.command + " - Exception follows.", e3);
            }
        }
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Exec source starting with command:{}", this.command);
        this.executor = Executors.newSingleThreadExecutor();
        this.counterGroup = new CounterGroup();
        ExecRunnable execRunnable = new ExecRunnable();
        execRunnable.command = this.command;
        execRunnable.channel = getChannel();
        execRunnable.counterGroup = this.counterGroup;
        this.runnerFuture = this.executor.submit(execRunnable);
        super.start();
        logger.debug("Exec source started");
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("Stopping exec source with command:{}", this.command);
        if (this.runnerFuture != null) {
            logger.debug("Stopping exec runner");
            this.runnerFuture.cancel(true);
            logger.debug("Exec runner stopped");
        }
        this.executor.shutdown();
        while (!this.executor.isTerminated()) {
            logger.debug("Waiting for exec executor service to stop");
            try {
                this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
        logger.debug("Exec source with command:{} stopped. Metrics:{}", this.command, this.counterGroup);
    }

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        this.command = (String) context.get("command", String.class);
        Preconditions.checkState(this.command != null, "The parameter command must be specified");
    }
}
