package org.apache.skywalking.apm.commons.datacarrier.consumer;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;

/* loaded from: input_file:org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.class */
/**
 * Worker thread that repeatedly collects data from its registered {@link Buffer} ranges and
 * passes each non-empty batch to an {@link IConsumer}.
 *
 * <p>Life cycle: register sources with {@code addDataSource}, start the thread, and call
 * {@link #shutdown()} to stop it. After the loop ends the thread performs one final
 * consume pass and then calls {@code IConsumer.onExit()}.
 *
 * <p>NOTE(review): the decompiler reports that the nested class, the constructor,
 * {@code addDataSource} and {@code shutdown} were package-private in the original sources;
 * the widened modifiers are kept here as they appear in this file.
 *
 * @param <T> type of the elements carried by the buffers
 */
public class ConsumerThread<T> extends Thread {
    /**
     * Loop flag for {@link #run()}. It is set to {@code true} at construction time and is only
     * ever cleared by {@link #shutdown()}, so a shutdown request issued before the thread
     * reaches its loop is not overwritten.
     */
    private volatile boolean running;
    private final IConsumer<T> consumer;
    private final List<DataSource> dataSources;
    /** Idle time, in milliseconds, passed to {@link Thread#sleep(long)} when a pass finds no data. */
    private final long consumeCycle;

    /** A buffer together with the {@code start}/{@code end} arguments used when obtaining from it. */
    public class DataSource {
        private final Buffer<T> sourceBuffer;
        private final int start;
        private final int end;

        /**
         * @param buffer buffer to read from
         * @param i      {@code start} argument forwarded to {@code Buffer.obtain}
         * @param i2     {@code end} argument forwarded to {@code Buffer.obtain}
         */
        DataSource(Buffer<T> buffer, int i, int i2) {
            this.sourceBuffer = buffer;
            this.start = i;
            this.end = i2;
        }

        /**
         * @return whatever {@code sourceBuffer.obtain(start, end)} returns for this range
         */
        LinkedList<T> obtain() {
            return this.sourceBuffer.obtain(this.start, this.end);
        }
    }

    /**
     * Creates a consumer thread with no data sources registered.
     *
     * @param str       thread name
     * @param iConsumer receiver of the collected batches and of error/exit callbacks
     * @param j         milliseconds to sleep after a pass that found no data
     */
    public ConsumerThread(String str, IConsumer<T> iConsumer, long j) {
        super(str);
        this.consumer = iConsumer;
        // Armed here rather than in run(): run() must never flip the flag back to true,
        // otherwise a shutdown() racing with thread start-up would be lost.
        this.running = true;
        this.dataSources = new LinkedList<DataSource>();
        this.consumeCycle = j;
    }

    /**
     * Registers a range of {@code buffer} as a data source.
     *
     * @param buffer buffer to read from
     * @param i      {@code start} argument used for every obtain call on this source
     * @param i2     {@code end} argument used for every obtain call on this source
     */
    public void addDataSource(Buffer<T> buffer, int i, int i2) {
        this.dataSources.add(new DataSource(buffer, i, i2));
    }

    /**
     * Registers {@code buffer} as a data source using the range {@code 0} to
     * {@code buffer.getBufferSize()}.
     *
     * @param buffer buffer to read from
     */
    public void addDataSource(Buffer<T> buffer) {
        this.dataSources.add(new DataSource(buffer, 0, buffer.getBufferSize()));
    }

    /**
     * Consumes until {@link #shutdown()} is called, sleeping {@code consumeCycle} milliseconds
     * after every pass that found no data. Once the loop ends, one more consume pass is made
     * and {@code IConsumer.onExit()} is invoked.
     *
     * <p>An interrupt only cuts the current sleep short; it does not stop the loop. The
     * interrupt status is restored when this method finishes instead of being discarded.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        boolean interrupted = false;
        try {
            while (this.running) {
                if (!consume()) {
                    try {
                        Thread.sleep(this.consumeCycle);
                    } catch (InterruptedException ignored) {
                        // Deliberately keep looping: shutdown() is the stop signal. The flag is
                        // restored later because re-interrupting here would make every
                        // following sleep() throw immediately and turn the loop into a spin.
                        interrupted = true;
                    }
                }
            }
            // Final pass after the stop request, then notify the consumer.
            consume();
            this.consumer.onExit();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Performs one pass over all data sources, merges everything obtained into a single list
     * and hands it to the consumer. Anything thrown by {@code IConsumer.consume} is forwarded
     * to {@code IConsumer.onError} together with the batch.
     *
     * @return {@code true} if at least one data source returned a non-empty list
     */
    private boolean consume() {
        boolean hasData = false;
        LinkedList<T> batch = new LinkedList<T>();
        for (DataSource source : this.dataSources) {
            LinkedList<T> obtained = source.obtain();
            if (!obtained.isEmpty()) {
                batch.addAll(obtained);
                hasData = true;
            }
        }
        if (!batch.isEmpty()) {
            try {
                this.consumer.consume(batch);
            } catch (Throwable th) {
                this.consumer.onError(batch, th);
            }
        }
        return hasData;
    }

    /**
     * Asks {@link #run()} to leave its loop. The flag is checked at the start of each
     * iteration, so a pass or sleep already in progress completes first.
     */
    public void shutdown() {
        this.running = false;
    }
}
