Skip to content

Instantly share code, notes, and snippets.

@orvyl
Created February 28, 2017 08:33
Show Gist options
  • Save orvyl/870fba7ddbaeeb6ad651930a1c3c98aa to your computer and use it in GitHub Desktop.
Save orvyl/870fba7ddbaeeb6ad651930a1c3c98aa to your computer and use it in GitHub Desktop.
AmazonSQS implementation that uses memory instead of actual AWS queue (using aws-java-sdk v1.11.86).
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ResponseMetadata;
import com.amazonaws.regions.Region;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.*;
import com.google.common.hash.Hashing;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Created by oftumaneng on 16/02/2017.
* REF: https://gist.github.com/UnquietCode/5717942
*
*/
public class MockAmazonSQS implements AmazonSQS {
private static final int DEFAULT_VISIBILITY_TIMEOUT=30;
public static final String QUEUE_BASE_URL = "http://sqs.aws.com/";
private final Map<String, Queue<MessageInfo>> queues = new HashMap<>();
private final Map<String, ScheduledMessage> receivedMessages = new HashMap<>();
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
@Override
public SendMessageResult sendMessage(SendMessageRequest sendMessageRequest) {
final Queue<MessageInfo> queue = getOrCreateQueue(sendMessageRequest.getQueueUrl());
MessageInfo info = new MessageInfo();
info.id = UUID.randomUUID().toString();
info.body = sendMessageRequest.getMessageBody();
if (Objects.nonNull(sendMessageRequest.getDelaySeconds())) {
executor.schedule((Runnable) () -> queue.add(info), sendMessageRequest.getDelaySeconds(), TimeUnit.SECONDS);
} else {
queue.add(info);
}
return new SendMessageResult().withMessageId(info.id).withMD5OfMessageBody(info.hash());
}
@Override
public SendMessageResult sendMessage(String queueUrl, String messageBody) {
return sendMessage(new SendMessageRequest(queueUrl, messageBody));
}
@Override
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) {
final Queue<MessageInfo> queue = getOrCreateQueue(receiveMessageRequest.getQueueUrl());
List<Message> messages = new ArrayList<>();
Integer maxMessage = Objects.nonNull(receiveMessageRequest.getMaxNumberOfMessages())
? receiveMessageRequest.getMaxNumberOfMessages() : 1;
Integer visibilityTimeout = Objects.nonNull(receiveMessageRequest.getVisibilityTimeout())
? receiveMessageRequest.getVisibilityTimeout() : DEFAULT_VISIBILITY_TIMEOUT;
final Integer waitTimeSeconds = Objects.nonNull(receiveMessageRequest.getWaitTimeSeconds())
? receiveMessageRequest.getWaitTimeSeconds() : 0;
int waitSecondsCtr = 0;
do {
final MessageInfo info = queue.poll();
waitSecondsCtr++;
if (Objects.isNull(info)) continue;
final String receiptHandle = UUID.randomUUID().toString();
Message message = new Message();
message.setBody(info.body);
message.setMessageId(info.id);
message.setMD5OfBody(info.hash());
message.setReceiptHandle(receiptHandle);
messages.add(message);
ScheduledMessage scheduled = new ScheduledMessage();
final Runnable command = () -> {
queue.add(info);
receivedMessages.remove(receiptHandle);
};
scheduled.future = executor.schedule(command, visibilityTimeout, TimeUnit.SECONDS);
scheduled.runnable = command;
receivedMessages.put(message.getReceiptHandle(), scheduled);
if (messages.size() == maxMessage) break;
} while (waitTimeSeconds > waitSecondsCtr);
return new ReceiveMessageResult().withMessages(messages);
}
@Override
public ReceiveMessageResult receiveMessage(String queueUrl) {
return receiveMessage(new ReceiveMessageRequest(queueUrl));
}
@Override
public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) {
ScheduledMessage scheduled = receivedMessages.remove(deleteMessageRequest.getReceiptHandle());
if (scheduled == null) {
throw new RuntimeException("message does not exist");
}
scheduled.future.cancel(true);
return new DeleteMessageResult();
}
@Override
public DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) {
return deleteMessage(new DeleteMessageRequest(queueUrl, receiptHandle));
}
@Override
public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest request) {
final GetQueueUrlResult result = new GetQueueUrlResult();
final String url = QUEUE_BASE_URL + request.getQueueName();
getOrCreateQueue(url);
result.setQueueUrl(url);
return result;
}
@Override
public GetQueueUrlResult getQueueUrl(String queueName) {
return getQueueUrl(new GetQueueUrlRequest(queueName));
}
@Override
public void shutdown() {
executor.shutdown();
receivedMessages.clear();
queues.clear();
}
private static class MessageInfo {
String id;
String body;
String hash() {
return Hashing.md5().hashString(body, Charset.defaultCharset()).toString();
}
}
private synchronized Queue<MessageInfo> getOrCreateQueue(String url) {
Queue<MessageInfo> queue = queues.get(checkNotNull(url));
if (Objects.isNull(queue)) {
queue = new ArrayDeque<>();
queues.put(url, queue);
}
return queue;
}
private static class ScheduledMessage {
ScheduledFuture future;
Runnable runnable;
}
public Map<String, Queue<MessageInfo>> getQueuesInHeap() {
return queues;
}
public Map<String, ScheduledMessage> getReceivedMessagesInHeap() {
return receivedMessages;
}
/*--Not needed in my testing--*/
@Override
public void setEndpoint(String endpoint) {
}
@Override
public void setRegion(Region region) {
}
@Override
public AddPermissionResult addPermission(AddPermissionRequest addPermissionRequest) {
return null;
}
@Override
public AddPermissionResult addPermission(String queueUrl, String label, List<String> aWSAccountIds, List<String> actions) {
return null;
}
@Override
public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) {
return null;
}
@Override
public ChangeMessageVisibilityResult changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) {
return null;
}
@Override
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) {
return null;
}
@Override
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(String queueUrl, List<ChangeMessageVisibilityBatchRequestEntry> entries) {
return null;
}
@Override
public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) {
return null;
}
@Override
public CreateQueueResult createQueue(String queueName) {
return null;
}
@Override
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) {
return null;
}
@Override
public DeleteMessageBatchResult deleteMessageBatch(String queueUrl, List<DeleteMessageBatchRequestEntry> entries) {
return null;
}
@Override
public DeleteQueueResult deleteQueue(DeleteQueueRequest deleteQueueRequest) {
return null;
}
@Override
public DeleteQueueResult deleteQueue(String queueUrl) {
return null;
}
@Override
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) {
return null;
}
@Override
public GetQueueAttributesResult getQueueAttributes(String queueUrl, List<String> attributeNames) {
return null;
}
@Override
public ListDeadLetterSourceQueuesResult listDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest listDeadLetterSourceQueuesRequest) {
return null;
}
@Override
public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) {
return null;
}
@Override
public ListQueuesResult listQueues() {
return null;
}
@Override
public ListQueuesResult listQueues(String queueNamePrefix) {
return null;
}
@Override
public PurgeQueueResult purgeQueue(PurgeQueueRequest purgeQueueRequest) {
return null;
}
@Override
public RemovePermissionResult removePermission(RemovePermissionRequest removePermissionRequest) {
return null;
}
@Override
public RemovePermissionResult removePermission(String queueUrl, String label) {
return null;
}
@Override
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) {
return null;
}
@Override
public SendMessageBatchResult sendMessageBatch(String queueUrl, List<SendMessageBatchRequestEntry> entries) {
return null;
}
@Override
public SetQueueAttributesResult setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) {
return null;
}
@Override
public SetQueueAttributesResult setQueueAttributes(String queueUrl, Map<String, String> attributes) {
return null;
}
@Override
public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
return null;
}
}
@nhquijano
Copy link

Wow! This is very useful. Thanks for sharing this ^_^

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment