package org.apache.helix.controller.restlet;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.log4j.Logger;
import org.restlet.Component;
import org.restlet.Context;
import org.restlet.data.Protocol;
import org.restlet.engine.Engine;

/* loaded from: input_file:org/apache/helix/controller/restlet/ZKPropertyTransferServer.class */
public class ZKPropertyTransferServer {
    public static final String PORT = "port";
    public static final String SERVER = "ZKPropertyTransferServer";
    int _localWebservicePort;
    String _webserviceUrl;
    ZkBaseDataAccessor<ZNRecord> _accessor;
    String _zkAddress;
    AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef = new AtomicReference<>();
    boolean _initialized = false;
    boolean _shutdownFlag = false;
    Component _component = null;
    Timer _timer = null;
    static ZKPropertyTransferServer _instance;
    public static String RESTRESOURCENAME = "ZNRecordUpdates";
    public static int PERIOD = 10000;
    public static int MAX_UPDATE_LIMIT = 10000;
    private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/helix/controller/restlet/ZKPropertyTransferServer$ZKPropertyTransferTask.class */
    public class ZKPropertyTransferTask extends TimerTask {
        ZKPropertyTransferTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            try {
                ZKPropertyTransferServer.this.sendData();
            } catch (Throwable th) {
                ZKPropertyTransferServer.LOG.error("", th);
            }
        }
    }

    void sendData() {
        ConcurrentHashMap<String, ZNRecordUpdate> andSet;
        LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
        synchronized (this._dataBufferRef) {
            andSet = this._dataBufferRef.getAndSet(new ConcurrentHashMap<>());
        }
        if (andSet == null) {
            LOG.warn("null _dataQueueRef. Should be in the beginning only");
            return;
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (ZNRecordUpdate zNRecordUpdate : andSet.values()) {
            arrayList.add(zNRecordUpdate.getPath());
            arrayList2.add(zNRecordUpdate.getZNRecordUpdater());
            arrayList3.add(zNRecordUpdate.getRecord());
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (arrayList.size() > 0) {
            this._accessor.updateChildren(arrayList, arrayList2, AccessOption.PERSISTENT);
        }
        LOG.info("ZKPropertyTransferServer updated " + arrayList3.size() + " records in " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
    }

    private ZKPropertyTransferServer() {
        this._dataBufferRef.getAndSet(new ConcurrentHashMap<>());
    }

    public static ZKPropertyTransferServer getInstance() {
        return _instance;
    }

    public boolean isInitialized() {
        return this._initialized;
    }

    public void init(int i, String str) {
        if (this._initialized || this._shutdownFlag) {
            LOG.error("Already initialized with port " + this._localWebservicePort + " shutdownFlag: " + this._shutdownFlag);
            return;
        }
        LOG.error("Initializing with port " + i + " zkAddress: " + str);
        this._localWebservicePort = i;
        ZkClient zkClient = new ZkClient(str);
        zkClient.setZkSerializer(new ZNRecordSerializer());
        this._accessor = new ZkBaseDataAccessor<>(zkClient);
        this._zkAddress = str;
        startServer();
    }

    public String getWebserviceUrl() {
        if (this._initialized && !this._shutdownFlag) {
            return this._webserviceUrl;
        }
        LOG.debug("inited:" + this._initialized + " shutdownFlag:" + this._shutdownFlag + " , return");
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void enqueueData(ZNRecordUpdate zNRecordUpdate) {
        if (!this._initialized || this._shutdownFlag) {
            LOG.error("zkDataTransferServer inited:" + this._initialized + " shutdownFlag:" + this._shutdownFlag + " , return");
            return;
        }
        synchronized (this._dataBufferRef) {
            zNRecordUpdate.getRecord().setSimpleField(SERVER, this._webserviceUrl);
            if (this._dataBufferRef.get().containsKey(zNRecordUpdate.getPath())) {
                this._dataBufferRef.get().get(zNRecordUpdate.getPath())._record = (ZNRecord) zNRecordUpdate.getZNRecordUpdater().update(this._dataBufferRef.get().get(zNRecordUpdate.getPath()).getRecord());
            } else {
                this._dataBufferRef.get().put(zNRecordUpdate.getPath(), zNRecordUpdate);
            }
        }
        if (this._dataBufferRef.get().size() > MAX_UPDATE_LIMIT) {
            sendData();
        }
    }

    void startServer() {
        LOG.info("zkDataTransferServer starting on Port " + this._localWebservicePort + " zkAddress " + this._zkAddress);
        this._component = new Component();
        this._component.getServers().add(Protocol.HTTP, this._localWebservicePort);
        Context createChildContext = this._component.getContext().createChildContext();
        createChildContext.getAttributes().put(SERVER, this);
        createChildContext.getAttributes().put("port", "" + this._localWebservicePort);
        this._component.getDefaultHost().attach(new ZkPropertyTransferApplication(createChildContext));
        this._timer = new Timer(true);
        this._timer.schedule(new ZKPropertyTransferTask(), PERIOD, PERIOD);
        try {
            this._webserviceUrl = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":" + this._localWebservicePort + "/" + RESTRESOURCENAME;
            this._component.start();
            this._initialized = true;
        } catch (Exception e) {
            LOG.error("", e);
        }
        LOG.info("zkDataTransferServer started on Port " + this._localWebservicePort + " zkAddress " + this._zkAddress);
    }

    public void shutdown() {
        if (this._shutdownFlag) {
            LOG.error("ZKPropertyTransferServer already has been shutdown...");
            return;
        }
        LOG.info("zkDataTransferServer shuting down on Port " + this._localWebservicePort + " zkAddress " + this._zkAddress);
        if (this._timer != null) {
            this._timer.cancel();
        }
        if (this._component != null) {
            try {
                this._component.stop();
            } catch (Exception e) {
                LOG.error("", e);
            }
        }
        this._shutdownFlag = true;
    }

    public void reset() {
        if (this._shutdownFlag) {
            this._shutdownFlag = false;
            this._initialized = false;
            this._component = null;
            this._timer = null;
            this._dataBufferRef.getAndSet(new ConcurrentHashMap<>());
        }
    }

    static {
        Engine.setLogLevel(Level.SEVERE);
        _instance = new ZKPropertyTransferServer();
    }
}
