Last active
August 27, 2024 10:02
-
-
Save rponte/2d881a3c531fc862183d7b3d716e2458 to your computer and use it in GitHub Desktop.
Spring Boot: how to test @SqsListener via integration tests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
##
# Application configuration
##
samples:
  aws:
    sqs:
      # Name of the queue consumed by the @SqsListener
      consumer-queue: customersCreatedQueue

##
# Spring Cloud AWS
##
cloud:
  aws:
    sqs:
      listener:
        # does not start the listener on startup
        auto-startup: false
        # when stopping the listener don't wait too much
        queue-stop-timeout: 500
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package br.com.zup.edu.demo.samples.aws.sqs; | |
import com.amazonaws.services.sqs.AmazonSQSAsync; | |
import io.awspring.cloud.core.env.ResourceIdResolver; | |
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer; | |
import io.awspring.cloud.messaging.support.destination.DynamicQueueUrlDestinationResolver; | |
import org.springframework.beans.BeansException; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.config.BeanPostProcessor; | |
import org.springframework.boot.test.context.TestConfiguration; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.messaging.core.CachingDestinationResolverProxy; | |
import org.springframework.messaging.core.DestinationResolver; | |
@TestConfiguration | |
public class SqsTestConfig { | |
/** | |
* Configures the SimpleMessageListenerContainer to auto-create a SQS Queue in case it does not exist. | |
* | |
* This is necessary because if the queue does not exist during startup the SimpleMessageListenerContainer | |
* stops working with the following warning message: | |
* | |
* > WARN [main] i.a.c.m.l.SimpleMessageListenerContainer: | |
* > Ignoring queue with name 'customersCreatedQueue': The queue does not exist.; | |
* > nested exception is com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue | |
* > does not exist for this wsdl version. | |
*/ | |
@Bean | |
public BeanPostProcessor simpleMessageListenerContainerPostProcessor(DestinationResolver<String> destinationResolver) { | |
return new BeanPostProcessor() { | |
@Override | |
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { | |
if (bean instanceof SimpleMessageListenerContainer container) { | |
container.setQueueStopTimeout(500); | |
container.setDestinationResolver(destinationResolver); | |
} | |
return bean; | |
} | |
}; | |
} | |
/** | |
* Creates a DynamicQueueUrlDestinationResolver capable of auto-creating | |
* a SQS queue in case it does not exist | |
*/ | |
@Bean | |
public DestinationResolver<String> autoCreateQueueDestinationResolver( | |
AmazonSQSAsync sqs, | |
@Autowired(required = false) ResourceIdResolver resourceIdResolver) { | |
DynamicQueueUrlDestinationResolver autoCreateQueueResolver | |
= new DynamicQueueUrlDestinationResolver(sqs, resourceIdResolver); | |
autoCreateQueueResolver.setAutoCreate(true); | |
return new CachingDestinationResolverProxy<>(autoCreateQueueResolver); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package br.com.zup.edu.demo.samples.aws.sqs; | |
import br.com.zup.edu.demo.samples.aws.sqs.model.Customer; | |
import br.com.zup.edu.demo.samples.aws.sqs.model.CustomerRepository; | |
import io.awspring.cloud.messaging.listener.SqsMessageDeletionPolicy; | |
import io.awspring.cloud.messaging.listener.annotation.SqsListener; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.messaging.handler.annotation.Header; | |
import org.springframework.messaging.handler.annotation.Headers; | |
import org.springframework.messaging.handler.annotation.MessageExceptionHandler; | |
import org.springframework.stereotype.Component; | |
import javax.validation.ConstraintViolationException; | |
import java.util.Map; | |
@Component | |
public class CustomerCreatedEventSqsListener { | |
private static final Logger LOGGER = LoggerFactory.getLogger(CustomerCreatedEventSqsListener.class); | |
private final CustomerRepository repository; | |
public CustomerCreatedEventSqsListener(CustomerRepository repository) { | |
this.repository = repository; | |
} | |
@SqsListener( | |
value = "${samples.aws.sqs.consumer-queue}", | |
deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS | |
) | |
public void receive(CustomerCreatedEvent event, @Header("MessageId") String messageId) { | |
LOGGER.info( | |
"Receiving a CustomerCreatedEvent (MessageId=\"{}\") from SQS queue: {}", | |
messageId, event | |
); | |
// converts to domain model and invokes your business logic | |
Customer customer = event.toModel(); | |
repository.save(customer); | |
} | |
@MessageExceptionHandler(ConstraintViolationException.class) | |
public void handleOnError(ConstraintViolationException exception, @Headers Map<String, String> headers) { | |
LOGGER.error( | |
"It was not possible to consume the message with messageId={} (ApproximateReceiveCount ={}): {}", | |
headers.get("MessageId"), | |
headers.get("ApproximateReceiveCount"), | |
exception.getLocalizedMessage() | |
); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package br.com.zup.edu.demo.samples.aws.sqs; | |
import br.com.zup.edu.demo.samples.aws.sqs.model.CustomerRepository; | |
import com.amazonaws.services.sqs.AmazonSQSAsync; | |
import com.amazonaws.services.sqs.model.GetQueueAttributesResult; | |
import com.amazonaws.services.sqs.model.PurgeQueueRequest; | |
import io.awspring.cloud.messaging.core.QueueMessagingTemplate; | |
import org.junit.jupiter.api.AfterEach; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.DisplayName; | |
import org.junit.jupiter.api.Test; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.context.annotation.Import; | |
import org.springframework.test.annotation.DirtiesContext; | |
import org.springframework.test.context.ActiveProfiles; | |
import org.springframework.test.context.TestPropertySource; | |
import java.time.LocalDateTime; | |
import java.util.UUID; | |
import static java.util.List.of; | |
import static java.util.concurrent.TimeUnit.SECONDS; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.awaitility.Awaitility.await; | |
@SpringBootTest | |
@ActiveProfiles("test") | |
@Import(SqsTestConfig.class) | |
/** | |
* Starts the listener on startup, and stops the listener after all tests | |
* (the @DirtiesContext annotation closes this context) | |
*/ | |
@DirtiesContext | |
@TestPropertySource(properties = { | |
"cloud.aws.sqs.listener.auto-startup = true" | |
}) | |
class CustomerCreatedEventSqsListenerTest { | |
@Autowired | |
private QueueMessagingTemplate sqsTemplate; | |
@Autowired | |
private AmazonSQSAsync SQS; | |
@Value("${samples.aws.sqs.consumer-queue}") | |
private String consumerQueueName; | |
@Autowired | |
private CustomerRepository repository; | |
@BeforeEach | |
public void setUp() { | |
repository.deleteAll(); | |
SQS.purgeQueue(new PurgeQueueRequest(consumerQueueName)); | |
} | |
@Test | |
@DisplayName("should consume an event from SQS queue") | |
public void t1() { | |
// scenario | |
CustomerCreatedEvent event = new CustomerCreatedEvent( | |
UUID.randomUUID(), | |
"Rafael Ponte", | |
"+5585988776655", | |
LocalDateTime.now() | |
); | |
sqsTemplate | |
.convertAndSend(consumerQueueName, event); | |
// action | |
// ...is async, so it will be performed by our SQS listener | |
// validation | |
await().atMost(3, SECONDS).untilAsserted(() -> { | |
assertThat(numberOfMessagesInQueue()).isEqualTo(0); | |
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(0); | |
assertThat(repository.findAll()) | |
.hasSize(1) | |
.usingRecursiveFieldByFieldElementComparator() | |
.containsExactly(event.toModel()); | |
}); | |
} | |
@Test | |
@DisplayName("should not consume an event from SQS queue when the event is invalid") | |
public void t2() { | |
// scenario | |
CustomerCreatedEvent invalidEvent = new CustomerCreatedEvent( | |
UUID.randomUUID(), null, null, null | |
); | |
sqsTemplate | |
.convertAndSend(consumerQueueName, invalidEvent); | |
// action | |
// ...is async, so it will be performed by our SQS listener | |
// validation | |
await().atMost(3, SECONDS).untilAsserted(() -> { | |
assertThat(repository.count()).isEqualTo(0); | |
assertThat(numberOfMessagesInQueue()).isEqualTo(0); | |
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(1); // messages with errors stay not-visible for 30s | |
}); | |
} | |
private Integer numberOfMessagesInQueue() { | |
GetQueueAttributesResult attributes = SQS | |
.getQueueAttributes(consumerQueueName, of("All")); | |
return Integer.parseInt( | |
attributes.getAttributes().get("ApproximateNumberOfMessages") | |
); | |
} | |
private Integer numberOfMessagesNotVisibleInQueue() { | |
GetQueueAttributesResult attributes = SQS | |
.getQueueAttributes(consumerQueueName, of("All")); | |
return Integer.parseInt( | |
attributes.getAttributes().get("ApproximateNumberOfMessagesNotVisible") | |
); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package br.com.zup.edu.demo.samples.aws.sqs; | |
import br.com.zup.edu.demo.samples.aws.sqs.model.CustomerRepository; | |
import com.amazonaws.services.sqs.AmazonSQSAsync; | |
import com.amazonaws.services.sqs.model.GetQueueAttributesResult; | |
import com.amazonaws.services.sqs.model.PurgeQueueRequest; | |
import io.awspring.cloud.messaging.core.QueueMessagingTemplate; | |
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer; | |
import org.junit.jupiter.api.AfterEach; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.DisplayName; | |
import org.junit.jupiter.api.Test; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.context.annotation.Import; | |
import org.springframework.test.context.ActiveProfiles; | |
import java.time.LocalDateTime; | |
import java.util.UUID; | |
import static java.util.List.of; | |
import static java.util.concurrent.TimeUnit.SECONDS; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.awaitility.Awaitility.await; | |
@SpringBootTest | |
@ActiveProfiles("test") | |
@Import(SqsTestConfig.class) | |
class CustomerCreatedEventSqsListenerTest { | |
@Autowired | |
private QueueMessagingTemplate sqsTemplate; | |
@Autowired | |
private AmazonSQSAsync SQS; | |
@Value("${samples.aws.sqs.consumer-queue}") | |
private String consumerQueueName; | |
/** | |
* We inject the listener here because we must start and stop the listener on each test | |
*/ | |
@Autowired | |
private SimpleMessageListenerContainer consumerInBackground; | |
@Autowired | |
private CustomerRepository repository; | |
@BeforeEach | |
public void setUp() { | |
repository.deleteAll(); | |
SQS.purgeQueue(new PurgeQueueRequest(consumerQueueName)); | |
} | |
@AfterEach | |
public void cleanUp() { | |
// stop the listener after each test | |
consumerInBackground.stop(consumerQueueName); | |
} | |
@Test | |
@DisplayName("should consume an event from SQS queue") | |
public void t1() { | |
// scenario | |
CustomerCreatedEvent event = new CustomerCreatedEvent( | |
UUID.randomUUID(), | |
"Rafael Ponte", | |
"+5585988776655", | |
LocalDateTime.now() | |
); | |
sqsTemplate | |
.convertAndSend(consumerQueueName, event); | |
// action | |
// ...is async, so it will be performed by our SQS listener | |
consumerInBackground.start(consumerQueueName); // starts the listener | |
// validation | |
await().atMost(3, SECONDS).untilAsserted(() -> { | |
assertThat(numberOfMessagesInQueue()).isEqualTo(0); | |
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(0); | |
assertThat(repository.findAll()) | |
.hasSize(1) | |
.usingRecursiveFieldByFieldElementComparator() | |
.containsExactly(event.toModel()); | |
}); | |
} | |
@Test | |
@DisplayName("should not consume an event from SQS queue when the event is invalid") | |
public void t2() { | |
// scenario | |
CustomerCreatedEvent invalidEvent = new CustomerCreatedEvent( | |
UUID.randomUUID(), null, null, null | |
); | |
sqsTemplate | |
.convertAndSend(consumerQueueName, invalidEvent); | |
// action | |
// ...is async, so it will be performed by our SQS listener | |
consumerInBackground.start(consumerQueueName); // starts the listener | |
// validation | |
await().atMost(3, SECONDS).untilAsserted(() -> { | |
assertThat(repository.count()).isEqualTo(0); | |
assertThat(numberOfMessagesInQueue()).isEqualTo(0); | |
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(1); // messages with errors stay not-visible for 30s | |
}); | |
} | |
private Integer numberOfMessagesInQueue() { | |
GetQueueAttributesResult attributes = SQS | |
.getQueueAttributes(consumerQueueName, of("All")); | |
return Integer.parseInt( | |
attributes.getAttributes().get("ApproximateNumberOfMessages") | |
); | |
} | |
private Integer numberOfMessagesNotVisibleInQueue() { | |
GetQueueAttributesResult attributes = SQS | |
.getQueueAttributes(consumerQueueName, of("All")); | |
return Integer.parseInt( | |
attributes.getAttributes().get("ApproximateNumberOfMessagesNotVisible") | |
); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Starts a LocalStack container bound to the loopback interface only.
version: "3.8"

services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
      - "127.0.0.1:53:53"                # DNS config (only required for Pro)
      - "127.0.0.1:53:53/udp"            # DNS config (only required for Pro)
      - "127.0.0.1:443:443"              # LocalStack HTTPS Gateway (only required for Pro)
    environment:
      - AWS_DEFAULT_REGION=sa-east-1
      - AWS_ACCESS_KEY_ID=localstackAccessKeyId
      - AWS_SECRET_ACCESS_KEY=localstackSecretAccessKey
      - DEFAULT_REGION=sa-east-1
      - USE_SINGLE_REGION=1
      - SERVICES=${SERVICES-}
      - EAGER_SERVICE_LOADING=0
      - DEBUG=${DEBUG-}
      - PERSISTENCE=${PERSISTENCE-}
      - LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR-}
      # No key is kept in this file: the value is taken from the environment and is empty by default.
      - LOCALSTACK_API_KEY=${LOCALSTACK_API_KEY-}  # only required for Pro
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${LOCALSTACK_VOLUME_DIR:-./localstack}:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a better version of the code above, using Testcontainers to start LocalStack: https://gist.github.com/rponte/8a46133aeca05f07ae49035879a18143