diff --git a/src/test/java/com/rabbitmq/integration/tests/SelectedTopicMessageIT.java b/src/test/java/com/rabbitmq/integration/tests/SelectedTopicMessageIT.java index dc344d24..fcae86ac 100644 --- a/src/test/java/com/rabbitmq/integration/tests/SelectedTopicMessageIT.java +++ b/src/test/java/com/rabbitmq/integration/tests/SelectedTopicMessageIT.java @@ -2,13 +2,18 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. // -// Copyright (c) 2013-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2013-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. package com.rabbitmq.integration.tests; +import com.rabbitmq.jms.util.Shell; import org.junit.jupiter.api.Test; import jakarta.jms.*; +import java.io.IOException; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -60,6 +65,50 @@ public void testSendAndReceiveTextMessages() throws Exception { } } + @Test + public void durableTopicSubscriberWithSelectorCreatesExchangesBetweenRestarts() throws Exception { + String topicName = TOPIC_NAME + UUID.randomUUID().toString().substring(0, 10); + int exchangeInitialCount = exchangeCount(); + String subscriberName = UUID.randomUUID().toString(); + topicConn.start(); + TopicSession topicSession = topicConn.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE); + Topic topic = topicSession.createTopic(topicName); + TopicSubscriber receiver = topicSession.createDurableSubscriber( + topic, subscriberName, "boolProp", false); + TopicPublisher sender = topicSession.createPublisher(topic); + sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + TextMessage message = topicSession.createTextMessage("hello true"); + message.setBooleanProperty("boolProp", true); + sender.send(message); + + assertThat(receiver.receive(1000)).isNotNull(); + assertThat(exchangeCount()).isEqualTo(exchangeInitialCount + 1); + + sender.close(); + receiver.close(); + + reconnect(); + + topicConn.start(); + topicSession = topicConn.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE); + topic = topicSession.createTopic(topicName); + receiver = topicSession.createDurableSubscriber( + topic, subscriberName, "boolProp", false); + sender = topicSession.createPublisher(topic); + sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + message = topicSession.createTextMessage("hello true"); + message.setBooleanProperty("boolProp", true); + sender.send(message); + + assertThat(receiver.receive(1000)).isNotNull(); + + assertThat(exchangeCount()).isEqualTo(exchangeInitialCount + 2); + } + + private static int exchangeCount() throws IOException { + return Shell.listExchanges().size(); + } + @FunctionalInterface private interface MessageConfigurator { diff --git a/src/test/java/com/rabbitmq/jms/util/Shell.java b/src/test/java/com/rabbitmq/jms/util/Shell.java index 70502035..1d87ae72 100644 --- a/src/test/java/com/rabbitmq/jms/util/Shell.java +++ b/src/test/java/com/rabbitmq/jms/util/Shell.java @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. // -// Copyright (c) 2014-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2014-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. package com.rabbitmq.jms.util; import java.io.BufferedReader; @@ -54,6 +54,20 @@ public static void restartNode() throws IOException { executeCommand(rabbitmqctlCommand() + rabbitmqctlNodenameArgument() + " start_app"); } + public static List listExchanges() throws IOException { + List exchanges = new ArrayList<>(); + ProcessState process = executeCommand(rabbitmqctlCommand() + rabbitmqctlNodenameArgument() + " list_exchanges name type --quiet"); + String output = process.output(); + String[] lines = output.split("\n"); + if (lines.length > 0) { + for (int i = 1; i < lines.length; i++) { + String [] fields = lines[i].split("\t"); + exchanges.add(new Exchange(fields[0], fields[1])); + } + } + return exchanges; + } + public static List listBindings(boolean includeDefaults) throws IOException { List bindings = new ArrayList<>(); ProcessState process = executeCommand(rabbitmqctlCommand() + rabbitmqctlNodenameArgument() + " list_bindings source_name destination_name routing_key --quiet"); @@ -221,4 +235,30 @@ public String toString() { '}'; } } + + public static class Exchange { + + private final String name, type; + + public Exchange(String name, String type) { + this.name = name; + this.type = type; + } + + public String name() { + return this.name; + } + + public String type() { + return this.type; + } + + @Override + public String toString() { + return "Exchange{" + + "name='" + name + '\'' + + ", type='" + type + '\'' + + '}'; + } + } }