package org.springframework.batch.integration.partition;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.integration.Message;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Payloads;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingOperations;
import org.springframework.integration.core.PollableChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;

@MessageEndpoint
/* loaded from: input_file:WEB-INF/lib/spring-batch-integration-1.2.1.RELEASE.jar:org/springframework/batch/integration/partition/MessageChannelPartitionHandler.class */
public class MessageChannelPartitionHandler implements PartitionHandler {
    private static Log logger = LogFactory.getLog(MessageChannelPartitionHandler.class);
    private int gridSize = 1;
    private MessagingOperations messagingGateway;
    private String stepName;

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.stepName, "A step name must be provided for the remote workers.");
        Assert.state(this.messagingGateway != null, "The MessagingOperations must be set");
    }

    public void setMessagingOperations(MessagingOperations messagingOperations) {
        this.messagingGateway = messagingOperations;
    }

    public void setGridSize(int i) {
        this.gridSize = i;
    }

    public void setStepName(String str) {
        this.stepName = str;
    }

    @Aggregator(sendPartialResultsOnExpiry = true)
    public List<?> aggregate(@Payloads List<?> list) {
        return list;
    }

    @Override // org.springframework.batch.core.partition.PartitionHandler
    public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter, StepExecution stepExecution) throws Exception {
        Set<StepExecution> split = stepExecutionSplitter.split(stepExecution, this.gridSize);
        int i = 0;
        QueueChannel queueChannel = new QueueChannel();
        for (StepExecution stepExecution2 : split) {
            int i2 = i;
            i++;
            Message<StepExecutionRequest> createMessage = createMessage(i2, split.size(), new StepExecutionRequest(this.stepName, stepExecution2.getJobExecutionId(), stepExecution2.getId()), queueChannel);
            if (logger.isDebugEnabled()) {
                logger.debug("Sending request: " + createMessage);
            }
            this.messagingGateway.send(createMessage);
        }
        Message receive = this.messagingGateway.receive(queueChannel);
        if (logger.isDebugEnabled()) {
            logger.debug("Received replies: " + receive);
        }
        return (Collection) receive.getPayload();
    }

    private Message<StepExecutionRequest> createMessage(int i, int i2, StepExecutionRequest stepExecutionRequest, PollableChannel pollableChannel) {
        return MessageBuilder.withPayload(stepExecutionRequest).setSequenceNumber(Integer.valueOf(i)).setSequenceSize(Integer.valueOf(i2)).setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName()).setReplyChannel(pollableChannel).build();
    }
}
