Last active
April 12, 2018 15:28
-
-
Save kjkrol/619c4043c2d9d14423c83b8df88abc64 to your computer and use it in GitHub Desktop.
EmbeddedAMQPBroker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package kjkrol.amqpbroker | |
import org.apache.qpid.server.SystemLauncher | |
import org.springframework.amqp.core.BindingBuilder | |
import org.springframework.amqp.core.TopicExchange | |
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory | |
import org.springframework.amqp.rabbit.core.RabbitAdmin | |
import org.springframework.amqp.rabbit.core.RabbitTemplate | |
class EmbeddedAMQPBroker { | |
private static final String INITIAL_CONFIGURATION = "test-initial-config.json" | |
private SystemLauncher systemLauncher | |
EmbeddedAMQPBroker() { | |
systemLauncher = new SystemLauncher() | |
} | |
void start() throws Exception { | |
systemLauncher.startup(createSystemConfig()) | |
} | |
void stop() { | |
systemLauncher.shutdown() | |
} | |
TopicExchange createExchange(String exchangeName) { | |
CachingConnectionFactory cf = new CachingConnectionFactory() | |
try { | |
RabbitAdmin admin = new RabbitAdmin(cf) | |
TopicExchange exchange = new TopicExchange(exchangeName) | |
admin.declareExchange(exchange) | |
return exchange | |
} finally { | |
cf.destroy() | |
} | |
} | |
org.springframework.amqp.core.Queue createQueue(String queueName) { | |
CachingConnectionFactory cf = new CachingConnectionFactory() | |
try { | |
RabbitAdmin admin = new RabbitAdmin(cf) | |
org.springframework.amqp.core.Queue queue = new org.springframework.amqp.core.Queue(queueName, false) | |
admin.declareQueue(queue) | |
return queue | |
} finally { | |
cf.destroy() | |
} | |
} | |
void createBinding(TopicExchange exchange, org.springframework.amqp.core.Queue queue, String routingQueue) { | |
CachingConnectionFactory cf = new CachingConnectionFactory() | |
try { | |
RabbitAdmin admin = new RabbitAdmin(cf) | |
admin.declareBinding(BindingBuilder | |
.bind(queue) | |
.to(exchange) | |
.with(routingQueue)) | |
} finally { | |
cf.destroy() | |
} | |
} | |
void deleteExchange(String exchangeName) { | |
CachingConnectionFactory cf = new CachingConnectionFactory() | |
RabbitAdmin admin = new RabbitAdmin(cf) | |
admin.deleteExchange(exchangeName) | |
cf.destroy() | |
} | |
void deleteQueue(String queueName) { | |
CachingConnectionFactory cf = new CachingConnectionFactory() | |
RabbitAdmin admin = new RabbitAdmin(cf) | |
admin.deleteQueue(queueName) | |
cf.destroy() | |
} | |
void sendMessage(String exchangeName, String message) throws Exception { | |
CachingConnectionFactory connectionFactory = new CachingConnectionFactory() | |
RabbitTemplate template = new RabbitTemplate(connectionFactory) | |
template.convertAndSend(exchangeName, "#", message) | |
connectionFactory.destroy() | |
// waitForMessageBeConsumed(); | |
} | |
private static Map<String, Object> createSystemConfig() { | |
Map<String, Object> attributes = [] as Map | |
URL initialConfig = EmbeddedAMQPBroker.class.getClassLoader().getResource(INITIAL_CONFIGURATION) | |
attributes.put("type", "Memory") | |
attributes.put("initialConfigurationLocation", initialConfig.toExternalForm()) | |
attributes.put("startupLoggedToSystemOut", true) | |
return attributes | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment