|
/* |
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
* contributor license agreements. See the NOTICE file distributed with |
|
* this work for additional information regarding copyright ownership. |
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
* (the "License"); you may not use this file except in compliance with |
|
* the License. You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
*/ |
|
package org.apache.activemq.artemis.tests.integration.amqp; |
|
|
|
import org.apache.activemq.artemis.api.core.RoutingType; |
|
import org.apache.activemq.artemis.api.core.SimpleString; |
|
import org.apache.activemq.artemis.core.server.ActiveMQServer; |
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo; |
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; |
|
import org.junit.Test; |
|
|
|
import javax.jms.*; |
|
|
|
public class JMSTransactedRedeliveryBugTest extends JMSClientTestSupport { |
|
|
|
private static final String INITIAL_QUEUE_NAME = "InitialQueue", FINAL_QUEUE_NAME = "FinalQueue"; |
|
|
|
private static final SimpleString INITIAL_QUEUE_SS = new SimpleString(INITIAL_QUEUE_NAME), FINAL_QUEUE_SS = new SimpleString(FINAL_QUEUE_NAME); |
|
|
|
@Override |
|
protected String getConfiguredProtocols() { |
|
return "AMQP,OPENWIRE,CORE"; |
|
} |
|
|
|
@Override |
|
protected void addConfiguration(ActiveMQServer server) { |
|
server.getAddressSettingsRepository().addMatch(INITIAL_QUEUE_NAME, new AddressSettings().setExpiryAddress(FINAL_QUEUE_SS)); |
|
} |
|
|
|
@Override |
|
protected void createAddressAndQueues(ActiveMQServer server) throws Exception { |
|
super.createAddressAndQueues(server); |
|
server.addAddressInfo(new AddressInfo(INITIAL_QUEUE_SS, RoutingType.ANYCAST)); |
|
server.createQueue(INITIAL_QUEUE_SS, RoutingType.ANYCAST, INITIAL_QUEUE_SS, null, true, false, -1, false, true); |
|
server.addAddressInfo(new AddressInfo(FINAL_QUEUE_SS, RoutingType.ANYCAST)); |
|
server.createQueue(FINAL_QUEUE_SS, RoutingType.ANYCAST, FINAL_QUEUE_SS, null, true, false, -1, false, true); |
|
|
|
} |
|
|
|
|
|
@Test |
|
public void testAMQPProducerAMQPConsumer() throws Exception { |
|
Connection producerConnection = createConnection(); |
|
Connection consumerConnection = createConnection(); |
|
test(producerConnection, consumerConnection); |
|
} |
|
|
|
@Test |
|
public void testCoreProducerCoreConsumer() throws Exception { |
|
Connection producerConnection = createCoreConnection(); |
|
Connection consumerConnection = createCoreConnection(); |
|
test(producerConnection, consumerConnection); |
|
} |
|
|
|
@Test |
|
public void testCoreProducerAMQPConsumer() throws Exception { |
|
Connection producerConnection = createCoreConnection(); |
|
Connection consumerConnection = createConnection(); |
|
test(producerConnection, consumerConnection); |
|
} |
|
|
|
@Test |
|
public void testAMQPProducerCoreConsumer() throws Exception { |
|
Connection producerConnection = createConnection(); |
|
Connection consumerConnection = createCoreConnection(); |
|
test(producerConnection, consumerConnection); |
|
} |
|
|
|
@Test |
|
public void testOpenWireProducerOpenWireConsumer() throws Exception { |
|
Connection producerConnection = createOpenWireConnection(); |
|
Connection consumerConnection = createOpenWireConnection(); |
|
test(producerConnection, consumerConnection); |
|
} |
|
|
|
@Test |
|
public void testCoreProducerOpenWireConsumer() throws Exception { |
|
Connection producerConnection = createCoreConnection(); |
|
Connection consumerConnection = createOpenWireConnection(); |
|
test(producerConnection, consumerConnection); |
|
} |
|
|
|
@Test |
|
public void testOpenWireProducerCoreConsumer() throws Exception { |
|
Connection producerConnection = createOpenWireConnection(); |
|
Connection consumerConnection = createCoreConnection(); |
|
test(producerConnection, consumerConnection); |
|
} |
|
|
|
@Test |
|
public void testAMQPProducerOpenWireConsumer() throws Exception { |
|
Connection producerConnection = createConnection(); |
|
Connection consumerConnection = createOpenWireConnection(); |
|
test(producerConnection, consumerConnection); |
|
} |
|
|
|
@Test |
|
public void testOpenWireProducerAMQPConsumer() throws Exception { |
|
Connection producerConnection = createOpenWireConnection(); |
|
Connection consumerConnection = createConnection(); |
|
test(producerConnection, consumerConnection); |
|
} |
|
|
|
public void test(Connection producerConnection, Connection consumerConnection) throws Exception { |
|
|
|
try { |
|
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
|
Queue queue1 = producerSession.createQueue(INITIAL_QUEUE_NAME); |
|
MessageProducer p = producerSession.createProducer(queue1); |
|
|
|
TextMessage message1 = producerSession.createTextMessage(); |
|
message1.setStringProperty("something", "KEY"); |
|
message1.setText("how are you"); |
|
p.send(message1, DeliveryMode.PERSISTENT, 4, 10); |
|
|
|
//Simulate a small pause, else both messages could be consumed if consumer is fast enough |
|
Thread.sleep(5_000); |
|
|
|
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED); |
|
Queue consumerQueue = consumerSession.createQueue(FINAL_QUEUE_NAME); |
|
MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); |
|
|
|
Message msg = consumer.receive(1_000); |
|
assertNotNull(msg); |
|
assertEquals("1", msg.getStringProperty("JMSXDeliveryCount")); |
|
assertEquals("KEY", msg.getStringProperty("something")); |
|
assertEquals("how are you", ((TextMessage) msg).getText()); |
|
|
|
consumerSession.rollback(); |
|
|
|
msg = consumer.receive(1_000); |
|
assertNotNull(msg); |
|
assertEquals("2", msg.getStringProperty("JMSXDeliveryCount")); |
|
assertEquals("KEY", msg.getStringProperty("something")); |
|
assertEquals("how are you", ((TextMessage) msg).getText()); |
|
|
|
consumerSession.rollback(); |
|
|
|
msg = consumer.receive(1_000); |
|
assertNotNull(msg); |
|
assertEquals("3", msg.getStringProperty("JMSXDeliveryCount")); |
|
assertEquals("KEY", msg.getStringProperty("something")); |
|
assertEquals("how are you", ((TextMessage) msg).getText()); |
|
|
|
consumerSession.rollback(); |
|
|
|
msg = consumer.receive(1_000); |
|
assertNotNull(msg); |
|
assertEquals("4", msg.getStringProperty("JMSXDeliveryCount")); |
|
assertEquals("KEY", msg.getStringProperty("something")); |
|
assertEquals("how are you", ((TextMessage) msg).getText()); |
|
|
|
consumerSession.rollback(); |
|
|
|
msg = consumer.receive(1_000); |
|
assertNotNull(msg); |
|
assertEquals("5", msg.getStringProperty("JMSXDeliveryCount")); |
|
assertEquals("KEY", msg.getStringProperty("something")); |
|
assertEquals("how are you", ((TextMessage) msg).getText()); |
|
|
|
consumerSession.commit(); |
|
|
|
consumer.close(); |
|
} finally { |
|
producerConnection.close(); |
|
consumerConnection.close(); |
|
} |
|
} |
|
} |