Skip to content

Instantly share code, notes, and snippets.

@Megaprog
Last active July 13, 2021 02:22
Show Gist options
  • Save Megaprog/ce54338b1c815722973a94e9dfb457af to your computer and use it in GitHub Desktop.
Save Megaprog/ce54338b1c815722973a94e9dfb457af to your computer and use it in GitHub Desktop.
How to start and stop SimpleMessageListenerContainer and wait a result
val events = CopyOnWriteArrayList<UserSubscriptionExternalActivationEvent>()
val listenerContainer = rabbitConfig.bind<UserSubscriptionExternalActivationEvent>(
routingKey.externalActivation,
TopicExchange(properties.template.exchange),
) {
println("result is $it")
events.add(it)
Mono.empty()
}
listenerContainer.start()
val screenApiMessage = ScreenApiLoginEvent(1L, Device.server("test"), Location.server, OffsetDateTime.now(), null)
rabbitSender.send(screenApiMessage, screenApiRabbitProperties.screenApiLoginEvent.routingKey, screenApiRabbitProperties.exchange)
val externalAuthMessage = ExternalAuthLoginEvent("2", null, "none", null, Location.server)
rabbitSender.send(externalAuthMessage, externalAuthRabbitProperties.externalAuthLoginEvent.routingKey, externalAuthRabbitProperties.exchange)
val authenticatorMessage = AuthenticatorLoginEvent("3", "none", "none", Device.server("test"), Location.server)
rabbitSender.send(authenticatorMessage, authenticatorRabbitProperties.authenticatorLoginEvent.routingKey, authenticatorRabbitProperties.exchange)
assertTimeoutPreemptively(Duration.ofSeconds(10)) {
while (events.size < 3) {
LockSupport.parkNanos(1_000_000)
}
}
listenerContainer.stop()
val subs1 = subscriptionDao.getNeededExternalActivationSubscriptions(1L).collectList().block()
val subs2 = subscriptionDao.getNeededExternalActivationSubscriptions(2L).collectList().block()
val subs3 = subscriptionDao.getNeededExternalActivationSubscriptions(3L).collectList().block()
assertThat(subs1).isEmpty()
assertThat(subs2).isEmpty()
assertThat(subs3).isEmpty()
val queueName = "${properties.template.exchange}.${routingKey.externalActivation}"
val stdout = rabbitmq.execInContainer("rabbitmqctl", "list_queues", "name", "messages_ready").stdout
assertThat(stdout).containsPattern("$queueName\\s.*0")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment