package org.apache.hedwig.admin.console;

import com.google.protobuf.ByteString;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.hedwig.admin.HedwigAdmin;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.server.meta.FactoryLayout;

/* loaded from: input_file:org/apache/hedwig/admin/console/ReadTopic.class */
/**
 * Console helper that prints the messages stored for a single topic.
 *
 * <p>The ledger ranges of the topic are obtained from {@link HedwigAdmin}, each ledger is
 * opened without recovery and its entries are parsed as {@code PubSubProtocol.Message}
 * and written to {@code System.out}. When running inside the interactive console the
 * output is paged in batches of {@link #NUM_MESSAGES_TO_PRINT} messages.
 *
 * <p>Instances are stateful ({@code startSeqId} advances while reading) and no
 * synchronization is performed by this class.
 */
public class ReadTopic {
    final HedwigAdmin admin;
    final ByteString topic;
    /** Sequence id of the next message to print; advanced as batches are read. */
    long startSeqId;
    /** Smallest consumed sequence id over all subscriptions seen by {@link #getLeastSubscription()}. */
    long leastConsumedSeqId;
    /** Whether to pause for a key press after each batch. */
    final boolean inConsole;
    static final int RC_OK = 0;
    static final int RC_ERROR = -1;
    static final int RC_NOTOPIC = -2;
    static final int RC_NOLEDGERS = -3;
    static final int RC_NOSUBSCRIBERS = -4;
    /** Number of messages fetched and printed per batch. */
    static final int NUM_MESSAGES_TO_PRINT = 15;
    List<PubSubProtocol.LedgerRange> ledgers;

    /**
     * Creates a reader that starts at sequence id 1.
     *
     * @param hedwigAdmin admin handle used to query metadata and open ledgers
     * @param byteString  the topic to read
     * @param z           {@code true} to page the output interactively
     */
    public ReadTopic(HedwigAdmin hedwigAdmin, ByteString byteString, boolean z) {
        this(hedwigAdmin, byteString, 1L, z);
    }

    /**
     * Creates a reader that starts at the given sequence id.
     *
     * @param hedwigAdmin admin handle used to query metadata and open ledgers
     * @param byteString  the topic to read
     * @param j           requested first sequence id to print
     * @param z           {@code true} to page the output interactively
     */
    public ReadTopic(HedwigAdmin hedwigAdmin, ByteString byteString, long j, boolean z) {
        this.leastConsumedSeqId = Long.MAX_VALUE;
        this.ledgers = new ArrayList<>();
        this.admin = hedwigAdmin;
        this.topic = byteString;
        this.startSeqId = j;
        this.inConsole = z;
    }

    /**
     * Checks that the topic exists.
     *
     * @return {@link #RC_OK} if the admin reports the topic, otherwise {@link #RC_NOTOPIC}
     * @throws Exception whatever {@code HedwigAdmin.hasTopic} throws
     */
    protected int checkTopic() throws Exception {
        return this.admin.hasTopic(this.topic) ? RC_OK : RC_NOTOPIC;
    }

    /**
     * Loads the ledger ranges of the topic into {@link #ledgers}.
     *
     * @return {@link #RC_OK} if at least one range was added, {@link #RC_NOLEDGERS} if the
     *         admin returned {@code null} or an empty list
     * @throws Exception whatever {@code HedwigAdmin.getTopicLedgers} throws
     */
    protected int getTopicLedgers() throws Exception {
        List<PubSubProtocol.LedgerRange> topicLedgers = this.admin.getTopicLedgers(this.topic);
        if (null == topicLedgers || topicLedgers.isEmpty()) {
            return RC_NOLEDGERS;
        }
        this.ledgers.addAll(topicLedgers);
        return RC_OK;
    }

    /**
     * Computes the smallest consumed sequence id over all subscriptions of the topic and
     * stores it in {@link #leastConsumedSeqId}.
     *
     * @return {@link #RC_NOSUBSCRIBERS} if the topic has no subscriptions, otherwise
     *         {@link #RC_OK}
     * @throws Exception whatever {@code HedwigAdmin.getTopicSubscriptions} throws
     */
    protected int getLeastSubscription() throws Exception {
        Map<ByteString, PubSubProtocol.SubscriptionData> topicSubscriptions =
                this.admin.getTopicSubscriptions(this.topic);
        if (topicSubscriptions.isEmpty()) {
            return RC_NOSUBSCRIBERS;
        }
        for (PubSubProtocol.SubscriptionData subscription : topicSubscriptions.values()) {
            long consumedSeqId = subscription.getState().getMsgId().getLocalComponent();
            if (consumedSeqId < this.leastConsumedSeqId) {
                this.leastConsumedSeqId = consumedSeqId;
            }
        }
        // Nothing lowered the initial sentinel: fall back to 0 so that no message is skipped.
        if (this.leastConsumedSeqId == Long.MAX_VALUE) {
            this.leastConsumedSeqId = 0L;
        }
        return RC_OK;
    }

    /**
     * Reads and prints the topic, reporting problems on {@code System.err} instead of
     * throwing.
     */
    public void readTopic() {
        // ByteString#toString() does not render the content, so decode the topic name explicitly.
        String topicName = this.topic.toStringUtf8();
        try {
            switch (_readTopic()) {
                case RC_NOLEDGERS:
                    System.err.println("No message is published to topic " + topicName);
                    break;
                case RC_NOTOPIC:
                    System.err.println("No topic " + topicName + " found.");
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            System.err.println("ERROR: read messages of topic " + topicName + " failed.");
            e.printStackTrace();
        }
    }

    /**
     * Performs the actual read: validates the topic, loads its ledgers, derives the first
     * sequence id to print and then walks the ledger ranges in order.
     *
     * @return {@link #RC_OK} on success (including when the user stops paging), or the
     *         failing return code of {@link #checkTopic()}, {@link #getTopicLedgers()} or
     *         {@link #getLeastSubscription()}
     * @throws Exception on metadata or ledger read failures
     */
    protected int _readTopic() throws Exception {
        int checkTopic = checkTopic();
        if (RC_OK != checkTopic) {
            return checkTopic;
        }
        int topicLedgers = getTopicLedgers();
        if (RC_OK != topicLedgers) {
            return topicLedgers;
        }
        int leastSubscription = getLeastSubscription();
        if (RC_NOSUBSCRIBERS == leastSubscription) {
            // Without subscribers nothing has been consumed: start from the first message.
            this.startSeqId = 1L;
        } else {
            if (RC_OK != leastSubscription) {
                return leastSubscription;
            }
            // Skip what every subscriber has already consumed.
            if (this.leastConsumedSeqId > this.startSeqId) {
                this.startSeqId = this.leastConsumedSeqId + 1;
            }
        }
        for (PubSubProtocol.LedgerRange ledgerRange : this.ledgers) {
            long endSeqId = ledgerRange.getEndSeqIdIncluded().getLocalComponent();
            if (endSeqId >= this.startSeqId) {
                boolean keepReading = readLedger(ledgerRange);
                this.startSeqId = endSeqId + 1;
                if (!keepReading) {
                    return RC_OK;
                }
            }
        }
        return RC_OK;
    }

    /**
     * Prints the messages of one ledger range starting at {@link #startSeqId}.
     *
     * <p>A ledger that cannot be opened is reported on {@code System.err} and skipped.
     * Entries that cannot be parsed are reported as warnings and skipped.
     *
     * @param ledgerRange the range to read
     * @return {@code false} if the user declined to continue paging, {@code true} otherwise
     * @throws BKException          if reading entries fails and the range end is not
     *                              {@code Long.MAX_VALUE}
     * @throws IOException          if entry ids / message sequence ids are out of order, or
     *                              reading the key press fails
     * @throws InterruptedException if interrupted while talking to BookKeeper
     */
    protected boolean readLedger(PubSubProtocol.LedgerRange ledgerRange)
            throws BKException, IOException, InterruptedException {
        long endSeqId = ledgerRange.getEndSeqIdIncluded().getLocalComponent();
        if (endSeqId < this.startSeqId) {
            return true;
        }
        long ledgerId = ledgerRange.getLedgerId();
        System.out.println("\n>>>>> " + ledgerRange + " <<<<<\n");
        LedgerHandle ledgerHandle = null;
        try {
            ledgerHandle = this.admin.getBkHandle().openLedgerNoRecovery(
                    ledgerId, this.admin.getBkDigestType(), this.admin.getBkPasswd());
        } catch (BKException e) {
            System.err.println("ERROR: No ledger " + ledgerId
                    + " found. maybe garbage collected due to the messages are consumed.");
        }
        if (null == ledgerHandle) {
            return true;
        }
        try {
            long rangeStartSeqId = ledgerRange.getStartSeqIdIncluded();
            // Entry ids are the message sequence ids shifted by the first sequence id of the range.
            long expectedEntryId = this.startSeqId - rangeStartSeqId;
            long batchEndSeqId = endSeqId;
            try {
                while (this.startSeqId <= endSeqId) {
                    batchEndSeqId = Math.min(this.startSeqId + NUM_MESSAGES_TO_PRINT - 1, endSeqId);
                    Enumeration<LedgerEntry> entries = ledgerHandle.readEntries(
                            this.startSeqId - rangeStartSeqId, batchEndSeqId - rangeStartSeqId);
                    while (entries.hasMoreElements()) {
                        LedgerEntry ledgerEntry = entries.nextElement();
                        PubSubProtocol.Message message;
                        try {
                            message = PubSubProtocol.Message.parseFrom(ledgerEntry.getEntryInputStream());
                        } catch (IOException e) {
                            System.out.println("WARN: Unreadable message found\n");
                            expectedEntryId++;
                            // Nothing to validate or print for this entry.
                            continue;
                        }
                        long messageSeqId = message.getMsgId().getLocalComponent();
                        if (expectedEntryId != ledgerEntry.getEntryId()
                                || messageSeqId - rangeStartSeqId != expectedEntryId) {
                            throw new IOException("ERROR: Message ids are out of order : expected entry id "
                                    + expectedEntryId + ", current entry id " + ledgerEntry.getEntryId()
                                    + ", msg seq id " + messageSeqId);
                        }
                        expectedEntryId++;
                        formatMessage(message);
                    }
                    this.startSeqId = batchEndSeqId + 1;
                    if (this.inConsole && !pressKeyToContinue()) {
                        return false;
                    }
                }
            } catch (BKException e) {
                // The catch sits outside the loop on purpose: retrying the same batch after a
                // failure would never make progress.
                // NOTE(review): an end of Long.MAX_VALUE presumably marks a range whose real end
                // is unknown, so a read failure there is treated as "no more messages" - confirm.
                if (endSeqId != Long.MAX_VALUE) {
                    System.err.println("ERROR: ledger " + ledgerId + " may be corrupted, since read messages ["
                            + this.startSeqId + " ~ " + batchEndSeqId + " ] failed :");
                    throw e;
                }
            }
            System.out.println(FactoryLayout.LSEP);
            return true;
        } finally {
            closeLedger(ledgerHandle, ledgerId);
        }
    }

    /** Closes a ledger handle on a best-effort basis so a close failure never hides a read result. */
    private void closeLedger(LedgerHandle ledgerHandle, long ledgerId) {
        try {
            ledgerHandle.close();
        } catch (BKException e) {
            System.err.println("WARN: failed to close ledger " + ledgerId + " : " + e.getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Prints one message (id, source region and body) to {@code System.out}.
     *
     * @param message the message to print
     */
    protected void formatMessage(PubSubProtocol.Message message) {
        String msgIdText;
        if (message.hasMsgId()) {
            PubSubProtocol.MessageSeqId msgId = message.getMsgId();
            StringBuilder text = new StringBuilder();
            if (msgId.hasLocalComponent()) {
                text.append("LOCAL(").append(msgId.getLocalComponent()).append(")");
            } else {
                text.append("REMOTE(");
                boolean first = true;
                for (PubSubProtocol.RegionSpecificSeqId remote : msgId.getRemoteComponentsList()) {
                    if (!first) {
                        text.append(",");
                    }
                    first = false;
                    text.append(remote.getRegion().toStringUtf8());
                    text.append("[");
                    text.append(remote.getSeqId());
                    text.append("]");
                }
                text.append(")");
            }
            msgIdText = text.toString();
        } else {
            msgIdText = "N/A";
        }
        System.out.println("---------- MSGID=" + msgIdText + " ----------");
        System.out.println("MsgId:     " + msgIdText);
        if (message.hasSrcRegion()) {
            System.out.println("SrcRegion: " + message.getSrcRegion().toStringUtf8());
        } else {
            System.out.println("SrcRegion: N/A");
        }
        System.out.println("Message:");
        System.out.println();
        if (message.hasBody()) {
            System.out.println(message.getBody().toStringUtf8());
        } else {
            System.out.println("N/A");
        }
        System.out.println();
    }

    /**
     * Asks the user whether to print the next batch.
     *
     * @return {@code true} if the first character read from {@code System.in} is
     *         {@code 'y'} or {@code 'Y'}; {@code false} otherwise, including end of input
     * @throws IOException if reading from {@code System.in} fails
     */
    boolean pressKeyToContinue() throws IOException {
        System.out.println("Press Y to continue...");
        // A fresh reader per prompt is kept deliberately: whatever it buffered beyond the first
        // character (typically the line terminator) is dropped with it, so it cannot be
        // mistaken for the answer to the next prompt.
        int read = new BufferedReader(new InputStreamReader(System.in)).read();
        return read == 'y' || read == 'Y';
    }
}
