package com.ning.metrics.meteo.subscribers;

import com.espertech.esper.client.EPServiceProvider;
import com.google.inject.Inject;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import net.sf.json.JSONObject;
import net.sf.json.JSONSerializer;
import net.sf.json.xml.XMLSerializer;
import org.apache.log4j.Logger;
import org.xml.sax.SAXException;

/* loaded from: input_file:com/ning/metrics/meteo/subscribers/UdpJsonSubscriber.class */
class UdpJsonSubscriber implements Subscriber {
    private final EPServiceProvider esperSink;
    private final UdpJsonSubscriberConfig config;
    private final DatagramSocket socket;
    private final DatagramPacket packet;
    private Thread acceptThread;
    private ExecutorService handlerPool;
    private final Logger log = Logger.getLogger(UdpJsonSubscriber.class);
    boolean running = false;

    /* loaded from: input_file:com/ning/metrics/meteo/subscribers/UdpJsonSubscriber$PacketHandler.class */
    protected class PacketHandler implements Runnable {
        private final byte[] packetData;

        public PacketHandler(byte[] bArr) {
            this.packetData = bArr;
        }

        @Override // java.lang.Runnable
        public void run() {
            UdpJsonSubscriber.this.log.debug("Running handler...");
            XMLSerializer xMLSerializer = new XMLSerializer();
            try {
                JSONObject json = JSONSerializer.toJSON(new String(this.packetData));
                if (!json.has("timestamp")) {
                    json.put("timestamp", Long.valueOf(new Date().getTime()));
                }
                xMLSerializer.setRootName(UdpJsonSubscriber.this.config.getEventOutputName());
                try {
                    UdpJsonSubscriber.this.esperSink.getEPRuntime().sendEvent(DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(new ByteArrayInputStream(xMLSerializer.write(json).getBytes())));
                    UdpJsonSubscriber.this.log.debug("JSON event submitted: " + json);
                } catch (IOException e) {
                    UdpJsonSubscriber.this.log.error("IO Exception: " + e.getMessage());
                    e.printStackTrace();
                } catch (ParserConfigurationException e2) {
                    UdpJsonSubscriber.this.log.error("Error with parser configuration: " + e2.getMessage());
                } catch (SAXException e3) {
                    UdpJsonSubscriber.this.log.error("SAX Exception: " + e3.getMessage());
                }
            } catch (ClassCastException e4) {
                UdpJsonSubscriber.this.log.error("Error converting packet to JSON: " + e4.getMessage());
            } catch (Exception e5) {
                UdpJsonSubscriber.this.log.error("Got exception: " + e5.getMessage());
            }
        }
    }

    @Inject
    public UdpJsonSubscriber(UdpJsonSubscriberConfig udpJsonSubscriberConfig, EPServiceProvider ePServiceProvider) throws SocketException, UnknownHostException {
        this.config = udpJsonSubscriberConfig;
        this.esperSink = ePServiceProvider;
        this.socket = new DatagramSocket(udpJsonSubscriberConfig.getPort(), InetAddress.getByName("0.0.0.0"));
        this.packet = new DatagramPacket(new byte[udpJsonSubscriberConfig.getPacketSize()], udpJsonSubscriberConfig.getPacketSize());
        this.log.info("Created UDP socket on port " + udpJsonSubscriberConfig.getPort() + " with packet size " + udpJsonSubscriberConfig.getPacketSize());
    }

    @Override // com.ning.metrics.meteo.subscribers.Subscriber
    public void subscribe() {
        this.running = true;
        this.handlerPool = Executors.newCachedThreadPool();
        this.acceptThread = new Thread() { // from class: com.ning.metrics.meteo.subscribers.UdpJsonSubscriber.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (UdpJsonSubscriber.this.running) {
                    try {
                        UdpJsonSubscriber.this.log.debug("Waiting on packet");
                        UdpJsonSubscriber.this.socket.receive(UdpJsonSubscriber.this.packet);
                        UdpJsonSubscriber.this.log.debug("Got packet: " + UdpJsonSubscriber.this.packet.getLength());
                    } catch (IOException e) {
                        UdpJsonSubscriber.this.log.error("Error receiving packet: " + e.getMessage());
                    }
                    UdpJsonSubscriber.this.handlerPool.submit(new PacketHandler(UdpJsonSubscriber.this.packet.getData()));
                }
            }
        };
        this.acceptThread.setDaemon(true);
        this.acceptThread.start();
    }

    @Override // com.ning.metrics.meteo.subscribers.Subscriber
    public void unsubscribe() {
        this.log.info("Unsubscribing...");
        this.running = false;
        this.acceptThread.interrupt();
        try {
            this.acceptThread.join();
        } catch (InterruptedException e) {
        }
        this.handlerPool.shutdownNow();
    }
}
