Created
March 17, 2017 17:51
-
-
Save Toparvion/348c184ca7797d749ece33329da74e7c to your computer and use it in GitHub Desktop.
Code snippet for answer to http://stackoverflow.com/q/42562053/3507435
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.integration.core.MessagingTemplate; | |
import org.springframework.integration.dsl.AggregatorSpec; | |
import org.springframework.integration.store.MessageGroup; | |
import org.springframework.messaging.Message; | |
import org.springframework.messaging.MessageChannel; | |
import org.springframework.messaging.support.MessageBuilder; | |
import reactor.util.function.Tuple2; | |
import reactor.util.function.Tuples; | |
import java.math.BigDecimal; | |
import java.util.Objects; | |
import static java.util.concurrent.TimeUnit.MILLISECONDS; | |
import static java.util.stream.Collectors.joining; | |
import static org.springframework.integration.IntegrationMessageHeaderAccessor.CORRELATION_ID; | |
public class AggregatorConfigurer { | |
private static final Logger log = LoggerFactory.getLogger(AggregatorConfigurer.class); | |
private final MessageChannel preAggregatorQueueChannel; | |
public AggregatorConfigurer(MessageChannel preAggregatorQueueChannel) { | |
this.preAggregatorQueueChannel = preAggregatorQueueChannel; | |
} | |
public void configure(AggregatorSpec spec) { | |
spec.correlationStrategy(message -> BigDecimal.ONE) | |
.releaseStrategy(this::releaseStrategy) | |
.outputProcessor(this::outputProcessor) | |
.poller(p -> p.fixedDelay(50, MILLISECONDS)) | |
.id("recordAggregator"); | |
} | |
private boolean releaseStrategy(MessageGroup group) { | |
if (group.size() == 1) { | |
return false; // i.e. if group consists of just one element and thus is insufficient for making decision | |
} | |
Tuple2<Message<?>, Message<?>> prevAndLastMessages = findPrevAndLastMessages(group); | |
Message<?> lastMessage = prevAndLastMessages.getT1(); | |
Message<?> second2LastMessage = prevAndLastMessages.getT2(); | |
assert (lastMessage != null) && (second2LastMessage != null); // relying on check for singleton group | |
Long lastMessageCorrId = lastMessage.getHeaders().get(CORRELATION_ID, Long.class); | |
Long prevMessageCorrId = second2LastMessage.getHeaders().get(CORRELATION_ID, Long.class); | |
boolean isGroupComplete = !Objects.equals(lastMessageCorrId, prevMessageCorrId); | |
if (isGroupComplete) { | |
log.debug("Group {} is about to be released as last corrId {} differs from previous ones {}.", | |
group.getGroupId(), lastMessageCorrId, prevMessageCorrId); | |
} | |
return isGroupComplete; | |
} | |
private Object outputProcessor(MessageGroup group) { | |
assert group.size() > 0; | |
if (group.size() == 1) { // the singleton group is special case and must be handled separately | |
return MessageBuilder.fromMessage(group.getOne()).build(); | |
} | |
Tuple2<Message<?>, Message<?>> prevAndLastMessages = findPrevAndLastMessages(group); | |
Message<?> lastMessage = prevAndLastMessages.getT1(); | |
assert (lastMessage != null); // relying on check for singleton group | |
group.remove(lastMessage); | |
new MessagingTemplate(preAggregatorQueueChannel).send(lastMessage); | |
return composeRecord(group); | |
} | |
private Object composeRecord(MessageGroup group) { | |
return MessageBuilder | |
.withPayload(group.getMessages() | |
.stream() | |
.map(Message::getPayload) | |
.map(Object::toString) | |
.collect(joining("\n"))) | |
.copyHeadersIfAbsent(group.getOne().getHeaders()) | |
.build(); | |
} | |
/** | |
* @return last (T1) and second to last (T2) messages from given group; both messages can be {@code null} if the | |
* group contains less than 2 or 1 elements | |
*/ | |
private Tuple2<Message<?>, Message<?>> findPrevAndLastMessages(MessageGroup group) { | |
Message<?> lastMessage = null, second2LastMessage = null; | |
for (Message<?> message : group.getMessages()) { | |
second2LastMessage = lastMessage; | |
lastMessage = message; | |
} | |
return Tuples.of(lastMessage, second2LastMessage); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A fragment of typical @Configuration class to show how to use the aggregator | |
MessageChannel preAggregatorQueueChannel = MessageChannels.queue(RECORD_AGGREGATOR_INPUT_CHANNEL).get(); | |
RecordAggregatorConfigurer recordAggregatorConfigurer = new RecordAggregatorConfigurer(preAggregatorQueueChannel); | |
return IntegrationFlows | |
.from(...) | |
.channel(preAggregatorQueueChannel) | |
.aggregate(recordAggregatorConfigurer::configure) | |
.handle(...) | |
.get(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment