Skip to content

Instantly share code, notes, and snippets.

@jalp
Created August 21, 2016 17:53
Show Gist options
  • Save jalp/db89f95378fe88c3ed9f1089be36ef5b to your computer and use it in GitHub Desktop.
Save jalp/db89f95378fe88c3ed9f1089be36ef5b to your computer and use it in GitHub Desktop.
Initial configuration for sns, sqs and kinesis services
package com.schibsted.notification.queueconsumer.config;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient;
import com.amazonaws.services.sqs.buffered.QueueBufferConfig;
import com.netflix.appinfo.AmazonInfo;
import com.schibsted.notification.queueconsumer.QueueType;
import com.schibsted.notification.queueconsumer.handlers.DeleteSubscriptionMessageHandler;
import com.schibsted.notification.queueconsumer.handlers.NotificationMessageHandler;
import com.schibsted.notification.queueconsumer.handlers.NotificationMessageRouterHandler;
import com.schibsted.notification.queueconsumer.handlers.PostSubscriptionMessageHandler;
import com.schibsted.notification.queueconsumer.handlers.SendRequestMessageHandler;
import com.schibsted.notification.queueconsumer.pushservices.PushService;
import com.schibsted.notification.queueconsumer.pushservices.SNSPushService;
import com.schibsted.notification.queueconsumer.queueservices.QueueService;
import com.schibsted.notification.queueconsumer.queueservices.SQSQueueService;
import com.schibsted.notification.queueconsumer.readers.DeleteSubscriptionReader;
import com.schibsted.notification.queueconsumer.readers.NotificationAndroidReader;
import com.schibsted.notification.queueconsumer.readers.NotificationIOSReader;
import com.schibsted.notification.queueconsumer.readers.NotificationRouterReader;
import com.schibsted.notification.queueconsumer.readers.PostSubscriptionReader;
import com.schibsted.notification.queueconsumer.readers.SQSReader;
import com.schibsted.notification.queueconsumer.readers.SendRequestReader;
import com.schibsted.notification.queueconsumer.trackingservices.KinesisTrackingService;
import com.schibsted.notification.queueconsumer.trackingservices.TrackingService;
import org.apache.commons.configuration.ConfigurationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.commons.util.InetUtils;
import org.springframework.cloud.commons.util.InetUtilsProperties;
import org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.client.RestTemplate;
@Configuration
public class QueueConsumerConfig {
@Autowired
ApplicationContext appContext;
@Autowired
QueuesProperties queuesProperties;
@Value("${server.port}")
String port;
@Value("${cloud.region.static}")
private String region;
@Value("${notification.executor.core.pool}")
private int corePoolSize;
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean(name = "amazonKinesis", destroyMethod = "shutdown")
public AmazonKinesisClient amazonKinesisClient() {
AmazonKinesisClient client = new AmazonKinesisClient();
client.setRegion(Region.getRegion(Regions.fromName(region)));
return client;
}
@Bean
@Profile("cloud")
public EurekaInstanceConfigBean eurekaInstanceConfig() {
EurekaInstanceConfigBean eurekaInstanceConfigBean = new EurekaInstanceConfigBean(
new InetUtils(new InetUtilsProperties()));
AmazonInfo amazonInfo = AmazonInfo.Builder.newBuilder().autoBuild("eureka");
eurekaInstanceConfigBean.setDataCenterInfo(amazonInfo);
String baseUrl = "http://" + amazonInfo.getMetadata().get("local-hostname") + ":" + port;
eurekaInstanceConfigBean.setHealthCheckUrl(baseUrl + "/healthcheck");
eurekaInstanceConfigBean.setStatusPageUrl(baseUrl + "/Status");
return eurekaInstanceConfigBean;
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate() {
return new QueueMessagingTemplate(amazonSQSClient());
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();
msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());
return msgListenerContainerFactory;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMsgHandlerFactory = new QueueMessageHandlerFactory();
queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());
return queueMsgHandlerFactory.createQueueMessageHandler();
}
@Bean
public AsyncTaskExecutor notificationTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("NotificationExecutor");
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
// No use of a thread pool executor queue to avoid retaining message to long in memory
threadPoolTaskExecutor.setQueueCapacity(0);
return threadPoolTaskExecutor;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer msgListenerContainer = simpleMessageListenerContainerFactory()
.createSimpleMessageListenerContainer();
msgListenerContainer.setMessageHandler(queueMessageHandler());
msgListenerContainer.setTaskExecutor(notificationTaskExecutor());
msgListenerContainer.setMaxNumberOfMessages(10);
return msgListenerContainer;
}
@Lazy
@Bean(name = "amazonSQS", destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQSClient() {
AmazonSQSAsyncClient awsSQSAsyncClient;
awsSQSAsyncClient = new AmazonSQSAsyncClient();
awsSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName(region)));
QueueBufferConfig config = new QueueBufferConfig()
.withMaxInflightReceiveBatches(20)
.withMaxDoneReceiveBatches(15);
return new AmazonSQSBufferedAsyncClient(awsSQSAsyncClient, config);
}
@Lazy
@Bean(name = "amazonSNS")
public AmazonSNSClient amazonSNSClient() {
AmazonSNSClient client = new AmazonSNSClient();
client.setRegion(Region.getRegion(Regions.fromName(region)));
return client;
}
@Bean
public SQSReader sqsReader(@Value("${notification.queueListener.active}") String queueType)
throws ConfigurationException {
QueueType qt = QueueType.fromString(queueType);
switch (qt) {
case SUBSCRIBE_REQUEST:
return new PostSubscriptionReader(appContext.getBean(PostSubscriptionMessageHandler.class));
case UNSUBSCRIBE_REQUEST:
return new DeleteSubscriptionReader(appContext.getBean(DeleteSubscriptionMessageHandler.class));
case SEND_REQUEST:
return new SendRequestReader(appContext.getBean(SendRequestMessageHandler.class));
case SEND_NOTIFICATION_ANDROID:
return new NotificationAndroidReader(
appContext.getBean(NotificationMessageHandler.class));
case SEND_NOTIFICATION_IOS:
return new NotificationIOSReader(
appContext.getBean(NotificationMessageHandler.class));
case SEND_NOTIFICATION_ROUTER:
return new NotificationRouterReader(
appContext.getBean(NotificationMessageRouterHandler.class));
default:
throw new ConfigurationException("Message type is not found");
}
}
@Bean
public QueueService androidQueueService() {
return new SQSQueueService(
queueMessagingTemplate(),
queuesProperties.getNotificationAndroidQueue());
}
@Bean
public QueueService iosQueueService() {
return new SQSQueueService(
queueMessagingTemplate(),
queuesProperties.getNotificationIOSQueue());
}
@Bean
public QueueService routerQueueService() {
return new SQSQueueService(
queueMessagingTemplate(),
queuesProperties.getNotificationRouterQueue());
}
@Bean
public PushService snsPushService(SNSPushService pushService) {
return pushService;
}
@Bean
public TrackingService trackingService(KinesisTrackingService trackingService) {
return trackingService;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment