package com.bigdata.ha;

import com.bigdata.ha.HAPipelineGlue;
import com.bigdata.ha.pipeline.HAReceiveService;
import com.bigdata.ha.pipeline.HASendService;
import com.bigdata.io.DirectBufferPool;
import com.bigdata.journal.ha.HAWriteMessage;
import com.bigdata.quorum.QuorumMember;
import com.bigdata.quorum.QuorumStateChangeListener;
import com.bigdata.quorum.QuorumStateChangeListenerBase;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

/* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/ha/QuorumPipelineImpl.class */
public abstract class QuorumPipelineImpl<S extends HAPipelineGlue> extends QuorumStateChangeListenerBase implements QuorumPipeline<S>, QuorumStateChangeListener {
    /** Logger shared by this class and its subclasses. */
    protected static final transient Logger log = Logger.getLogger(QuorumPipelineImpl.class);

    /** The quorum member supplied to the constructor; never {@code null}. */
    protected final QuorumMember<S> member;

    /** Service identifier obtained from {@link #member} at construction time. */
    protected final UUID serviceId;

    /**
     * Send service created by {@code setUpSendService()} and terminated and
     * cleared by {@code tearDown()}; {@code null} while not set up.
     */
    private HASendService sendService;

    /**
     * Receive service created by {@code setUpReceiveService()} and terminated
     * and cleared by {@code tearDown()}; {@code null} while not set up.
     */
    private HAReceiveService<HAWriteMessage> receiveService;

    /**
     * Buffer acquired from {@link DirectBufferPool#INSTANCE} in
     * {@code setUpReceiveService()} and released back to it in
     * {@code tearDown()}.
     */
    private ByteBuffer receiveBuffer;

    /**
     * Lock acquired by the pipeline event handlers and by the set-up and
     * tear-down methods of this class.
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Cached address and proxy of the downstream service, written by
     * {@code cachePipelineState(UUID)}, cleared by {@code tearDown()} and read
     * by {@code replicate(...)} and {@code receiveAndReplicate(...)}. Holds
     * {@code null} when no downstream state is cached.
     */
    private AtomicReference<PipelineState<S>> pipelineStateRef = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/ha/QuorumPipelineImpl$PipelineState.class */
    /**
     * Mutable holder pairing a service proxy with its write pipeline address.
     * Instances are populated by {@code cachePipelineState(UUID)} in the
     * enclosing class and can be externalized: the address is written first,
     * followed by the service.
     *
     * @param <S> the service interface type
     */
    public static class PipelineState<S extends HAPipelineGlue> implements Externalizable {
        private static final long serialVersionUID = 1L;

        /** Write pipeline address reported by {@link #service}. */
        public InetSocketAddress addr;

        /** The service this state refers to. */
        public S service;

        /**
         * Restores {@link #addr} and then {@link #service}, in the order in
         * which {@link #writeExternal(ObjectOutput)} emitted them.
         */
        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            addr = (InetSocketAddress) in.readObject();
            service = (S) in.readObject();
        }

        /** Writes {@link #addr} followed by {@link #service}. */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeObject(addr);
            out.writeObject(service);
        }
    }

    /**
     * Creates the pipeline helper for the given quorum member and records the
     * member's service id.
     *
     * @param quorumMember the owning quorum member
     * @throws IllegalArgumentException if {@code quorumMember} is {@code null}
     */
    public QuorumPipelineImpl(QuorumMember<S> quorumMember) {
        if (null == quorumMember) {
            throw new IllegalArgumentException();
        }
        member = quorumMember;
        serviceId = quorumMember.getServiceId();
    }

    /**
     * Releases pipeline resources via {@link #tearDown()} when the instance is
     * finalized.
     *
     * <p>The superclass finalizer runs in a {@code finally} block so that it is
     * still invoked when {@code tearDown()} throws.
     *
     * @throws Throwable whatever {@code tearDown()} or the superclass
     *         finalizer throws
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            tearDown();
        } finally {
            super.finalize();
        }
    }

    /**
     * Locates a service id within an array of service ids.
     *
     * @param uuid the id to look for
     * @param uuidArr the ids to search, in order
     * @return the zero-based position of the first element equal to
     *         {@code uuid}, or {@code -1} when there is none
     * @throws IllegalArgumentException if {@code uuid} is {@code null}
     */
    protected int getIndex(UUID uuid, UUID[] uuidArr) {
        if (uuid == null) {
            throw new IllegalArgumentException();
        }
        int position = 0;
        for (UUID candidate : uuidArr) {
            if (uuid.equals(candidate)) {
                return position;
            }
            position++;
        }
        return -1;
    }

    /**
     * Handles this service joining the write pipeline. The service at the head
     * of the pipeline (position 0) gets a send service; a service at any later
     * position gets a receive service. Nothing is set up when this service is
     * not found in the pipeline.
     */
    @Override
    public void pipelineAdd() {
        super.pipelineAdd();
        lock.lock();
        try {
            final UUID[] pipelineOrder = member.getQuorum().getPipeline();
            final int position = getIndex(serviceId, pipelineOrder);
            if (position > 0) {
                setUpReceiveService();
            } else if (position == 0) {
                setUpSendService();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Handles this service being elected pipeline leader: any existing
     * send/receive services are torn down and a fresh send service is set up,
     * all while holding the lock.
     */
    @Override
    public void pipelineElectedLeader() {
        super.pipelineElectedLeader();
        lock.lock();
        try {
            tearDown();
            setUpSendService();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Handles this service leaving the write pipeline by tearing down whatever
     * send/receive resources it holds, while holding the lock.
     */
    @Override
    public void pipelineRemove() {
        super.pipelineRemove();
        lock.lock();
        try {
            tearDown();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Handles a change of the downstream service in the write pipeline.
     *
     * <p>If a send service exists it is terminated and, when there is a new
     * downstream address, restarted against it. Otherwise, if a receive
     * service exists, its downstream address is changed. In every case the
     * cached pipeline state is refreshed for {@code uuid2}.
     *
     * <p>The whole update runs inside {@code try/finally} so the lock is
     * released even when one of the service calls or the state refresh throws.
     *
     * @param uuid forwarded to the superclass only; presumably the id of the
     *        previous downstream service (TODO confirm against the listener
     *        contract)
     * @param uuid2 id of the service to use as the new downstream target, or
     *        {@code null} when there is none
     */
    @Override
    public void pipelineChange(UUID uuid, UUID uuid2) {
        super.pipelineChange(uuid, uuid2);
        this.lock.lock();
        try {
            final InetSocketAddress downstreamAddr = uuid2 == null
                    ? null
                    : this.member.getService(uuid2).getWritePipelineAddr();
            if (this.sendService != null) {
                // Drop the current connection before (optionally) reconnecting.
                this.sendService.terminate();
                if (downstreamAddr != null) {
                    this.sendService.start(downstreamAddr);
                }
            } else if (this.receiveService != null) {
                this.receiveService.changeDownStream(downstreamAddr);
            }
            cachePipelineState(uuid2);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Releases the send service, the receive service, the receive buffer and
     * the cached pipeline state, in that order, while holding the lock.
     *
     * <p>The receive service is terminated and then awaited; its field is
     * cleared whether or not the wait completes. The receive buffer is
     * returned to {@link DirectBufferPool#INSTANCE} and its field is cleared
     * whether or not the release completes. If either blocking step is
     * interrupted, the thread's interrupt status is restored and the
     * {@link InterruptedException} is rethrown wrapped in a
     * {@link RuntimeException}; the steps after the interrupted one are then
     * not executed.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException} raised
     *         while awaiting shutdown or releasing the buffer
     */
    protected void tearDown() {
        this.lock.lock();
        try {
            if (this.sendService != null) {
                this.sendService.terminate();
                this.sendService = null;
            }
            if (this.receiveService != null) {
                this.receiveService.terminate();
                try {
                    this.receiveService.awaitShutdown();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } finally {
                    this.receiveService = null;
                }
            }
            if (this.receiveBuffer != null) {
                try {
                    DirectBufferPool.INSTANCE.release(this.receiveBuffer);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } finally {
                    this.receiveBuffer = null;
                }
            }
            this.pipelineStateRef.set(null);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Replaces the cached downstream pipeline state.
     *
     * @param uuid id of the downstream service whose proxy and write pipeline
     *        address should be cached, or {@code null} to clear the cache
     */
    private void cachePipelineState(UUID uuid) {
        PipelineState<S> snapshot = null;
        if (uuid != null) {
            final S downstream = member.getService(uuid);
            snapshot = new PipelineState<>();
            snapshot.addr = downstream.getWritePipelineAddr();
            snapshot.service = downstream;
        }
        pipelineStateRef.set(snapshot);
    }

    /**
     * Creates the send service and, when the member reports a downstream
     * service, starts it against that service's write pipeline address. The
     * cached pipeline state is then refreshed for the downstream id (cleared
     * when there is none). Runs while holding the lock.
     */
    protected void setUpSendService() {
        lock.lock();
        try {
            sendService = new HASendService();
            final UUID nextId = member.getDownstreamServiceId();
            if (nextId != null) {
                final InetSocketAddress nextAddr = member.getService(nextId).getWritePipelineAddr();
                sendService.start(nextAddr);
            }
            cachePipelineState(nextId);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Acquires the receive buffer, then creates and starts the receive service
     * for this member, while holding the lock.
     *
     * <p>The receive service listens on this member's own write pipeline
     * address, relays to the downstream service's address when the member
     * reports one, and hands each received message and buffer to
     * {@link #handleReplicatedWrite(HAWriteMessage, ByteBuffer)}.
     *
     * <p>On any failure {@link #tearDown()} is attempted, a failure of the
     * tear-down itself is logged, the original failure is logged exactly once,
     * and the original failure is rethrown wrapped in a
     * {@link RuntimeException}. If the original failure was an
     * {@link InterruptedException}, the thread's interrupt status is restored
     * after the clean-up so that the clean-up itself is not cut short by it.
     *
     * @throws RuntimeException wrapping whatever caused the set-up to fail
     */
    private void setUpReceiveService() {
        this.lock.lock();
        try {
            final UUID downstreamId = this.member.getDownstreamServiceId();
            this.receiveBuffer = DirectBufferPool.INSTANCE.acquire();
            final InetSocketAddress addrSelf = this.member.getService().getWritePipelineAddr();
            final InetSocketAddress addrNext = downstreamId == null
                    ? null
                    : this.member.getService(downstreamId).getWritePipelineAddr();
            this.receiveService = new HAReceiveService<>(addrSelf, addrNext,
                    new HAReceiveService.IHAReceiveCallback<HAWriteMessage>() {
                        @Override
                        public void callback(HAWriteMessage hAWriteMessage, ByteBuffer byteBuffer) throws Exception {
                            QuorumPipelineImpl.this.handleReplicatedWrite(hAWriteMessage, byteBuffer);
                        }
                    });
            this.receiveService.start();
        } catch (Throwable t) {
            try {
                tearDown();
            } catch (Throwable t2) {
                log.error(t2, t2);
            } finally {
                // Report the root cause once, whether or not clean-up worked.
                log.error(t, t);
            }
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(t);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * @return the buffer currently held for receiving replicated writes, or
     *         {@code null} when none is held
     */
    protected ByteBuffer getReceiveBuffer() {
        return receiveBuffer;
    }

    /**
     * @return the current send service, or {@code null} when none is set up
     */
    @Override
    public HASendService getHASendService() {
        return sendService;
    }

    /**
     * @return the current receive service, or {@code null} when none is set up
     */
    @Override
    public HAReceiveService<HAWriteMessage> getHAReceiveService() {
        return receiveService;
    }

    /**
     * Replicates a write from the leader to the downstream service.
     *
     * <p>After asserting that this member is the leader for the message's
     * quorum token, a task is submitted to the member's executor. The task
     * starts the local send of {@code byteBuffer}, asks the cached downstream
     * service to receive and replicate the message, and then waits for both
     * operations to finish.
     *
     * <p>The remote future is cancelled if it is still unfinished whenever the
     * task leaves the waiting section, including when one of the final
     * {@code get()} calls throws. The local send future is always cancelled on
     * exit, which has no effect once it has completed.
     *
     * <p>NOTE(review): the pipeline state is dereferenced inside the task
     * without a null check; when no downstream state is cached the task fails
     * with a {@link NullPointerException}.
     *
     * @param hAWriteMessage the message describing the write
     * @param byteBuffer the payload to send downstream
     * @return a future for the outcome of the combined send and remote receive
     * @throws IOException declared by the {@code QuorumPipeline} contract
     */
    @Override
    public Future<Void> replicate(final HAWriteMessage hAWriteMessage, final ByteBuffer byteBuffer) throws IOException {
        this.member.assertLeader(hAWriteMessage.getQuorumToken());
        final PipelineState<S> pipelineState = this.pipelineStateRef.get();
        if (log.isTraceEnabled()) {
            log.trace("Leader will send: " + byteBuffer.remaining() + " bytes");
        }
        final FutureTask<Void> futureTask = new FutureTask<Void>(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                final Future<Void> send = QuorumPipelineImpl.this.getHASendService().send(byteBuffer);
                try {
                    final Future<Void> receiveAndReplicate = pipelineState.service.receiveAndReplicate(hAWriteMessage);
                    try {
                        // Poll both futures until one is done; timeouts just
                        // mean "not finished yet" and are deliberately ignored.
                        while (!send.isDone() && !receiveAndReplicate.isDone()) {
                            try {
                                send.get(1L, TimeUnit.SECONDS);
                            } catch (TimeoutException ignored) {
                                // keep polling
                            }
                            try {
                                receiveAndReplicate.get(10L, TimeUnit.MILLISECONDS);
                            } catch (TimeoutException ignored) {
                                // keep polling
                            }
                        }
                        // Surface the outcome (or failure) of both operations.
                        send.get();
                        receiveAndReplicate.get();
                    } finally {
                        if (!receiveAndReplicate.isDone()) {
                            receiveAndReplicate.cancel(true);
                        }
                    }
                    return null;
                } finally {
                    send.cancel(true);
                }
            }
        });
        this.member.getExecutor().execute(futureTask);
        return futureTask;
    }

    /**
     * Receives a replicated write and, when a downstream service is cached,
     * relays the request to it.
     *
     * <p>With no cached downstream state the receive service's future is
     * returned directly. Otherwise a task is submitted to the member's
     * executor that starts the local receive, asks the downstream service to
     * receive and replicate the same message, and waits for both operations.
     *
     * <p>The remote future is cancelled if it is still unfinished whenever the
     * task leaves the waiting section, including when one of the final
     * {@code get()} calls throws. The local receive future is always cancelled
     * on exit, which has no effect once it has completed.
     *
     * @param hAWriteMessage the message describing the write to receive
     * @return a future for the outcome of the receive (and of the downstream
     *         replication, when there is a downstream service)
     * @throws IOException declared by the {@code QuorumPipeline} contract
     * @throws RuntimeException wrapping an {@link InterruptedException} raised
     *         while requesting the receive on the direct path; the thread's
     *         interrupt status is restored first
     */
    @Override
    public Future<Void> receiveAndReplicate(final HAWriteMessage hAWriteMessage) throws IOException {
        final PipelineState<S> pipelineState = this.pipelineStateRef.get();
        if (log.isTraceEnabled()) {
            log.trace("Will receive " + (pipelineState != null ? " and replicate" : "") + ": msg=" + hAWriteMessage);
        }
        final ByteBuffer receiveBuffer = getReceiveBuffer();
        if (pipelineState == null) {
            // No cached downstream state: only the local receive is needed.
            try {
                return getHAReceiveService().receiveData(hAWriteMessage, receiveBuffer);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        final FutureTask<Void> futureTask = new FutureTask<Void>(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                final Future<Void> receiveData = QuorumPipelineImpl.this.getHAReceiveService().receiveData(hAWriteMessage, receiveBuffer);
                try {
                    final Future<Void> receiveAndReplicate = pipelineState.service.receiveAndReplicate(hAWriteMessage);
                    try {
                        // Poll both futures until one is done; timeouts just
                        // mean "not finished yet" and are deliberately ignored.
                        while (!receiveData.isDone() && !receiveAndReplicate.isDone()) {
                            try {
                                receiveData.get(1L, TimeUnit.SECONDS);
                            } catch (TimeoutException ignored) {
                                // keep polling
                            }
                            try {
                                receiveAndReplicate.get(10L, TimeUnit.MILLISECONDS);
                            } catch (TimeoutException ignored) {
                                // keep polling
                            }
                        }
                        // Surface the outcome (or failure) of both operations.
                        receiveData.get();
                        receiveAndReplicate.get();
                    } finally {
                        if (!receiveAndReplicate.isDone()) {
                            receiveAndReplicate.cancel(true);
                        }
                    }
                    return null;
                } finally {
                    receiveData.cancel(true);
                }
            }
        });
        this.member.getExecutor().execute(futureTask);
        return futureTask;
    }

    /**
     * Hook invoked for each write delivered by the receive service: the
     * callback registered in {@code setUpReceiveService()} forwards its
     * message and buffer arguments to this method unchanged.
     *
     * @param hAWriteMessage the message passed to the receive callback
     * @param byteBuffer the buffer passed to the receive callback
     * @throws Exception if the implementation fails to handle the write; the
     *         exception propagates out of the receive callback
     */
    protected abstract void handleReplicatedWrite(HAWriteMessage hAWriteMessage, ByteBuffer byteBuffer) throws Exception;
}
