diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java index c0e6f92..12749c0 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java @@ -573,12 +573,18 @@ public synchronized void initialize(OperatorContext context) tracer.log(TraceLevel.TRACE, "Begin initialize()"); //$NON-NLS-1$ + tracer.log(TraceLevel.TRACE, "Calling super class initialization"); //$NON-NLS-1$ super.initialize(context); + tracer.log(TraceLevel.TRACE, "Returned from super class initialization"); //$NON-NLS-1$ + JmsClasspathUtil.setupClassPaths(context); // set SSL system properties if(isSslConnection()) { + + tracer.log(TraceLevel.TRACE, "Setting up SSL connection"); //$NON-NLS-1$ + if(context.getParameterNames().contains("keyStore")) System.setProperty("javax.net.ssl.keyStore", getAbsolutePath(getKeyStore())); if(context.getParameterNames().contains("keyStorePassword")) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index 7ebe86a..c965395 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -15,8 +15,11 @@ import java.util.List; import java.util.logging.Logger; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.Topic; import javax.naming.NamingException; import javax.xml.parsers.ParserConfigurationException; @@ -727,7 +730,9 @@ public synchronized void initialize(OperatorContext context) tracer.log(TraceLevel.TRACE, "Begin initialize()"); //$NON-NLS-1$ + tracer.log(TraceLevel.TRACE, "Calling super class initialization"); //$NON-NLS-1$ super.initialize(context); + tracer.log(TraceLevel.TRACE, "Returned from super class initialization"); //$NON-NLS-1$ consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class); @@ -735,6 +740,9 @@ public synchronized void initialize(OperatorContext context) // set SSL system properties if(isSslConnection()) { + + tracer.log(TraceLevel.TRACE, "Setting up SSL connection"); //$NON-NLS-1$ + if(context.getParameterNames().contains("keyStore")) System.setProperty("javax.net.ssl.keyStore", getAbsolutePath(getKeyStore())); if(context.getParameterNames().contains("keyStorePassword")) @@ -1110,7 +1118,7 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS jmsDestinationAttrIdx = streamSchema.getAttributeIndex(this.getJmsDestinationOutAttrName()); } if(jmsDestinationAttrIdx != -1 && msg.getJMSDestination() != null) { - outTuple.setObject(jmsDestinationAttrIdx, new RString(msg.getJMSDestination().toString())); + outTuple.setObject(jmsDestinationAttrIdx, new RString(getDestinationName(msg.getJMSDestination()))); } if(this.getJmsDeliveryModeOutAttrName() != null) { @@ -1159,7 +1167,7 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS jmsReplyToAttrIdx = streamSchema.getAttributeIndex(this.getJmsReplyToOutAttrName()); } if(jmsReplyToAttrIdx != -1 && msg.getJMSReplyTo() != null) { - outTuple.setObject(jmsReplyToAttrIdx, new RString(msg.getJMSReplyTo().toString())); + outTuple.setObject(jmsReplyToAttrIdx, new RString(getDestinationName(msg.getJMSReplyTo()))); } if(this.getJmsTypeOutAttrName() != null) { @@ -1177,7 +1185,39 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS } } + + /** + * Determines and returns the destination name + * + * @param destination The destination to determine the name for. + * @return The name of a Queue or Topic, or a string representation of the destination object. + * @throws JMSException + */ + private String getDestinationName(Destination destination) throws JMSException { + if (destination instanceof Queue) return ((Queue)destination).getQueueName(); + if (destination instanceof Topic) return ((Topic)destination).getTopicName(); + return destination.toString(); + } + + /** + * Handles the property values of the current message. + * + * @param msg The current JMS message. + * @param outTuple The output tuple. + * @throws JMSException + */ +// private void handleJmsMessagePropertyValues(Message msg, OutputTuple outTuple) throws JMSException { +// Enumeration propertyNames = msg.getPropertyNames(); +// +// while (propertyNames.hasMoreElements()) { +// String name = propertyNames.nextElement(); +// +// +// } +// } + + // Send the error message on to the error output port if one is specified private void sendOutputErrorMsg(String errorMessage) { OutputTuple errorTuple = errorOutputPort.newTuple(); diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java index 2577369..9a75a77 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java @@ -5,34 +5,77 @@ package com.ibm.streamsx.messaging.jms; import java.net.MalformedURLException; +import java.util.Map; +import java.util.Set; +import java.util.logging.Logger; import com.ibm.streams.operator.OperatorContext; +import com.ibm.streams.operator.logging.TraceLevel; public class JmsClasspathUtil { + private static final String CLASS_NAME = "com.ibm.streamsx.messaging.jms.JmsClasspathUtil"; //$NON-NLS-1$ + private static final Logger tracer = Logger.getLogger(CLASS_NAME); + public static void setupClassPaths(OperatorContext context) { + + tracer.log(TraceLevel.TRACE, "Setting up classpath"); //$NON-NLS-1$ + + boolean classpathSet = false; + + + // Dump the provided environment + Map sysEnvMap = System.getenv(); + Set sysEnvKeys = sysEnvMap.keySet(); + tracer.log(TraceLevel.TRACE, "------------------------------------------------------------------------------------"); //$NON-NLS-1$ + tracer.log(TraceLevel.TRACE, "--- System Environment used during initialization"); //$NON-NLS-1$ + for( String key : sysEnvKeys) { + tracer.log(TraceLevel.TRACE, key + " = " + System.getenv(key)); //$NON-NLS-1$ + } + tracer.log(TraceLevel.TRACE, "------------------------------------------------------------------------------------"); //$NON-NLS-1$ + + String AMQ_HOME = System.getenv("STREAMS_MESSAGING_AMQ_HOME"); //$NON-NLS-1$ if (AMQ_HOME != null) { + + tracer.log(TraceLevel.TRACE, "Apache Active MQ classpath!"); //$NON-NLS-1$ + String lib = AMQ_HOME + "/lib/*"; //$NON-NLS-1$ String libOptional = AMQ_HOME + "/lib/optional/*"; //$NON-NLS-1$ try { + tracer.log(TraceLevel.TRACE, "Adding class libs to context"); //$NON-NLS-1$ context.addClassLibraries(new String[] { lib, libOptional }); - } catch (MalformedURLException e) { - } + catch (MalformedURLException e) { + tracer.log(TraceLevel.ERROR, "Failed to add class libs to context: " + e.getMessage()); //$NON-NLS-1$ + } + classpathSet = true; } String WMQ_HOME = System.getenv("STREAMS_MESSAGING_WMQ_HOME"); //$NON-NLS-1$ if (WMQ_HOME != null) { + + tracer.log(TraceLevel.TRACE, "IBM Websphere MQ classpath!"); //$NON-NLS-1$ + String javaLib = WMQ_HOME + "/java/lib/*"; //$NON-NLS-1$ try { + tracer.log(TraceLevel.TRACE, "Adding class libs to context"); //$NON-NLS-1$ context.addClassLibraries(new String[] { javaLib }); - } catch (MalformedURLException e) { - } + catch (MalformedURLException e) { + tracer.log(TraceLevel.ERROR, "Failed to add class libs to context: " + e.getMessage()); //$NON-NLS-1$ + } + classpathSet = true; } + + if( classpathSet != true ) { + tracer.log(TraceLevel.ERROR, "No classpath has been set!"); //$NON-NLS-1$ + } + + tracer.log(TraceLevel.TRACE, "Finished setting up classpath!"); //$NON-NLS-1$ + } } \ No newline at end of file diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index 536e111..c663ebf 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -684,7 +684,7 @@ The <attribute> element has three possible attributes: * composite types * xml - 5.4.0 + 5.4.1 4.2.0.0 diff --git a/com.ibm.streamsx.messaging/pom.xml b/com.ibm.streamsx.messaging/pom.xml index a309b3a..066c405 100644 --- a/com.ibm.streamsx.messaging/pom.xml +++ b/com.ibm.streamsx.messaging/pom.xml @@ -6,7 +6,7 @@ com.ibm.streamsx.messaging streamsx.messaging jar - 5.4.0 + 5.4.1 com.ibm.streamsx.messaging