Last active
January 13, 2021 01:11
-
-
Save EvgeniGordeev/5e2c0827aa73614f26fcac598f8237c6 to your computer and use it in GitHub Desktop.
ActiveMQ - copy pending messages from one server to another
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.activemq.ActiveMQConnection; | |
import org.apache.activemq.ActiveMQConnectionFactory; | |
import org.apache.activemq.ActiveMQMessageProducer; | |
import org.apache.activemq.ActiveMQSession; | |
import org.apache.activemq.command.ActiveMQMessage; | |
import org.apache.activemq.command.ActiveMQQueue; | |
import org.apache.activemq.util.IdGenerator; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.lang.reflect.InvocationTargetException; | |
import java.lang.reflect.Method; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Set; | |
import java.util.stream.IntStream; | |
import javax.jms.Connection; | |
import javax.jms.JMSException; | |
import javax.jms.Queue; | |
import javax.jms.QueueBrowser; | |
import javax.jms.Session; | |
/** | |
* Migrate from old activemq to amazon mq. | |
* | |
* @author evgeni.gordeev | |
*/ | |
public class Migrate { | |
private static final Logger LOG = LoggerFactory.getLogger(Migrate.class); | |
public static void main(String[] args) throws JMSException { | |
ActiveMQConnection source = (ActiveMQConnection) getSourceConnFactory().createConnection(); | |
source.start(); | |
Session sourceSession = source.createSession(false, Session.AUTO_ACKNOWLEDGE); | |
ActiveMQConnectionFactory targetConnFactory = getTargetConnFactory(); | |
ActiveMQConnection target = (ActiveMQConnection) targetConnFactory.createConnection(); | |
target.start(); | |
Session targetSes = target.createSession(false, Session.AUTO_ACKNOWLEDGE); | |
Set<ActiveMQQueue> queues = source.getDestinationSource().getQueues(); | |
int qCounter = 1; | |
for (ActiveMQQueue queue : queues) { | |
if ("ActiveMQ.DLQ".equals(queue.getQueueName())) { | |
continue; | |
} | |
LOG.info("Processing queue #{}. '{}'", qCounter++, queue.getQueueName()); | |
QueueBrowser sourceBrowser = sourceSession.createBrowser(queue); | |
List<ActiveMQMessage> msgs = Collections.list(sourceBrowser.getEnumeration()); | |
targetSes.createQueue(queue.getQueueName()); | |
if (msgs.isEmpty()) { | |
LOG.info("No messages in queue '{}'", queue); | |
} else { | |
LOG.info("{} messages in queue '{}'", msgs.size(), queue); | |
int mCounter = 1; | |
for (ActiveMQMessage message : msgs) { | |
LOG.info("Copying message #{}. '{}'", mCounter++, message.getJMSMessageID()); | |
ActiveMQMessage destMsg = (ActiveMQMessage) message.copy(); | |
copy(targetConnFactory, destMsg); | |
} | |
} | |
} | |
source.stop(); | |
target.stop(); | |
System.exit(0); | |
} | |
private static ActiveMQConnectionFactory getSourceConnFactory() { | |
return getConnFactory("tcp://127.0.0.1:61616", null, null); | |
} | |
private static ActiveMQConnectionFactory getTargetConnFactory() { | |
return getConnFactory("ssl://hash.mq.us-east-1.amazonaws.com:61617", "user", "pass"); | |
} | |
private static ActiveMQConnectionFactory getConnFactory(String brokerUrl, String username, String password) { | |
return new ActiveMQConnectionFactory(username, password, brokerUrl); | |
} | |
private static void copy(ActiveMQConnectionFactory targetConnFactory, ActiveMQMessage message) { | |
try { | |
ActiveMQConnectionFactory current = new CopyConnectionFactory(targetConnFactory, message); | |
final Connection connection = current.createConnection(); | |
connection.start(); | |
// PRESERVE MessageId - START | |
ActiveMQSession targetSes = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | |
Queue targetQueue = targetSes.createQueue(message.getDestination().getPhysicalName()); | |
Method getNextProducerId = targetSes.getClass().getDeclaredMethod("getNextProducerId"); | |
getNextProducerId.setAccessible(true); | |
IntStream.range(1, (int) message.getProducerId().getValue()).forEachOrdered(n -> { | |
try { | |
getNextProducerId.invoke(targetSes); | |
} catch (IllegalAccessException | InvocationTargetException e) { | |
e.printStackTrace(); | |
} | |
}); | |
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) targetSes.createProducer(targetQueue); | |
Method getMessageSequence = producer.getClass().getDeclaredMethod("getMessageSequence"); | |
getMessageSequence.setAccessible(true); | |
IntStream.range(1, (int) message.getMessageId().getProducerSequenceId()).forEachOrdered(n -> { | |
try { | |
getMessageSequence.invoke(producer); | |
} catch (IllegalAccessException | InvocationTargetException e) { | |
e.printStackTrace(); | |
} | |
}); | |
// PRESERVE MessageId - END | |
producer.send(targetQueue, | |
message, | |
message.getJMSDeliveryMode(), | |
message.getPriority(), | |
message.getJMSExpiration() - message.getJMSTimestamp()); | |
connection.close(); | |
} catch (NoSuchMethodException | JMSException e) { | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* To preserve the original MessageId. | |
*/ | |
static class CopyConnectionFactory extends ActiveMQConnectionFactory { | |
private final String clientId; | |
public CopyConnectionFactory(ActiveMQConnectionFactory targetConnFactory, ActiveMQMessage message) { | |
super(targetConnFactory.getUserName(), targetConnFactory.getPassword(), targetConnFactory.getBrokerURL()); | |
this.clientId = message.getMessageId().getProducerId().getConnectionId(); | |
} | |
protected synchronized IdGenerator getClientIdGenerator() { | |
return new IdGenerator() { | |
public synchronized String generateId() { | |
return clientId; | |
} | |
}; | |
} | |
protected synchronized IdGenerator getConnectionIdGenerator() { | |
return new IdGenerator() { | |
public synchronized String generateId() { | |
return clientId; | |
} | |
}; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment