Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add point-to-point messaging capabilities with IBM MQ queue support #4

Merged
merged 4 commits into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ballerina/constants.bal
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
public const OPEN_AS_SUBSCRIPTION = 1;
public const OPEN_AS_PUBLICATION = 2;

// Options that control the opening of the queue for a consumer.
public const MQOO_BROWSE = 8;
public const MQOO_INPUT_AS_Q_DEF = 1;
public const MQOO_INPUT_EXCLUSIVE = 4;
public const MQOO_INPUT_SHARED = 2;

// Options that control the opening of the topic for either publication or subscription.
public const MQOO_ALTERNATE_USER_AUTHORITY = 4096;
public const MQOO_BIND_AS_Q_DEF = 0;
Expand Down
16 changes: 13 additions & 3 deletions ballerina/destination.bal
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ public type Destination distinct client object {
remote function get(*GetMessageOptions options) returns Message|Error?;
};

public type Queue distinct client object {
public isolated client class Queue {
*Destination;
};

public client class Topic {
remote function put(Message message) returns Error? =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.Queue"
} external;

remote function get(*GetMessageOptions options) returns Message|Error =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.Queue"
} external;
}

public isolated client class Topic {
*Destination;

remote function put(Message message) returns Error? =
Expand Down
10 changes: 6 additions & 4 deletions ballerina/queue_manager.bal
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ public isolated class QueueManager {
'class: "io.ballerina.lib.ibm.ibmmq.QueueManager"
} external;

public isolated function accessQueue(string queueName, ConnectionOpenOptions options) returns Queue|Error {
return error Error("Not implemented");
}
public isolated function accessQueue(string queueName, AccessQueueOptions options) returns Queue|Error =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.QueueManager"
} external;

public isolated function accessTopic(string topicName, string topicString, OPEN_TOPIC_OPTION openTopicOption, AccessTopicOptions options) returns Topic|Error =
public isolated function accessTopic(string topicName, string topicString, OPEN_TOPIC_OPTION openTopicOption,
AccessTopicOptions options) returns Topic|Error =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.QueueManager"
} external;
Expand Down
8 changes: 1 addition & 7 deletions ballerina/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ public type QueueManagerConfiguration record {|
string password?;
|};

public enum ConnectionOpenOptions {
// MQOO_OUTPUT = "MQOO_OUTPUT",
MQOO_INPUT_AS_Q_DEF = "MQOO_INPUT_AS_Q_DEF",
MQOO_INPUT_EXCLUSIVE = "MQOO_INPUT_EXCLUSIVE",
MQOO_INPUT_SHARED = "MQOO_INPUT_SHARED"
}

public type AccessQueueOptions MQOO_OUTPUT|MQOO_BROWSE|MQOO_INPUT_AS_Q_DEF|MQOO_INPUT_EXCLUSIVE|MQOO_INPUT_SHARED;
public type AccessTopicOptions MQOO_ALTERNATE_USER_AUTHORITY|MQOO_BIND_AS_Q_DEF|MQOO_FAIL_IF_QUIESCING|MQOO_OUTPUT|MQOO_PASS_ALL_CONTEXT|MQOO_PASS_IDENTITY_CONTEXT|MQOO_SET_ALL_CONTEXT|MQOO_SET_IDENTITY_CONTEXT;

public type GetMessageOptions record {|
Expand Down
12 changes: 12 additions & 0 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.ballerina.lib.ibm.ibmmq;

import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPropertyDescriptor;
import io.ballerina.runtime.api.PredefinedTypes;
Expand Down Expand Up @@ -59,6 +60,8 @@ public class CommonUtils {
private static final BString PD_CONTEXT = StringUtils.fromString("context");
private static final BString PROPERTY_VALUE = StringUtils.fromString("value");
private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor");
private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval");
private static final BString OPTIONS = StringUtils.fromString("options");

private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor();

Expand Down Expand Up @@ -179,6 +182,15 @@ private static BMap populateDescriptorFromMQPropertyDescriptor(MQPropertyDescrip
return descriptor;
}

public static MQGetMessageOptions getGetMessageOptions(BMap<BString, Object> bOptions) {
int waitInterval = bOptions.getIntValue(WAIT_INTERVAL).intValue();
int options = bOptions.getIntValue(OPTIONS).intValue();
MQGetMessageOptions getMessageOptions = new MQGetMessageOptions();
getMessageOptions.waitInterval = waitInterval;
getMessageOptions.options = options;
return getMessageOptions;
}

public static BError createError(String errorType, String message, Throwable throwable) {
BError cause = ErrorCreator.createError(throwable);
BMap<BString, Object> errorDetails = ValueCreator.createRecordValue(getModule(), ERROR_DETAILS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface Constants {
public static final String IBMMQ_ERROR = "Error";

// Native properties in respective ballerina objects
public static final String NATIVE_QUEUE_MANAGER = "queueManager";

public static final String NATIVE_TOPIC = "topic";
String NATIVE_QUEUE_MANAGER = "queueManager";
String NATIVE_TOPIC = "topic";
String NATIVE_QUEUE = "queue";
}
78 changes: 78 additions & 0 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.lib.ibm.ibmmq;

import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.ballerina.lib.ibm.ibmmq.CommonUtils.createError;
import static io.ballerina.lib.ibm.ibmmq.Constants.IBMMQ_ERROR;

/**
* Representation of {@link com.ibm.mq.MQQueue} with utility methods to invoke as inter-op functions.
*/
public class Queue {
private static final ExecutorService QUEUE_EXECUTOR_SERVICE = Executors.newCachedThreadPool(
new MQThreadFactory("balx-ibmmq-queue-client-network-thread"));

public static Object put(Environment environment, BObject queueObject, BMap<BString, Object> message) {
MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE);
MQMessage mqMessage = CommonUtils.getMqMessageFromBMessage(message);
Future future = environment.markAsync();
QUEUE_EXECUTOR_SERVICE.execute(() -> {
try {
queue.put(mqMessage);
future.complete(null);
} catch (Exception e) {
BError bError = createError(IBMMQ_ERROR,
String.format("Error occurred while putting a message to the queue: %s", e.getMessage()), e);
future.complete(bError);
}
});
return null;
}

public static Object get(Environment environment, BObject queueObject, BMap<BString, Object> options) {
MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE);
MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options);
Future future = environment.markAsync();
QUEUE_EXECUTOR_SERVICE.execute(() -> {
try {
MQMessage message = new MQMessage();
queue.get(message, getMessageOptions);
future.complete(CommonUtils.getBMessageFromMQMessage(message));
} catch (Exception e) {
BError bError = createError(IBMMQ_ERROR,
String.format("Error occurred while getting a message from the queue: %s", e.getMessage()), e);
future.complete(bError);
}
});
return null;
}
}
47 changes: 31 additions & 16 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.ballerina.lib.ibm.ibmmq;

import com.ibm.mq.MQException;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.MQTopic;
import com.ibm.mq.constants.MQConstants;
Expand Down Expand Up @@ -46,9 +47,10 @@ public class QueueManager {
private static final BString USER_ID = StringUtils.fromString("userID");
private static final BString PASSWORD = StringUtils.fromString("password");
private static final String BTOPIC = "Topic";
private static final String BQUEUE = "Queue";

/**
* Creates a JMS connection with the provided configurations.
* Creates a IBM MQ queue manager with the provided configurations.
*
* @param queueManager Ballerina queue-manager object
* @param configurations IBM MQ connection configurations
Expand All @@ -67,21 +69,6 @@ public static Object init(BObject queueManager, BMap<BString, Object> configurat
return null;
}

public static Object accessTopic(BObject queueManagerObject, BString topicName,
BString topicString, Long openTopicOption, Long options) {
MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER);
try {
MQTopic mqTopic = queueManager.accessTopic(topicName.getValue(), topicString.getValue(),
openTopicOption.intValue(), options.intValue());
BObject bTopic = ValueCreator.createObjectValue(ModuleUtils.getModule(), BTOPIC);
bTopic.addNativeData(Constants.NATIVE_TOPIC, mqTopic);
return bTopic;
} catch (MQException e) {
return createError(IBMMQ_ERROR,
String.format("Error occurred while accessing topic: %s", e.getMessage()), e);
}
}

private static Hashtable<String, Object> getConnectionProperties(BMap<BString, Object> configurations) {
Hashtable<String, Object> properties = new Hashtable<>();
String host = configurations.getStringValue(HOST).getValue();
Expand All @@ -96,4 +83,32 @@ private static Hashtable<String, Object> getConnectionProperties(BMap<BString, O
.ifPresent(password -> properties.put(MQConstants.PASSWORD_PROPERTY, password));
return properties;
}

public static Object accessQueue(BObject queueManagerObject, BString queueName, Long options) {
MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER);
try {
MQQueue mqQueue = queueManager.accessQueue(queueName.getValue(), options.intValue());
BObject bQueue = ValueCreator.createObjectValue(ModuleUtils.getModule(), BQUEUE);
bQueue.addNativeData(Constants.NATIVE_TOPIC, mqQueue);
return bQueue;
} catch (MQException e) {
return createError(IBMMQ_ERROR,
String.format("Error occurred while accessing queue: %s", e.getMessage()), e);
}
}

public static Object accessTopic(BObject queueManagerObject, BString topicName,
BString topicString, Long openTopicOption, Long options) {
MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER);
try {
MQTopic mqTopic = queueManager.accessTopic(topicName.getValue(), topicString.getValue(),
openTopicOption.intValue(), options.intValue());
BObject bTopic = ValueCreator.createObjectValue(ModuleUtils.getModule(), BTOPIC);
bTopic.addNativeData(Constants.NATIVE_TOPIC, mqTopic);
return bTopic;
} catch (MQException e) {
return createError(IBMMQ_ERROR,
String.format("Error occurred while accessing topic: %s", e.getMessage()), e);
}
}
}
34 changes: 16 additions & 18 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,23 @@
import com.ibm.mq.MQTopic;
import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Topic {
private static final ExecutorService topicExecutorService =
Executors.newCachedThreadPool(new MQThreadFactory("balx-ibmmq-topic-client-network-thread"));
import static io.ballerina.lib.ibm.ibmmq.CommonUtils.createError;
import static io.ballerina.lib.ibm.ibmmq.Constants.IBMMQ_ERROR;

private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval");
private static final BString OPTIONS = StringUtils.fromString("options");
/**
* Representation of {@link com.ibm.mq.MQTopic} with utility methods to invoke as inter-op functions.
*/
public class Topic {
private static final ExecutorService topicExecutorService = Executors.newCachedThreadPool(
new MQThreadFactory("balx-ibmmq-topic-client-network-thread"));

public static Object put(Environment environment, BObject topicObject, BMap message) {
MQTopic topic = (MQTopic) topicObject.getNativeData(Constants.NATIVE_TOPIC);
Expand All @@ -47,34 +50,29 @@ public static Object put(Environment environment, BObject topicObject, BMap mess
topic.put(mqMessage);
future.complete(null);
} catch (Exception e) {
future.complete(e);
BError bError = createError(IBMMQ_ERROR,
String.format("Error occurred while putting a message to the topic: %s", e.getMessage()), e);
future.complete(bError);
}
});
return null;
}

public static Object get(Environment environment, BObject topicObject, BMap<BString, Object> options) {
MQTopic topic = (MQTopic) topicObject.getNativeData(Constants.NATIVE_TOPIC);
MQGetMessageOptions getMessageOptions = getGetMessageOptions(options);
MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options);
Future future = environment.markAsync();
topicExecutorService.execute(() -> {
try {
MQMessage message = new MQMessage();
topic.get(message, getMessageOptions);
future.complete(CommonUtils.getBMessageFromMQMessage(message));
} catch (Exception e) {
future.complete(e);
BError bError = createError(IBMMQ_ERROR,
String.format("Error occurred while getting a message from the topic: %s", e.getMessage()), e);
future.complete(bError);
}
});
return null;
}

private static MQGetMessageOptions getGetMessageOptions(BMap<BString, Object> bOptions) {
int waitInterval = bOptions.getIntValue(WAIT_INTERVAL).intValue();
int options = bOptions.getIntValue(OPTIONS).intValue();
MQGetMessageOptions getMessageOptions = new MQGetMessageOptions();
getMessageOptions.waitInterval = waitInterval;
getMessageOptions.options = options;
return getMessageOptions;
}
}