package com.socklabs.elasticservices.core.message;

import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Message;
import com.netflix.servo.DefaultMonitorRegistry;
import com.socklabs.elasticservices.core.ServiceProto;
import com.socklabs.elasticservices.core.collection.Pair;
import com.socklabs.elasticservices.core.service.DefaultMessageController;
import com.socklabs.elasticservices.core.service.MessageController;
import com.socklabs.elasticservices.core.service.ServiceRegistry;
import com.socklabs.servo.ext.Gauges;
import com.socklabs.servo.ext.MapSizeCallable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import org.joda.time.DateTime;

/* loaded from: input_file:com/socklabs/elasticservices/core/message/DefaultResponseManager.class */
public class DefaultResponseManager implements ResponseManager {
    private final ServiceProto.ServiceRef serviceRef;
    private final ServiceRegistry serviceRegistry;
    private final ConcurrentMap<Integer, Pair<SettableFuture<Message>, DateTime>> resultsFutures = Maps.newConcurrentMap();

    public DefaultResponseManager(ServiceProto.ServiceRef serviceRef, ServiceRegistry serviceRegistry) {
        this.serviceRef = serviceRef;
        this.serviceRegistry = serviceRegistry;
        DefaultMonitorRegistry.getInstance().register(Gauges.gauge(MessageUtils.serviceRefToString(serviceRef) + ".waitingResults", new MapSizeCallable(this.resultsFutures)));
    }

    @Override // com.socklabs.elasticservices.core.message.ResponseManager
    public Future<Message> sendAndReceive(ServiceProto.ServiceRef serviceRef, AbstractMessage abstractMessage, Class cls, Optional<Expiration> optional) {
        SettableFuture create = SettableFuture.create();
        byte[] randomMessageId = MessageUtils.randomMessageId(24);
        DefaultMessageController defaultMessageController = new DefaultMessageController(this.serviceRef, serviceRef, ContentTypes.fromClass(cls), Optional.of(randomMessageId), Optional.absent(), optional.isPresent() ? Optional.of(((Expiration) optional.get()).getExpiration()) : Optional.absent());
        this.resultsFutures.putIfAbsent(Integer.valueOf(Arrays.hashCode(randomMessageId)), new Pair<>(create, DateTime.now()));
        this.serviceRegistry.sendMessage(defaultMessageController, abstractMessage);
        return create;
    }

    @Override // com.socklabs.elasticservices.core.message.ResponseManager
    public void handleMessage(MessageController messageController, Message message) {
        Pair<SettableFuture<Message>, DateTime> pair;
        Optional<byte[]> correlationId = messageController.getCorrelationId();
        if (!correlationId.isPresent() || (pair = this.resultsFutures.get(Integer.valueOf(Arrays.hashCode((byte[]) correlationId.get())))) == null) {
            return;
        }
        pair.getA().set(message);
        this.resultsFutures.remove(Integer.valueOf(Arrays.hashCode((byte[]) correlationId.get())));
    }

    @Override // com.socklabs.elasticservices.core.message.ResponseManager
    public void clear(DateTime dateTime) {
        Iterator<Map.Entry<Integer, Pair<SettableFuture<Message>, DateTime>>> it = this.resultsFutures.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().getB().isBefore(dateTime)) {
                it.remove();
            }
        }
    }
}
