diff --git a/src/main/java/org/hobbit/core/Constants.java b/src/main/java/org/hobbit/core/Constants.java index 82bbc7a..547e89a 100644 --- a/src/main/java/org/hobbit/core/Constants.java +++ b/src/main/java/org/hobbit/core/Constants.java @@ -30,6 +30,7 @@ private Constants() { } // =============== ENVIRONMENT CONSTANTS =============== + public static final String IS_RABBIT_MQ_ENABLED = "IS_RABBIT_MQ_ENABLED"; public static final String HOBBIT_SESSION_ID_KEY = "HOBBIT_SESSION_ID"; diff --git a/src/main/java/org/hobbit/core/com/Channel.java b/src/main/java/org/hobbit/core/com/Channel.java new file mode 100644 index 0000000..2918115 --- /dev/null +++ b/src/main/java/org/hobbit/core/com/Channel.java @@ -0,0 +1,73 @@ +package org.hobbit.core.com; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +import org.hobbit.core.components.AbstractCommandReceivingComponent; + +import com.rabbitmq.client.AMQP.BasicProperties; +import org.hobbit.core.rabbit.RabbitMQChannel; +import org.hobbit.core.com.java.DirectChannel; + +/** + * An interface with several methods that is implemented by {@link RabbitMQChannel} and {@link DirectChannel}. + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das + * + */ +public interface Channel { + + /** + * Method to accept messages from the channel. + */ + public void readBytes(Object consumerCallback, Object classs, Boolean autoAck, String queue) throws IOException; + + /** + * Method to publish a message of type Byte array the channel. + */ + public void writeBytes(byte data[], String exchange, String routingKey, BasicProperties props) throws IOException; + + /** + * Method to publish a message of type ByteBuffer to the channel. + */ + public void writeBytes(ByteBuffer buffer, String exchange, String routingKey, BasicProperties props) throws IOException; + + /** + * Method to close a channel. + */ + public void close(); + + /** + * Method to create a new channel. + */ + public void createChannel() throws Exception; + + /** + * Method to get the queue name. + */ + public String getQueueName(AbstractCommandReceivingComponent abstractCommandReceivingComponent) throws Exception; + + /** + * Method to create an exchange for the data transfer for a broker + */ + public void exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, + Map arguments) throws IOException; + + /** + * Method to bind a queue with an exchange. This allows an exchange to publish the messages + * to the queues that are bound to this exchange + */ + public void queueBind(String queue, String exchange, String routingKey) throws IOException; + + /** + * Method to return the channel instance. + */ + public Object getChannel(); + + public String declareQueue(String queueName) throws IOException; + +} diff --git a/src/main/java/org/hobbit/core/rabbit/DataHandler.java b/src/main/java/org/hobbit/core/com/DataHandler.java similarity index 70% rename from src/main/java/org/hobbit/core/rabbit/DataHandler.java rename to src/main/java/org/hobbit/core/com/DataHandler.java index a55fd3b..bc31eaa 100644 --- a/src/main/java/org/hobbit/core/rabbit/DataHandler.java +++ b/src/main/java/org/hobbit/core/com/DataHandler.java @@ -1,4 +1,4 @@ -package org.hobbit.core.rabbit; +package org.hobbit.core.com; public interface DataHandler { diff --git a/src/main/java/org/hobbit/core/rabbit/DataReceiver.java b/src/main/java/org/hobbit/core/com/DataReceiver.java similarity index 97% rename from src/main/java/org/hobbit/core/rabbit/DataReceiver.java rename to src/main/java/org/hobbit/core/com/DataReceiver.java index f43d953..a45e234 100644 --- a/src/main/java/org/hobbit/core/rabbit/DataReceiver.java +++ b/src/main/java/org/hobbit/core/com/DataReceiver.java @@ -1,4 +1,4 @@ -package org.hobbit.core.rabbit; +package org.hobbit.core.com; import java.io.Closeable; diff --git a/src/main/java/org/hobbit/core/rabbit/DataSender.java b/src/main/java/org/hobbit/core/com/DataSender.java similarity index 92% rename from src/main/java/org/hobbit/core/rabbit/DataSender.java rename to src/main/java/org/hobbit/core/com/DataSender.java index 7850304..fb80340 100644 --- a/src/main/java/org/hobbit/core/rabbit/DataSender.java +++ b/src/main/java/org/hobbit/core/com/DataSender.java @@ -1,4 +1,4 @@ -package org.hobbit.core.rabbit; +package org.hobbit.core.com; import java.io.Closeable; import java.io.IOException; diff --git a/src/main/java/org/hobbit/core/com/java/DirectCallback.java b/src/main/java/org/hobbit/core/com/java/DirectCallback.java new file mode 100644 index 0000000..a5340a1 --- /dev/null +++ b/src/main/java/org/hobbit/core/com/java/DirectCallback.java @@ -0,0 +1,31 @@ +package org.hobbit.core.com.java; + +import java.util.List; + +import org.hobbit.core.com.Channel; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * This class is used by the DirectChannel implementation + * for a callback function as a consumer callback + * @author altaf, sourabh, yamini, melisa + * + */ +public abstract class DirectCallback { + + protected Channel channel; + protected String queue; + protected BasicProperties props; + + public DirectCallback() {} + + public DirectCallback(Channel channel, String queue, BasicProperties props) { + this.channel = channel; + this.queue = queue; + this.props = props; + } + + public abstract void callback(byte[] data, List classs,BasicProperties props); + +} diff --git a/src/main/java/org/hobbit/core/com/java/DirectChannel.java b/src/main/java/org/hobbit/core/com/java/DirectChannel.java new file mode 100644 index 0000000..89697b4 --- /dev/null +++ b/src/main/java/org/hobbit/core/com/java/DirectChannel.java @@ -0,0 +1,117 @@ +package org.hobbit.core.com.java; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Pipe; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.commons.lang3.StringUtils; +import org.hobbit.core.com.Channel; +import org.hobbit.core.components.AbstractCommandReceivingComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.AMQP.BasicProperties; +/** + * This class implements the necessary functionality for message sharing + * without using RabbitMQ. DirectChannel uses Java NIO Pipe for message queuing + * implementation + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das + * + */ +public class DirectChannel implements Channel { + + private static final Logger LOGGER = LoggerFactory.getLogger(DirectChannel.class); + + static Map pipes = new HashMap<>(); + + PipeChannel pipeChannel; + private ExecutorService threadPool = Executors.newCachedThreadPool(); + public DirectChannel(){} + public DirectChannel(String queue){ + try { + if(pipes.get(queue) == null) { + pipeChannel = new PipeChannel(Pipe.open()); + pipes.put(queue, pipeChannel); + } + } catch (IOException e) { + LOGGER.error("Error creating pipe ",e); + } + } + + + @Override + public void readBytes(Object callback, Object classs, Boolean autoAck, String queue) { + threadPool.execute(new ReadByteChannel(pipes.get(queue), callback, classs)); + } + + @Override + public void writeBytes(byte[] data, String exchange, String routingKey, BasicProperties props) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(data.length); + buffer.put(data); + writeBytes(buffer, exchange, routingKey, props); + } + + @Override + public void writeBytes(ByteBuffer buffer, String exchange, String routingKey, BasicProperties props) throws IOException { + String queue = StringUtils.isEmpty(exchange) ? routingKey : exchange; + if(!pipes.isEmpty()) { + pipes.get(queue).setProps(props); + String replyQueue = queue; + if(props != null && StringUtils.isNotBlank(props.getReplyTo())) { + replyQueue = props.getReplyTo(); + } + if(pipes.get(replyQueue).getPipe().sink().isOpen()) { + buffer.flip(); + while (buffer.hasRemaining()) + pipes.get(replyQueue).getPipe().sink().write(buffer); + buffer.clear(); + } + } + } + + @Override + public void close() { + /*if(ReadByteChannel.classes != null && ReadByteChannel.classes.size() > 0) { + ReadByteChannel.classes.clear(); + } + pipes.clear();*/ + } + + @Override + public void createChannel() throws Exception { + + } + + @Override + public String getQueueName(AbstractCommandReceivingComponent abstractCommandReceivingComponent) throws Exception { + return null; + } + + @Override + public void exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, + Map arguments) throws IOException { + + } + + @Override + public void queueBind(String queue, String exchange, String routingKey) throws IOException { + + } + + @Override + public Object getChannel() { + return null; + } + + @Override + public String declareQueue(String queueName) throws IOException { + return queueName; + } +} diff --git a/src/main/java/org/hobbit/core/com/java/DirectReceiverImpl.java b/src/main/java/org/hobbit/core/com/java/DirectReceiverImpl.java new file mode 100644 index 0000000..054551d --- /dev/null +++ b/src/main/java/org/hobbit/core/com/java/DirectReceiverImpl.java @@ -0,0 +1,65 @@ +package org.hobbit.core.com.java; + +import java.io.IOException; + +import org.hobbit.core.com.Channel; +import org.hobbit.core.com.DataHandler; +import org.hobbit.core.com.DataReceiver; +import org.hobbit.core.components.communicationfactory.ChannelFactory; +import org.hobbit.core.data.RabbitQueue; + +/** + * This class implements the methods of DataReceiver for {@link DirectChannel} + * + * @author Altafhusen Makander + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das + * + */ +public class DirectReceiverImpl implements DataReceiver { + + public DirectReceiverImpl(String queue, Object consumer) { + + Channel channel = new ChannelFactory().getChannel(false, queue, null); + try { + channel.readBytes(consumer, this, null, queue); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public int getErrorCount() { + return 0; + } + + @Override + public void closeWhenFinished() { + // TODO Auto-generated method stub + + } + + @Override + public void increaseErrorCount() { + // TODO Auto-generated method stub + + } + + @Override + public DataHandler getDataHandler() { + return null; + } + + @Override + public RabbitQueue getQueue() { + return null; + } + +} diff --git a/src/main/java/org/hobbit/core/com/java/DirectSenderImpl.java b/src/main/java/org/hobbit/core/com/java/DirectSenderImpl.java new file mode 100644 index 0000000..78b9eb1 --- /dev/null +++ b/src/main/java/org/hobbit/core/com/java/DirectSenderImpl.java @@ -0,0 +1,60 @@ +package org.hobbit.core.com.java; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.hobbit.core.com.Channel; +import org.hobbit.core.com.DataSender; +import org.hobbit.core.components.communicationfactory.ChannelFactory; +/** + * DataSender implementation for DirectChannel + * @author altaf, sourabh, yamini, melisa + * + */ +public class DirectSenderImpl implements DataSender { + + Channel senderChannel; + String queue; + + public DirectSenderImpl(String queue){ + this.queue = queue; + senderChannel = new ChannelFactory().getChannel(false, queue, null); + } + + + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void sendData(byte[] data) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(data.length); + buffer.put(data); + sleep(100000); + senderChannel.writeBytes(buffer, null, this.queue, null); + /*try { + Thread.sleep(0, 100); + sleep(100000); + senderChannel.writeBytes(buffer, null, this.queue, null); + } catch (Exception e) { + LOGGER.error("Error waiting during send data", e); + }*/ + } + + private void sleep(long interval) { + long start = System.nanoTime(); + long totalTime = start + interval; + long end = 0; + do { + end = System.nanoTime(); + }while(totalTime >= end); + } + + + @Override + public void closeWhenFinished() { + // TODO Auto-generated method stub + } + +} \ No newline at end of file diff --git a/src/main/java/org/hobbit/core/com/java/PipeChannel.java b/src/main/java/org/hobbit/core/com/java/PipeChannel.java new file mode 100644 index 0000000..1ab222b --- /dev/null +++ b/src/main/java/org/hobbit/core/com/java/PipeChannel.java @@ -0,0 +1,41 @@ +package org.hobbit.core.com.java; + +import java.nio.channels.Pipe; + +import com.rabbitmq.client.AMQP.BasicProperties; +/** + * PipeChannel is a POJO class to map pipe with the respective properties. + * @author altaf, sourabh, yamini, melisa + * + */ +public class PipeChannel { + + Pipe pipe; + BasicProperties props; + + public PipeChannel(Pipe pipe, BasicProperties props) { + this.pipe = pipe; + this.props = props; + } + + public PipeChannel(Pipe pipe) { + this.pipe = pipe; + } + + public Pipe getPipe() { + return pipe; + } + + public void setPipe(Pipe pipe) { + this.pipe = pipe; + } + + public BasicProperties getProps() { + return props; + } + + public void setProps(BasicProperties props) { + this.props = props; + } + +} diff --git a/src/main/java/org/hobbit/core/com/java/ReadByteChannel.java b/src/main/java/org/hobbit/core/com/java/ReadByteChannel.java new file mode 100644 index 0000000..5fe682e --- /dev/null +++ b/src/main/java/org/hobbit/core/com/java/ReadByteChannel.java @@ -0,0 +1,98 @@ +package org.hobbit.core.com.java; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.rabbitmq.client.AMQP.BasicProperties; +/** + * For each readByte call for DirectChannel implementation, + * a thread of ReadByteChannel is created. This makes sure that + * the execution takes place on a separate thread by waiting for + * messages to be received by the pipe. + * @author altaf, sourabh, yamini, melisa + * + */ +public class ReadByteChannel extends DirectChannel implements Runnable{ + + private static final Logger LOGGER = LoggerFactory.getLogger(ReadByteChannel.class); + + PipeChannel pipeChannel; + DirectCallback callback; + public static ArrayList classes = new ArrayList<>(); + private ExecutorService threadPool = Executors.newCachedThreadPool(); + + public ReadByteChannel(PipeChannel pipeChannel, Object callback, Object classs) { + this.pipeChannel = pipeChannel; + this.callback = (DirectCallback) callback; + classes.add(classs); + } + + /** + * Method implements the read bytes for the incoming data and makes a call to + * the callback method which consumes the data and does further processing. + */ + @Override + public void run() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + try { + if(pipeChannel != null && pipeChannel.getPipe() !=null) { + while(pipeChannel.getPipe().source().isOpen() && + pipeChannel.getPipe().source().read(buffer) > 0){ + threadPool.execute(new ProcessCallback(callback, clone(buffer), pipeChannel.getProps())); + buffer.clear(); + } + } + } catch (IOException e) { + LOGGER.error("Reader pipe not found",e); + } + + } + /** + * Clone a Bytebuffer + */ + public ByteBuffer clone(ByteBuffer original) { + ByteBuffer clone = ByteBuffer.allocate(original.capacity()); + original.rewind(); + clone.put(original); + original.rewind(); + clone.flip(); + return clone; + } + + /** + * Class to make a callback request to incoming data + * @author altaf + * + */ + protected class ProcessCallback implements Runnable { + DirectCallback callbackObj; + ByteBuffer byteBuffer; + BasicProperties props; + + ProcessCallback(DirectCallback callback, ByteBuffer byteBuffer, BasicProperties props){ + this.callbackObj = callback; + this.byteBuffer = byteBuffer; + this.props = props; + } + + @Override + public void run() { + callbackObj.callback(getNonEmptyArray(byteBuffer), classes, props); + } + + public byte[] getNonEmptyArray(ByteBuffer buffer) { + byte[] inputArray = buffer.array().clone(); + int i = inputArray.length - 1; + while (i >= 0 && inputArray[i] == 0) { + --i; + } + return Arrays.copyOf(inputArray, i + 1); + } + } + +} \ No newline at end of file diff --git a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java index 073d141..a30fea7 100644 --- a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java +++ b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java @@ -22,6 +22,7 @@ import java.util.concurrent.Future; import java.util.stream.Stream; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -29,6 +30,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -37,6 +39,7 @@ import org.apache.commons.io.IOUtils; import org.hobbit.core.Commands; import org.hobbit.core.Constants; +import org.hobbit.core.com.java.DirectCallback; import org.hobbit.core.data.StartCommandData; import org.hobbit.core.data.StopCommandData; import org.hobbit.core.rabbit.RabbitMQUtils; @@ -79,7 +82,7 @@ public abstract class AbstractCommandReceivingComponent extends AbstractComponen * Consumer of the queue that is used to receive responses for messages that * are sent via the command queue and for which an answer is expected. */ - private Consumer responseConsumer = null; + private Object responseConsumer = null; /** * Factory for generating queues with which the commands are sent and * received. It is separated from the data connections since otherwise the @@ -90,7 +93,7 @@ public abstract class AbstractCommandReceivingComponent extends AbstractComponen /** * Channel that is used for the command queue. */ - protected Channel cmdChannel = null; + //protected Channel cmdChannel = null; /** * Default type of containers created by this container */ @@ -110,6 +113,10 @@ public abstract class AbstractCommandReceivingComponent extends AbstractComponen private ExecutorService cmdThreadPool; + public ExecutorService getCmdThreadPool() { + return cmdThreadPool; + } + public AbstractCommandReceivingComponent() { this(false); } @@ -129,29 +136,12 @@ public void init() throws Exception { super.init(); addCommandHeaderId(getHobbitSessionId()); - cmdQueueFactory = new RabbitQueueFactoryImpl(createConnection()); - cmdChannel = cmdQueueFactory.getConnection().createChannel(); - String queueName = cmdChannel.queueDeclare().getQueue(); - cmdChannel.exchangeDeclare(Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "fanout", false, true, null); - cmdChannel.queueBind(queueName, Constants.HOBBIT_COMMAND_EXCHANGE_NAME, ""); - - Consumer consumer = new DefaultConsumer(cmdChannel) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, - byte[] body) throws IOException { - cmdThreadPool.execute(new Runnable() { - @Override - public void run() { - try { - handleCmd(body, properties); - } catch (Exception e) { - LOGGER.error("Exception while trying to handle incoming command.", e); - } - } - }); - } - }; - cmdChannel.basicConsume(queueName, true, consumer); + commonChannel.createChannel(); + String queueName = commonChannel.declareQueue(null) == null ? Constants.HOBBIT_COMMAND_EXCHANGE_NAME : commonChannel.getQueueName(this); + commonChannel.exchangeDeclare(Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "fanout", false, true, null); + commonChannel.queueBind(queueName, Constants.HOBBIT_COMMAND_EXCHANGE_NAME, ""); + Object consumerCallback = getCommonConsumer(); + commonChannel.readBytes(consumerCallback, this, true, queueName); containerName = EnvVariables.getString(Constants.CONTAINER_NAME_KEY, containerName); if (containerName == null) { @@ -215,7 +205,7 @@ protected void sendToCmdQueue(byte command, byte data[], BasicProperties props) if (attachData) { buffer.put(data); } - cmdChannel.basicPublish(Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "", props, buffer.array()); + commonChannel.writeBytes(buffer, Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "", props); } /** @@ -239,7 +229,8 @@ protected void addCommandHeaderId(String sessionId) { * properties of the RabbitMQ message */ protected void handleCmd(byte bytes[], AMQP.BasicProperties props) { - handleCmd(bytes, props.getReplyTo()); + String replyTo = props!=null?props.getReplyTo():""; + handleCmd(bytes, replyTo); } /** @@ -251,7 +242,7 @@ protected void handleCmd(byte bytes[], AMQP.BasicProperties props) { * @param replyTo * name of the queue in which response is expected */ - protected void handleCmd(byte bytes[], String replyTo) { + public void handleCmd(byte bytes[], String replyTo) { ByteBuffer buffer = ByteBuffer.wrap(bytes); String sessionId = RabbitMQUtils.readString(buffer); if (acceptedCmdHeaderIds.contains(sessionId)) { @@ -486,43 +477,17 @@ protected void stopContainer(String containerName) { */ private void initResponseQueue() throws IOException { if (responseQueueName == null) { - responseQueueName = cmdChannel.queueDeclare().getQueue(); + try { + commonChannel.createChannel(); + responseQueueName = commonChannel.declareQueue(null);//cmdChannel.queueDeclare().getQueue(); + } catch (Exception e) { + LOGGER.error("Error creating channel",e); + } } if (responseConsumer == null) { - responseConsumer = new DefaultConsumer(cmdChannel) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, - byte[] body) throws IOException { - String key = properties.getCorrelationId(); - - synchronized (responseFutures) { - SettableFuture future = null; - if (key != null) { - future = responseFutures.remove(key); - if (future == null) { - LOGGER.error("Received a message with correlationId ({}) not in map ({})", key, responseFutures.keySet()); - } - } else { - LOGGER.warn("Received a message with null correlationId. This is an error unless the other component uses an older version of HOBBIT core library."); - Iterator> iter = responseFutures.values().iterator(); - if (iter.hasNext()) { - LOGGER.info("Correlating with the eldest request as a workaround."); - future = iter.next(); - iter.remove(); - } else { - LOGGER.error("There are no pending requests."); - } - } + responseConsumer = getResponseConsumer(); + commonChannel.readBytes(responseConsumer, this, null, responseQueueName); - if (future != null) { - String value = RabbitMQUtils.readString(body); - future.set(value); - } - } - } - }; - - cmdChannel.basicConsume(responseQueueName, responseConsumer); } } @@ -542,17 +507,167 @@ public void setCmdResponseTimeout(long cmdResponseTimeout) { @Override public void close() throws IOException { - if (cmdChannel != null) { + /*if (cmdChannel != null) { try { cmdChannel.close(); } catch (Exception e) { } - } + }*/ IOUtils.closeQuietly(cmdQueueFactory); - if (cmdThreadPool != null) { + /*if (cmdThreadPool != null) { cmdThreadPool.shutdown(); - } + }*/ super.close(); } + /** + * Provides the instance for command queue based on property {@link org.hobbit.core.Constants#IS_RABBIT_MQ_ENABLED} + * @return + */ + private Object getCommonConsumer() { + Object consumer = null; + if(isRabbitMQEnabled()) { + consumer = getCommonDefaultConsumer(); + } else { + consumer = getCommonDirectConsumer(); + } + return consumer; + } + /** + * RabbitMQ consumer for command queue + */ + private Object getCommonDefaultConsumer() { + + return new DefaultConsumer((Channel) commonChannel.getChannel()) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { + cmdThreadPool.execute(new Runnable() { + @Override + public void run() { + try { + handleCmd(body, properties); + } catch (Exception e) { + LOGGER.error("Exception while trying to handle incoming command.", e); + } + } + }); + } + }; + } + /** + * Direct consumer for command queue + */ + private Object getCommonDirectConsumer() { + return new DirectCallback() { + @Override + public void callback(byte[] data, List cmdCallbackObjectList, BasicProperties props) { + for(Object cmdCallbackObject:cmdCallbackObjectList) { + if(cmdCallbackObject != null && + cmdCallbackObject instanceof AbstractCommandReceivingComponent) { + cmdThreadPool.execute(new Runnable() { + @Override + public void run() { + try { + ((AbstractCommandReceivingComponent) cmdCallbackObject). + handleCmd(data, ""); + } catch (Exception e) { + LOGGER.error("Exception while trying to handle incoming command.", e); + } + } + }); + } + } + } + }; + } + /** + * Provides the consumer for container creation + */ + private Object getResponseConsumer() { + Object consumer = null; + if(isRabbitMQEnabled()) { + consumer = getResponseDefaultConsumer(); + } else { + consumer = getResponseDirectConsumer(); + } + return consumer; + } + /** + * Provides RabbirMQ consumer for container creation + */ + private Object getResponseDefaultConsumer() { + + return new DefaultConsumer((Channel) commonChannel.getChannel()) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { + String key = properties.getCorrelationId(); + + synchronized (responseFutures) { + SettableFuture future = null; + if (key != null) { + future = responseFutures.remove(key); + if (future == null) { + LOGGER.error("Received a message with correlationId ({}) not in map ({})", key, responseFutures.keySet()); + } + } else { + LOGGER.warn("Received a message with null correlationId. This is an error unless the other component uses an older version of HOBBIT core library."); + Iterator> iter = responseFutures.values().iterator(); + if (iter.hasNext()) { + LOGGER.info("Correlating with the eldest request as a workaround."); + future = iter.next(); + iter.remove(); + } else { + LOGGER.error("There are no pending requests."); + } + } + + if (future != null) { + String value = RabbitMQUtils.readString(body); + future.set(value); + } + } + } + }; + } + /** + * Provides Direct consumer for container creation + */ + private Object getResponseDirectConsumer() { + + return new DirectCallback() { + + @Override + public void callback(byte[] data, List classs, BasicProperties properties) { + String key = properties.getCorrelationId(); + + synchronized (responseFutures) { + SettableFuture future = null; + if (key != null) { + future = responseFutures.remove(key); + if (future == null) { + LOGGER.error("Received a message with correlationId ({}) not in map ({})", key, responseFutures.keySet()); + } + } else { + LOGGER.warn("Received a message with null correlationId. This is an error unless the other component uses an older version of HOBBIT core library."); + Iterator> iter = responseFutures.values().iterator(); + if (iter.hasNext()) { + LOGGER.info("Correlating with the eldest request as a workaround."); + future = iter.next(); + iter.remove(); + } else { + LOGGER.error("There are no pending requests."); + } + } + + if (future != null) { + String value = RabbitMQUtils.readString(data); + future.set(value); + } + } + } + + }; + } } diff --git a/src/main/java/org/hobbit/core/components/AbstractComponent.java b/src/main/java/org/hobbit/core/components/AbstractComponent.java index adf6d8b..5ea0a9f 100644 --- a/src/main/java/org/hobbit/core/components/AbstractComponent.java +++ b/src/main/java/org/hobbit/core/components/AbstractComponent.java @@ -20,6 +20,8 @@ import org.apache.commons.io.IOUtils; import org.hobbit.core.Constants; +import org.hobbit.core.com.Channel; +import org.hobbit.core.components.communicationfactory.ChannelFactory; import org.hobbit.core.rabbit.RabbitQueueFactory; import org.hobbit.core.rabbit.RabbitQueueFactoryImpl; import org.hobbit.utils.EnvVariables; @@ -49,15 +51,17 @@ public abstract class AbstractComponent implements Component { */ public static final long START_WAITING_TIME_BEFORE_RETRY = 5000; + public static final String TRUE = "true"; + private String hobbitSessionId; /** * Factory for creating outgoing data queues. */ - protected RabbitQueueFactory outgoingDataQueuefactory = null; + protected Channel outgoingDataQueuefactory = null; /** * Factory for creating outgoing data queues. */ - protected RabbitQueueFactory incomingDataQueueFactory = null; + protected Channel incomingDataQueueFactory = null; /** * The host name of the RabbitMQ broker. */ @@ -70,11 +74,46 @@ public abstract class AbstractComponent implements Component { */ protected ConnectionFactory connectionFactory; + /** + * Abstract reference for channel abstraction + * @return + */ + protected Channel commonChannel = null; + + public ConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public void setConnectionFactory(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + public String getRabbitMQHostName() { + return rabbitMQHostName; + } + + public void setRabbitMQHostName(String rabbitMQHostName) { + this.rabbitMQHostName = rabbitMQHostName; + } + @Override public void init() throws Exception { hobbitSessionId = EnvVariables.getString(Constants.HOBBIT_SESSION_ID_KEY, Constants.HOBBIT_SESSION_ID_FOR_PLATFORM_COMPONENTS); + setConnectionFactory(); + commonChannel = new ChannelFactory().getChannel(isRabbitMQEnabled(), + Constants.HOBBIT_COMMAND_EXCHANGE_NAME, connectionFactory); + incomingDataQueueFactory = new ChannelFactory().getChannel(isRabbitMQEnabled(), + "", connectionFactory); + outgoingDataQueuefactory = new ChannelFactory().getChannel(isRabbitMQEnabled(), + "", connectionFactory); + incomingDataQueueFactory.createChannel(); + outgoingDataQueuefactory.createChannel(); + + } + private void setConnectionFactory() { + connectionFactory = new ConnectionFactory(); if (rabbitMQHostName == null) { rabbitMQHostName = EnvVariables.getString(Constants.RABBIT_MQ_HOST_NAME_KEY, LOGGER); } @@ -84,15 +123,14 @@ public void init() throws Exception { connectionFactory.setHost(splitted[0]); connectionFactory.setPort(Integer.parseInt(splitted[1])); }else - connectionFactory.setHost(rabbitMQHostName); + connectionFactory.setHost(rabbitMQHostName); connectionFactory.setAutomaticRecoveryEnabled(true); // attempt recovery every 10 seconds connectionFactory.setNetworkRecoveryInterval(10000); - incomingDataQueueFactory = new RabbitQueueFactoryImpl(createConnection()); - outgoingDataQueuefactory = new RabbitQueueFactoryImpl(createConnection()); + } - protected Connection createConnection() throws Exception { + public Connection createConnection() throws Exception { Connection connection = null; Exception exception = null; for (int i = 0; (connection == null) && (i <= NUMBER_OF_RETRIES_TO_CONNECT_TO_RABBIT_MQ); ++i) { @@ -122,8 +160,9 @@ protected Connection createConnection() throws Exception { @Override public void close() throws IOException { - IOUtils.closeQuietly(incomingDataQueueFactory); - IOUtils.closeQuietly(outgoingDataQueuefactory); + incomingDataQueueFactory.close(); + outgoingDataQueuefactory.close(); + commonChannel.close(); } public String getHobbitSessionId() { @@ -133,5 +172,22 @@ public String getHobbitSessionId() { public String generateSessionQueueName(String queueName) { return queueName + "." + hobbitSessionId; } + /** + * Method gets the property value for {@link org.hobbit.core.Constants#IS_RABBIT_MQ_ENABLED} + * from environment variables. Sets the default value to true if the property not found. + */ + + protected boolean isRabbitMQEnabled() { + boolean isRabbitMQEnabled = false; + try { + if(EnvVariables.getString(Constants.IS_RABBIT_MQ_ENABLED, LOGGER).equals(TRUE)) { + isRabbitMQEnabled = true; + } + } catch(Exception e) { + LOGGER.error("Unable to fetch the property for RabbitMQ Enabled, setting the default to true"); + isRabbitMQEnabled = true; + } + return isRabbitMQEnabled; + } } diff --git a/src/main/java/org/hobbit/core/components/AbstractDataGenerator.java b/src/main/java/org/hobbit/core/components/AbstractDataGenerator.java index 8fa66d5..ad4e81c 100644 --- a/src/main/java/org/hobbit/core/components/AbstractDataGenerator.java +++ b/src/main/java/org/hobbit/core/components/AbstractDataGenerator.java @@ -18,12 +18,11 @@ import java.io.IOException; import java.util.concurrent.Semaphore; - import org.apache.commons.io.IOUtils; import org.hobbit.core.Commands; import org.hobbit.core.Constants; -import org.hobbit.core.rabbit.DataSender; -import org.hobbit.core.rabbit.DataSenderImpl; +import org.hobbit.core.com.DataSender; +import org.hobbit.core.components.communicationfactory.SenderReceiverFactory; import org.hobbit.utils.EnvVariables; public abstract class AbstractDataGenerator extends AbstractPlatformConnectorComponent { @@ -45,10 +44,12 @@ public void init() throws Exception { generatorId = EnvVariables.getInt(Constants.GENERATOR_ID_KEY); numberOfGenerators = EnvVariables.getInt(Constants.GENERATOR_COUNT_KEY); - sender2TaskGen = DataSenderImpl.builder().queue(getFactoryForOutgoingDataQueues(), - generateSessionQueueName(Constants.DATA_GEN_2_TASK_GEN_QUEUE_NAME)).build(); - sender2System = DataSenderImpl.builder().queue(getFactoryForOutgoingDataQueues(), - generateSessionQueueName(Constants.DATA_GEN_2_SYSTEM_QUEUE_NAME)).build(); + sender2TaskGen = SenderReceiverFactory.getSenderImpl(isRabbitMQEnabled(), + generateSessionQueueName(Constants.DATA_GEN_2_TASK_GEN_QUEUE_NAME), this); + + sender2System = SenderReceiverFactory.getSenderImpl(isRabbitMQEnabled(), + generateSessionQueueName(Constants.DATA_GEN_2_SYSTEM_QUEUE_NAME), this); + } @Override diff --git a/src/main/java/org/hobbit/core/components/AbstractEvaluationModule.java b/src/main/java/org/hobbit/core/components/AbstractEvaluationModule.java index cd97f44..d6a35b2 100644 --- a/src/main/java/org/hobbit/core/components/AbstractEvaluationModule.java +++ b/src/main/java/org/hobbit/core/components/AbstractEvaluationModule.java @@ -19,6 +19,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.jena.rdf.model.Model; @@ -26,6 +27,8 @@ import org.apache.jena.vocabulary.RDF; import org.hobbit.core.Commands; import org.hobbit.core.Constants; +import org.hobbit.core.com.Channel; +import org.hobbit.core.com.java.DirectCallback; import org.hobbit.core.data.RabbitQueue; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.utils.EnvVariables; @@ -34,7 +37,9 @@ import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.ShutdownSignalException; /** * This abstract class implements basic functions that can be used to implement @@ -50,10 +55,14 @@ public abstract class AbstractEvaluationModule extends AbstractPlatformConnector /** * Consumer used to receive the responses from the evaluation storage. */ - protected QueueingConsumer consumer; + protected Object consumer; /** * Queue to the evaluation storage. */ + protected Channel evalModule2EvalStoreChannel; + /** + * Channel to the evaluation storage. + */ protected RabbitQueue evalModule2EvalStoreQueue; /** * Incoming queue from the evaluation storage. @@ -74,14 +83,14 @@ public void init() throws Exception { // Get the experiment URI experimentUri = EnvVariables.getString(Constants.HOBBIT_EXPERIMENT_URI_KEY, LOGGER); + + getFactoryForOutgoingDataQueues().declareQueue(generateSessionQueueName(Constants.EVAL_MODULE_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME)); + getFactoryForIncomingDataQueues().declareQueue(generateSessionQueueName(Constants.EVAL_STORAGE_2_EVAL_MODULE_DEFAULT_QUEUE_NAME)); + + consumer = getConsumer(); + + getFactoryForIncomingDataQueues().readBytes(consumer, this, true, getFactoryForIncomingDataQueues().getQueueName(null)); - evalModule2EvalStoreQueue = getFactoryForOutgoingDataQueues() - .createDefaultRabbitQueue(generateSessionQueueName(Constants.EVAL_MODULE_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME)); - evalStore2EvalModuleQueue = getFactoryForIncomingDataQueues() - .createDefaultRabbitQueue(generateSessionQueueName(Constants.EVAL_STORAGE_2_EVAL_MODULE_DEFAULT_QUEUE_NAME)); - - consumer = new QueueingConsumer(evalStore2EvalModuleQueue.channel); - evalStore2EvalModuleQueue.channel.basicConsume(evalStore2EvalModuleQueue.name, consumer); } @Override @@ -113,11 +122,15 @@ protected void collectResponses() throws Exception { while (true) { // request next response pair - props = new BasicProperties.Builder().deliveryMode(2).replyTo(evalStore2EvalModuleQueue.name).build(); - evalModule2EvalStoreQueue.channel.basicPublish("", evalModule2EvalStoreQueue.name, props, requestBody); - QueueingConsumer.Delivery delivery = consumer.nextDelivery(); + props = new BasicProperties.Builder().deliveryMode(2).replyTo(getFactoryForIncomingDataQueues().getQueueName(this)).build(); + getFactoryForOutgoingDataQueues().writeBytes(requestBody, "", getFactoryForOutgoingDataQueues().getQueueName(this), props); + //QueueingConsumer.Delivery delivery = consumer.nextDelivery(); + + //props = new BasicProperties.Builder().deliveryMode(2).replyTo(evalStore2EvalModuleQueue.name).build(); + //evalModule2EvalStoreQueue.channel.basicPublish("", evalModule2EvalStoreQueue.name, props, requestBody); + //QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // parse the response - buffer = ByteBuffer.wrap(delivery.getBody()); + buffer = ByteBuffer.wrap(getBodyFromResponse()); // if the response is empty if (buffer.remaining() == 0) { LOGGER.error("Got a completely empty response from the evaluation storage."); @@ -206,4 +219,23 @@ protected Model createDefaultModel() { resultModel.add(resultModel.createResource(experimentUri), RDF.type, HOBBIT.Experiment); return resultModel; } + + private Object getConsumer() { + if(isRabbitMQEnabled()) { + return new QueueingConsumer((com.rabbitmq.client.Channel) getFactoryForIncomingDataQueues().getChannel()); + } + return new DirectCallback() { + @Override + public void callback(byte[] data, List classs, BasicProperties props) { + // TODO Auto-generated method stub + }}; + } + + private byte[] getBodyFromResponse() throws Exception { + if(isRabbitMQEnabled()) { + QueueingConsumer.Delivery delivery = ((QueueingConsumer)consumer).nextDelivery(); + return delivery.getBody(); + } + return new byte[] {}; + } } diff --git a/src/main/java/org/hobbit/core/components/AbstractEvaluationStorage.java b/src/main/java/org/hobbit/core/components/AbstractEvaluationStorage.java index 4651fe7..a6219ca 100644 --- a/src/main/java/org/hobbit/core/components/AbstractEvaluationStorage.java +++ b/src/main/java/org/hobbit/core/components/AbstractEvaluationStorage.java @@ -20,25 +20,31 @@ import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import org.apache.commons.io.IOUtils; import org.apache.jena.ext.com.google.common.collect.Lists; import org.hobbit.core.Commands; import org.hobbit.core.Constants; +import org.hobbit.core.com.Channel; +import org.hobbit.core.com.DataHandler; +import org.hobbit.core.com.DataReceiver; +import org.hobbit.core.com.java.DirectCallback; +import org.hobbit.core.components.communicationfactory.ChannelFactory; +import org.hobbit.core.components.communicationfactory.SenderReceiverFactory; import org.hobbit.core.data.RabbitQueue; import org.hobbit.core.data.Result; import org.hobbit.core.data.ResultPair; -import org.hobbit.core.rabbit.DataHandler; -import org.hobbit.core.rabbit.DataReceiver; import org.hobbit.core.rabbit.DataReceiverImpl; +import org.hobbit.core.rabbit.RabbitMQChannel; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.utils.EnvVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.Channel; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; @@ -99,6 +105,10 @@ public abstract class AbstractEvaluationStorage extends AbstractPlatformConnecto * Channel on which the acknowledgements are send. */ protected Channel ackChannel = null; + + protected Channel evaluationStorageChannel = null; + + private ExecutorService cmdThreadPool; /** * Constructor using the {@link #DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES}= @@ -122,125 +132,39 @@ public AbstractEvaluationStorage(int maxParallelProcessedMsgs) { @Override public void init() throws Exception { - super.init(); + super.init(); String queueName = EnvVariables.getString(Constants.TASK_GEN_2_EVAL_STORAGE_QUEUE_NAME_KEY, Constants.TASK_GEN_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME); - taskResultReceiver = DataReceiverImpl.builder().maxParallelProcessedMsgs(maxParallelProcessedMsgs) - .queue(incomingDataQueueFactory, generateSessionQueueName(queueName)).dataHandler(new DataHandler() { - @Override - public void handleData(byte[] data) { - ByteBuffer buffer = ByteBuffer.wrap(data); - String taskId = RabbitMQUtils.readString(buffer); - LOGGER.trace("Received from task generator {}.", taskId); - byte[] taskData = RabbitMQUtils.readByteArray(buffer); - long timestamp = buffer.getLong(); - receiveExpectedResponseData(taskId, timestamp, taskData); - } - }).build(); + Object taskresultconsumer= getTaskResultConsumer(); + + taskResultReceiver = SenderReceiverFactory.getReceiverImpl(isRabbitMQEnabled(), + generateSessionQueueName(queueName), taskresultconsumer, maxParallelProcessedMsgs,this); queueName = EnvVariables.getString(Constants.SYSTEM_2_EVAL_STORAGE_QUEUE_NAME_KEY, Constants.SYSTEM_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME); final boolean receiveTimeStamp = EnvVariables.getBoolean(RECEIVE_TIMESTAMP_FOR_SYSTEM_RESULTS_KEY, false, LOGGER); final String ackExchangeName = generateSessionQueueName(Constants.HOBBIT_ACK_EXCHANGE_NAME); - systemResultReceiver = DataReceiverImpl.builder().maxParallelProcessedMsgs(maxParallelProcessedMsgs) - .queue(incomingDataQueueFactory, generateSessionQueueName(queueName)).dataHandler(new DataHandler() { - @Override - public void handleData(byte[] data) { - ByteBuffer buffer = ByteBuffer.wrap(data); - String taskId = RabbitMQUtils.readString(buffer); - LOGGER.trace("Received from system {}.", taskId); - byte[] responseData = RabbitMQUtils.readByteArray(buffer); - long timestamp = receiveTimeStamp ? buffer.getLong() : System.currentTimeMillis(); - receiveResponseData(taskId, timestamp, responseData); - // If we should send acknowledgments (and there was no - // error until now) - if (ackChannel != null) { - try { - ackChannel.basicPublish(ackExchangeName, "", null, RabbitMQUtils.writeString(taskId)); - } catch (IOException e) { - LOGGER.error("Error while sending acknowledgement.", e); - } - LOGGER.trace("Sent ack {}.", taskId); - } - } - }).build(); + Object systemresultconsumer= getSystemResultConsumer(receiveTimeStamp, ackExchangeName); + systemResultReceiver = SenderReceiverFactory.getReceiverImpl(isRabbitMQEnabled(), + generateSessionQueueName(queueName), systemresultconsumer, maxParallelProcessedMsgs,this); queueName = EnvVariables.getString(Constants.EVAL_MODULE_2_EVAL_STORAGE_QUEUE_NAME_KEY, Constants.EVAL_MODULE_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME); - evalModule2EvalStoreQueue = getFactoryForIncomingDataQueues() - .createDefaultRabbitQueue(generateSessionQueueName(queueName)); - evalModule2EvalStoreQueue.channel.basicConsume(evalModule2EvalStoreQueue.name, true, - new DefaultConsumer(evalModule2EvalStoreQueue.channel) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, - byte[] body) throws IOException { - byte response[] = null; - // get iterator id - ByteBuffer buffer = ByteBuffer.wrap(body); - if (buffer.remaining() < 1) { - response = EMPTY_RESPONSE; - LOGGER.error("Got a request without a valid iterator Id. Returning emtpy response."); - } else { - byte iteratorId = buffer.get(); - - // get the iterator - Iterator iterator = null; - if (iteratorId == NEW_ITERATOR_ID) { - // create and save a new iterator - iteratorId = (byte) resultPairIterators.size(); - LOGGER.info("Creating new iterator #{}", iteratorId); - resultPairIterators.add(iterator = createIterator()); - } else if ((iteratorId < 0) || iteratorId >= resultPairIterators.size()) { - response = EMPTY_RESPONSE; - LOGGER.error("Got a request without a valid iterator Id (" + Byte.toString(iteratorId) - + "). Returning emtpy response."); - } else { - iterator = resultPairIterators.get(iteratorId); - } - if ((iterator != null) && (iterator.hasNext())) { - ResultPair resultPair = iterator.next(); - Result result = resultPair.getExpected(); - byte expectedResultData[], expectedResultTimeStamp[], actualResultData[], - actualResultTimeStamp[]; - // Make sure that the result is not null - if (result != null) { - // Check whether the data array is null - expectedResultData = result.getData() != null ? result.getData() : new byte[0]; - expectedResultTimeStamp = RabbitMQUtils.writeLong(result.getSentTimestamp()); - } else { - expectedResultData = new byte[0]; - expectedResultTimeStamp = RabbitMQUtils.writeLong(0); - } - result = resultPair.getActual(); - // Make sure that the result is not null - if (result != null) { - // Check whether the data array is null - actualResultData = result.getData() != null ? result.getData() : new byte[0]; - actualResultTimeStamp = RabbitMQUtils.writeLong(result.getSentTimestamp()); - } else { - actualResultData = new byte[0]; - actualResultTimeStamp = RabbitMQUtils.writeLong(0); - } - - response = RabbitMQUtils - .writeByteArrays( - new byte[] { iteratorId }, new byte[][] { expectedResultTimeStamp, - expectedResultData, actualResultTimeStamp, actualResultData }, - null); - } else { - response = new byte[] { iteratorId }; - } - } - getChannel().basicPublish("", properties.getReplyTo(), null, response); - } - }); - + + getFactoryForIncomingDataQueues().declareQueue(generateSessionQueueName(queueName)); + Object consumerCallback = getConsumerCallback(getFactoryForIncomingDataQueues().getQueueName(this)); + + + getFactoryForIncomingDataQueues().readBytes(consumerCallback, this, true, getFactoryForIncomingDataQueues().getQueueName(null)); + boolean sendAcks = EnvVariables.getBoolean(Constants.ACKNOWLEDGEMENT_FLAG_KEY, false, LOGGER); if (sendAcks) { // Create channel for acknowledgements - ackChannel = getFactoryForOutgoingCmdQueues().getConnection().createChannel(); + ackChannel = new ChannelFactory().getChannel(isRabbitMQEnabled(), + generateSessionQueueName(Constants.HOBBIT_ACK_EXCHANGE_NAME), connectionFactory); + ackChannel.createChannel(); ackChannel.exchangeDeclare(generateSessionQueueName(Constants.HOBBIT_ACK_EXCHANGE_NAME), "fanout", false, true, null); } @@ -256,6 +180,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie @Override public void run() throws Exception { sendToCmdQueue(Commands.EVAL_STORAGE_READY_SIGNAL); + terminationMutex.acquire(); taskResultReceiver.closeWhenFinished(); systemResultReceiver.closeWhenFinished(); @@ -275,7 +200,7 @@ public void receiveCommand(byte command, byte[] data) { public void close() throws IOException { IOUtils.closeQuietly(taskResultReceiver); IOUtils.closeQuietly(systemResultReceiver); - IOUtils.closeQuietly(evalModule2EvalStoreQueue); + //IOUtils.closeQuietly(evalModule2EvalStoreQueue); if (ackChannel != null) { try { ackChannel.close(); @@ -285,4 +210,275 @@ public void close() throws IOException { } super.close(); } + + private Object getTaskResultConsumer() { + Object taskresultconsumer = null; + if (isRabbitMQEnabled()) { + taskresultconsumer = getDataHandler(); + }else { + taskresultconsumer= getDirectHandler(); + } + return taskresultconsumer; + } + + private Object getDataHandler() { + return new DataHandler() { + @Override + public void handleData(byte[] data) { + ByteBuffer buffer = ByteBuffer.wrap(data); + String taskId = RabbitMQUtils.readString(buffer); + LOGGER.trace("Received from task generator {}.", taskId); + byte[] taskData = RabbitMQUtils.readByteArray(buffer); + long timestamp = buffer.getLong(); + receiveExpectedResponseData(taskId, timestamp, taskData); + } + }; + } + + private Object getDirectHandler() { + return new DirectCallback() { + @Override + public void callback(byte[] data, List classs, BasicProperties props) { + ByteBuffer buffer = ByteBuffer.wrap(data); + String taskId = RabbitMQUtils.readString(buffer); + LOGGER.debug("Received from task generator {}.", taskId); + byte[] taskData = RabbitMQUtils.readByteArray(buffer); + long timestamp = buffer.getLong(); + receiveExpectedResponseData(taskId, timestamp, taskData); + + } + + }; + } + + private Object getSystemResultConsumer(boolean receiveTimeStamp, String ackExchangeName) { + Object systemresultconsumer = null; + if (isRabbitMQEnabled()) { + systemresultconsumer = getSystemDataHandler(receiveTimeStamp, ackExchangeName); + }else { + systemresultconsumer = getSystemDirectHandler(receiveTimeStamp, ackExchangeName); + } + return systemresultconsumer; + } + + private Object getSystemDataHandler(boolean receiveTimeStamp, String ackExchangeName) { + return new DataHandler() { + @Override + public void handleData(byte[] data) { + ByteBuffer buffer = ByteBuffer.wrap(data); + String taskId = RabbitMQUtils.readString(buffer); + LOGGER.trace("Received from system {}.", taskId); + byte[] responseData = RabbitMQUtils.readByteArray(buffer); + long timestamp = receiveTimeStamp ? buffer.getLong() : System.currentTimeMillis(); + receiveResponseData(taskId, timestamp, responseData); + // If we should send acknowledgments (and there was no + // error until now) + if (ackChannel != null) { + try { + //ackChannel.basicPublish(ackExchangeName, "", null, RabbitMQUtils.writeString(taskId)); + ByteBuffer buf = ByteBuffer.wrap(RabbitMQUtils.writeString(taskId)); + ackChannel.writeBytes(buf, ackExchangeName, "", null); + } catch (Exception e) { + LOGGER.error("Error while sending acknowledgement.", e); + } + LOGGER.trace("Sent ack {}.", taskId); + } + } + }; + } + + private Object getSystemDirectHandler(boolean receiveTimeStamp, String ackExchangeName) { + return new DirectCallback() { + @Override + public void callback(byte[] data, List classs, BasicProperties props) { + ByteBuffer buffer = ByteBuffer.wrap(data); + String taskId = RabbitMQUtils.readString(buffer); + LOGGER.trace("Received from system {}.", taskId); + byte[] responseData = RabbitMQUtils.readByteArray(buffer); + long timestamp = receiveTimeStamp ? buffer.getLong() : System.currentTimeMillis(); + receiveResponseData(taskId, timestamp, responseData); + // If we should send acknowledgments (and there was no + // error until now) + if (ackChannel != null) { + try { + //ackChannel.basicPublish(ackExchangeName, "", null, RabbitMQUtils.writeString(taskId)); + ByteBuffer buf = ByteBuffer.wrap(RabbitMQUtils.writeString(taskId)); + ackChannel.writeBytes(buf, ackExchangeName, null, null); + } catch (Exception e) { + LOGGER.error("Error while sending acknowledgement.", e); + } + LOGGER.trace("Sent ack {}.", taskId); + } + + } + + }; + } + + private Object getConsumerCallback(String queueName) { + Object consumerCallback = null; + if(isRabbitMQEnabled()) { + consumerCallback = getDefaultConsumer(queueName); + } else { + consumerCallback = getDirectConsumer(queueName); + } + return consumerCallback; + } + + private Object getDefaultConsumer(String queueName) { + + try { + evalModule2EvalStoreQueue = ((RabbitMQChannel)getFactoryForIncomingDataQueues()).getCmdQueueFactory() + .createDefaultRabbitQueue(generateSessionQueueName(queueName)); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return new DefaultConsumer(evalModule2EvalStoreQueue.channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { + byte response[] = null; + // get iterator id + ByteBuffer buffer = ByteBuffer.wrap(body); + if (buffer.remaining() < 1) { + response = EMPTY_RESPONSE; + LOGGER.error("Got a request without a valid iterator Id. Returning emtpy response."); + } else { + byte iteratorId = buffer.get(); + + // get the iterator + Iterator iterator = null; + if (iteratorId == NEW_ITERATOR_ID) { + // create and save a new iterator + iteratorId = (byte) resultPairIterators.size(); + LOGGER.info("Creating new iterator #{}", iteratorId); + resultPairIterators.add(iterator = createIterator()); + } else if ((iteratorId < 0) || iteratorId >= resultPairIterators.size()) { + response = EMPTY_RESPONSE; + LOGGER.error("Got a request without a valid iterator Id (" + Byte.toString(iteratorId) + + "). Returning emtpy response."); + } else { + iterator = resultPairIterators.get(iteratorId); + } + if ((iterator != null) && (iterator.hasNext())) { + ResultPair resultPair = iterator.next(); + Result result = resultPair.getExpected(); + byte expectedResultData[], expectedResultTimeStamp[], actualResultData[], + actualResultTimeStamp[]; + // Make sure that the result is not null + if (result != null) { + // Check whether the data array is null + expectedResultData = result.getData() != null ? result.getData() : new byte[0]; + expectedResultTimeStamp = RabbitMQUtils.writeLong(result.getSentTimestamp()); + } else { + expectedResultData = new byte[0]; + expectedResultTimeStamp = RabbitMQUtils.writeLong(0); + } + result = resultPair.getActual(); + // Make sure that the result is not null + if (result != null) { + // Check whether the data array is null + actualResultData = result.getData() != null ? result.getData() : new byte[0]; + actualResultTimeStamp = RabbitMQUtils.writeLong(result.getSentTimestamp()); + } else { + actualResultData = new byte[0]; + actualResultTimeStamp = RabbitMQUtils.writeLong(0); + } + + response = RabbitMQUtils + .writeByteArrays( + new byte[] { iteratorId }, new byte[][] { expectedResultTimeStamp, + expectedResultData, actualResultTimeStamp, actualResultData }, + null); + } else { + response = new byte[] { iteratorId }; + } + } + getChannel().basicPublish("", properties.getReplyTo(), null, response); + } + }; + } + + private Object getDirectConsumer(String queueName) { + + return new DirectCallback(evaluationStorageChannel,generateSessionQueueName(queueName), null) { + @Override + public void callback(byte[] data, List cmdCallbackObjectList, BasicProperties props) { + for(Object cmdCallbackObject:cmdCallbackObjectList) { + if(cmdCallbackObject != null && + cmdCallbackObject instanceof AbstractEvaluationStorage) { + byte response[] = null; + // get iterator id + ByteBuffer buffer = ByteBuffer.wrap(data); + if (buffer.remaining() < 1) { + response = EMPTY_RESPONSE; + LOGGER.error("Got a request without a valid iterator Id. Returning emtpy response."); + } else { + byte iteratorId = buffer.get(); + + // get the iterator + Iterator iterator = null; + if (iteratorId == NEW_ITERATOR_ID) { + // create and save a new iterator + iteratorId = (byte) resultPairIterators.size(); + LOGGER.info("Creating new iterator #{}", iteratorId); + resultPairIterators.add(iterator = createIterator()); + } else if ((iteratorId < 0) || iteratorId >= resultPairIterators.size()) { + response = EMPTY_RESPONSE; + LOGGER.error("Got a request without a valid iterator Id (" + Byte.toString(iteratorId) + + "). Returning emtpy response."); + } else { + iterator = resultPairIterators.get(iteratorId); + } + if ((iterator != null) && (iterator.hasNext())) { + ResultPair resultPair = iterator.next(); + Result result = resultPair.getExpected(); + byte expectedResultData[], expectedResultTimeStamp[], actualResultData[], + actualResultTimeStamp[]; + // Make sure that the result is not null + if (result != null) { + // Check whether the data array is null + expectedResultData = result.getData() != null ? result.getData() : new byte[0]; + expectedResultTimeStamp = RabbitMQUtils.writeLong(result.getSentTimestamp()); + } else { + expectedResultData = new byte[0]; + expectedResultTimeStamp = RabbitMQUtils.writeLong(0); + } + result = resultPair.getActual(); + // Make sure that the result is not null + if (result != null) { + // Check whether the data array is null + actualResultData = result.getData() != null ? result.getData() : new byte[0]; + actualResultTimeStamp = RabbitMQUtils.writeLong(result.getSentTimestamp()); + } else { + actualResultData = new byte[0]; + actualResultTimeStamp = RabbitMQUtils.writeLong(0); + } + + response = RabbitMQUtils + .writeByteArrays( + new byte[] { iteratorId }, new byte[][] { expectedResultTimeStamp, + expectedResultData, actualResultTimeStamp, actualResultData }, + null); + } else { + response = new byte[] { iteratorId }; + } + } + try { + //Thread.sleep(0, 1000); + ByteBuffer resposebuffer = ByteBuffer.allocate(response.length); + channel.writeBytes(resposebuffer, null, queue, null); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + // getChannel().basicPublish("", properties.getReplyTo(), null, response); + + } + } + } + }; + } } diff --git a/src/main/java/org/hobbit/core/components/AbstractPlatformConnectorComponent.java b/src/main/java/org/hobbit/core/components/AbstractPlatformConnectorComponent.java index 896da49..221b338 100644 --- a/src/main/java/org/hobbit/core/components/AbstractPlatformConnectorComponent.java +++ b/src/main/java/org/hobbit/core/components/AbstractPlatformConnectorComponent.java @@ -22,6 +22,7 @@ import org.hobbit.core.Commands; import org.hobbit.core.Constants; +import org.hobbit.core.com.Channel; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.core.rabbit.RabbitQueueFactory; @@ -79,22 +80,22 @@ protected void addContainerObserver(String containerName, ContainerStateObserver } @Override - public RabbitQueueFactory getFactoryForIncomingCmdQueues() { - return cmdQueueFactory; + public Channel getFactoryForIncomingCmdQueues() { + return commonChannel; } @Override - public RabbitQueueFactory getFactoryForIncomingDataQueues() { + public Channel getFactoryForIncomingDataQueues() { return incomingDataQueueFactory; } @Override - public RabbitQueueFactory getFactoryForOutgoingCmdQueues() { - return cmdQueueFactory; + public Channel getFactoryForOutgoingCmdQueues() { + return commonChannel; } @Override - public RabbitQueueFactory getFactoryForOutgoingDataQueues() { + public Channel getFactoryForOutgoingDataQueues() { return outgoingDataQueuefactory; } diff --git a/src/main/java/org/hobbit/core/components/AbstractSequencingTaskGenerator.java b/src/main/java/org/hobbit/core/components/AbstractSequencingTaskGenerator.java index 7109848..7447fd2 100644 --- a/src/main/java/org/hobbit/core/components/AbstractSequencingTaskGenerator.java +++ b/src/main/java/org/hobbit/core/components/AbstractSequencingTaskGenerator.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeoutException; import org.hobbit.core.Constants; +import org.hobbit.core.rabbit.RabbitMQChannel; import org.hobbit.core.rabbit.RabbitMQUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +77,7 @@ public abstract class AbstractSequencingTaskGenerator extends AbstractTaskGenera /** * Channel on which the acknowledgments are received. */ - protected Channel ackChannel; + //protected Channel ackChannel; public AbstractSequencingTaskGenerator() { // TODO remove this 1 from the constructor @@ -92,24 +93,14 @@ public void init() throws Exception { super.init(); // Create channel for incoming acknowledgments using the command // connection (not the data connection!) - ackChannel = getFactoryForIncomingCmdQueues().getConnection().createChannel(); - String queueName = ackChannel.queueDeclare().getQueue(); + //ackChannel = ((RabbitMQChannel)getFactoryForIncomingCmdQueues()).getCmdQueueFactory().createChannel(); + String queueName = getFactoryForIncomingCmdQueues().declareQueue(null); String exchangeName = generateSessionQueueName(Constants.HOBBIT_ACK_EXCHANGE_NAME); - ackChannel.exchangeDeclare(exchangeName, "fanout", false, true, null); - ackChannel.queueBind(queueName, exchangeName, ""); - Consumer consumer = new DefaultConsumer(ackChannel) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, - byte[] body) throws IOException { - try { - handleAck(body); - } catch (Exception e) { - LOGGER.error("Exception while trying to handle incoming command.", e); - } - } - }; - ackChannel.basicConsume(queueName, true, consumer); - ackChannel.basicQos(1); + getFactoryForIncomingCmdQueues().exchangeDeclare(exchangeName, "fanout", false, true, null); + getFactoryForIncomingCmdQueues().queueBind(queueName, exchangeName, ""); + Object consumer = getConsumer(); + getFactoryForIncomingCmdQueues().readBytes(consumer, this, true, queueName); + ((Channel)getFactoryForIncomingCmdQueues().getChannel()).basicQos(1); } /** @@ -218,9 +209,26 @@ public void setAckTimeout(long ackTimeout) { @Override public void close() throws IOException { try { - ackChannel.close(); + ((Channel)getFactoryForIncomingCmdQueues().getChannel()).close(); } catch (TimeoutException e) { } super.close(); } + + private Object getConsumer() { + if(isRabbitMQEnabled()) { + return new DefaultConsumer(((Channel)getFactoryForIncomingCmdQueues().getChannel())) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { + try { + handleAck(body); + } catch (Exception e) { + LOGGER.error("Exception while trying to handle incoming command.", e); + } + } + }; + } + return null; + } } diff --git a/src/main/java/org/hobbit/core/components/AbstractSystemAdapter.java b/src/main/java/org/hobbit/core/components/AbstractSystemAdapter.java index 5ce6ba1..2029d1e 100644 --- a/src/main/java/org/hobbit/core/components/AbstractSystemAdapter.java +++ b/src/main/java/org/hobbit/core/components/AbstractSystemAdapter.java @@ -18,24 +18,25 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.concurrent.Semaphore; - import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.apache.jena.rdf.model.Model; import org.apache.jena.rdf.model.ModelFactory; import org.hobbit.core.Commands; import org.hobbit.core.Constants; -import org.hobbit.core.rabbit.DataHandler; -import org.hobbit.core.rabbit.DataReceiver; -import org.hobbit.core.rabbit.DataReceiverImpl; -import org.hobbit.core.rabbit.DataSender; -import org.hobbit.core.rabbit.DataSenderImpl; +import org.hobbit.core.com.DataHandler; +import org.hobbit.core.com.DataReceiver; +import org.hobbit.core.com.DataSender; +import org.hobbit.core.com.java.DirectCallback; +import org.hobbit.core.components.communicationfactory.SenderReceiverFactory; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.utils.EnvVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.rabbitmq.client.AMQP.BasicProperties; /** * This abstract class implements basic functions that can be used to implement * a system adapter. @@ -114,33 +115,21 @@ public void init() throws Exception { // Get the benchmark parameter model systemParamModel = EnvVariables.getModel(Constants.SYSTEM_PARAMETERS_MODEL_KEY, - () -> ModelFactory.createDefaultModel(), LOGGER); - - dataGenReceiver = DataReceiverImpl.builder().maxParallelProcessedMsgs(maxParallelProcessedMsgs) - .queue(incomingDataQueueFactory, generateSessionQueueName(Constants.DATA_GEN_2_SYSTEM_QUEUE_NAME)) - .dataHandler(new DataHandler() { - @Override - public void handleData(byte[] data) { - receiveGeneratedData(data); - } - }).build(); - - taskGenReceiver = DataReceiverImpl.builder().maxParallelProcessedMsgs(maxParallelProcessedMsgs) - .queue(incomingDataQueueFactory, generateSessionQueueName(Constants.TASK_GEN_2_SYSTEM_QUEUE_NAME)) - .dataHandler(new DataHandler() { - @Override - public void handleData(byte[] data) { - ByteBuffer buffer = ByteBuffer.wrap(data); - String taskId = RabbitMQUtils.readString(buffer); - byte[] taskData = RabbitMQUtils.readByteArray(buffer); - receiveGeneratedTask(taskId, taskData); - } - }).build(); - - sender2EvalStore = DataSenderImpl.builder().queue(getFactoryForOutgoingDataQueues(), - generateSessionQueueName(Constants.SYSTEM_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME)).build(); + () -> ModelFactory.createDefaultModel(), LOGGER); + Object dataGenReceiverConsumer= getDataReceiverHandler(); + + dataGenReceiver = SenderReceiverFactory.getReceiverImpl(isRabbitMQEnabled(), + generateSessionQueueName(Constants.DATA_GEN_2_SYSTEM_QUEUE_NAME), dataGenReceiverConsumer, maxParallelProcessedMsgs,this); + + Object taskGenReceiverConsumer= getTaskReceiverHandler(); + taskGenReceiver = SenderReceiverFactory.getReceiverImpl(isRabbitMQEnabled(), + generateSessionQueueName(Constants.TASK_GEN_2_SYSTEM_QUEUE_NAME), taskGenReceiverConsumer, maxParallelProcessedMsgs,this); + + sender2EvalStore = SenderReceiverFactory.getSenderImpl(isRabbitMQEnabled(), + generateSessionQueueName(Constants.SYSTEM_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME), this); } + @Override public void run() throws Exception { sendToCmdQueue(Commands.SYSTEM_READY_SIGNAL); @@ -198,7 +187,7 @@ protected void sendResultToEvalStorage(String taskIdString, byte[] data) throws * Starts termination of the main thread of this system adapter. If a cause is * given, it will be thrown causing an abortion from the main thread instead of * a normal termination. - * + * * @param cause * the cause for an abortion of the process or {code null} if the * component should terminate in a normal way. @@ -223,7 +212,63 @@ public void close() throws IOException { IOUtils.closeQuietly(taskGenReceiver); // Close the sender (we shouldn't close it before this point since we want to be // sure that all results have been sent) - sender2EvalStore.closeWhenFinished(); + //sender2EvalStore.closeWhenFinished(); super.close(); } + + private Object getDataReceiverHandler() { + if(isRabbitMQEnabled()) { + return getDataReceiverDataHandler(); + } + return getDataReceiverDirectHandler(); + } + + private Object getDataReceiverDirectHandler() { + return new DirectCallback() { + @Override + public void callback(byte[] data, List classs, BasicProperties props) { + receiveGeneratedData(data); + } + }; + } + + private Object getDataReceiverDataHandler() { + return new DataHandler() { + @Override + public void handleData(byte[] data) { + receiveGeneratedData(data); + } + }; + } + + private Object getTaskReceiverHandler() { + if(isRabbitMQEnabled()) { + return getTaskReceiverDataHandler(); + } + return getTaskReceiverDirectHandler(); + } + + private Object getTaskReceiverDirectHandler() { + return new DirectCallback() { + @Override + public void callback(byte[] data, List classs, BasicProperties props) { + ByteBuffer buffer = ByteBuffer.wrap(data); + String taskId = RabbitMQUtils.readString(buffer); + byte[] taskData = RabbitMQUtils.readByteArray(buffer); + receiveGeneratedTask(taskId, taskData); + } + }; + } + + private Object getTaskReceiverDataHandler() { + return new DataHandler() { + @Override + public void handleData(byte[] data) { + ByteBuffer buffer = ByteBuffer.wrap(data); + String taskId = RabbitMQUtils.readString(buffer); + byte[] taskData = RabbitMQUtils.readByteArray(buffer); + receiveGeneratedTask(taskId, taskData); + } + }; + } } diff --git a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java index 5fb0cf0..b13511c 100644 --- a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java +++ b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java @@ -17,22 +17,31 @@ package org.hobbit.core.components; import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import org.apache.commons.io.IOUtils; import org.hobbit.core.Commands; import org.hobbit.core.Constants; -import org.hobbit.core.rabbit.DataHandler; -import org.hobbit.core.rabbit.DataReceiver; +import org.hobbit.core.com.Channel; +import org.hobbit.core.com.DataHandler; +import org.hobbit.core.com.DataReceiver; +import org.hobbit.core.com.DataSender; +import org.hobbit.core.com.java.DirectCallback; +import org.hobbit.core.components.communicationfactory.ChannelFactory; +import org.hobbit.core.components.communicationfactory.SenderReceiverFactory; import org.hobbit.core.rabbit.DataReceiverImpl; -import org.hobbit.core.rabbit.DataSender; import org.hobbit.core.rabbit.DataSenderImpl; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.utils.EnvVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.AMQP.BasicProperties; /** * This abstract class implements basic functions that can be used to implement @@ -124,18 +133,15 @@ public void init() throws Exception { nextTaskId = generatorId; numberOfGenerators = EnvVariables.getInt(Constants.GENERATOR_COUNT_KEY); - sender2System = DataSenderImpl.builder().queue(getFactoryForOutgoingDataQueues(), - generateSessionQueueName(Constants.TASK_GEN_2_SYSTEM_QUEUE_NAME)).build(); - sender2EvalStore = DataSenderImpl.builder().queue(getFactoryForOutgoingDataQueues(), - generateSessionQueueName(Constants.TASK_GEN_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME)).build(); + sender2System = SenderReceiverFactory.getSenderImpl(isRabbitMQEnabled(), + generateSessionQueueName(Constants.TASK_GEN_2_SYSTEM_QUEUE_NAME), this); + sender2EvalStore = SenderReceiverFactory.getSenderImpl(isRabbitMQEnabled(), + generateSessionQueueName(Constants.TASK_GEN_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME), this); + + Object dataGenReceiverConsumer= getDataReceiverHandler(); + dataGenReceiver = SenderReceiverFactory.getReceiverImpl(isRabbitMQEnabled(), + generateSessionQueueName(Constants.DATA_GEN_2_TASK_GEN_QUEUE_NAME), dataGenReceiverConsumer,maxParallelProcessedMsgs,this); - dataGenReceiver = DataReceiverImpl.builder().dataHandler(new DataHandler() { - @Override - public void handleData(byte[] data) { - receiveGeneratedData(data); - } - }).maxParallelProcessedMsgs(maxParallelProcessedMsgs).queue(getFactoryForIncomingDataQueues(), - generateSessionQueueName(Constants.DATA_GEN_2_TASK_GEN_QUEUE_NAME)).build(); } @Override @@ -214,7 +220,7 @@ public void receiveCommand(byte command, byte[] data) { */ protected void sendTaskToEvalStorage(String taskIdString, long timestamp, byte[] data) throws IOException { sender2EvalStore.sendData(RabbitMQUtils.writeByteArrays(null, - new byte[][] { RabbitMQUtils.writeString(taskIdString), data }, RabbitMQUtils.writeLong(timestamp))); + new byte[][] { RabbitMQUtils.writeString(taskIdString), data }, RabbitMQUtils.writeLong(timestamp))); } /** @@ -247,4 +253,29 @@ public void close() throws IOException { IOUtils.closeQuietly(sender2System); super.close(); } + + private Object getDataReceiverHandler() { + if(isRabbitMQEnabled()) { + return getDataReceiverDataHandler(); + } + return getDataReceiverDirectHandler(); + } + + private Object getDataReceiverDataHandler() { + return new DataHandler() { + @Override + public void handleData(byte[] data) { + receiveGeneratedData(data); + } + }; + } + + private Object getDataReceiverDirectHandler() { + return new DirectCallback() { + @Override + public void callback(byte[] data, List classs, BasicProperties props) { + receiveGeneratedData(data); + } + }; + } } diff --git a/src/main/java/org/hobbit/core/components/PlatformConnector.java b/src/main/java/org/hobbit/core/components/PlatformConnector.java index 9e637e0..482c501 100644 --- a/src/main/java/org/hobbit/core/components/PlatformConnector.java +++ b/src/main/java/org/hobbit/core/components/PlatformConnector.java @@ -17,6 +17,7 @@ package org.hobbit.core.components; import org.hobbit.core.Commands; +import org.hobbit.core.com.Channel; import org.hobbit.core.rabbit.RabbitQueueFactory; /** @@ -54,8 +55,8 @@ public interface PlatformConnector { */ public void stopContainer(String containerName); - public RabbitQueueFactory getFactoryForOutgoingDataQueues(); - public RabbitQueueFactory getFactoryForIncomingDataQueues(); - public RabbitQueueFactory getFactoryForOutgoingCmdQueues(); - public RabbitQueueFactory getFactoryForIncomingCmdQueues(); + public Channel getFactoryForOutgoingDataQueues(); + public Channel getFactoryForIncomingDataQueues(); + public Channel getFactoryForOutgoingCmdQueues(); + public Channel getFactoryForIncomingCmdQueues(); } diff --git a/src/main/java/org/hobbit/core/components/communicationfactory/ChannelFactory.java b/src/main/java/org/hobbit/core/components/communicationfactory/ChannelFactory.java new file mode 100644 index 0000000..69b2cd8 --- /dev/null +++ b/src/main/java/org/hobbit/core/components/communicationfactory/ChannelFactory.java @@ -0,0 +1,37 @@ +package org.hobbit.core.components.communicationfactory; + +import org.hobbit.core.Constants; +import org.hobbit.core.com.Channel; +import org.hobbit.core.com.java.DirectChannel; +import org.hobbit.core.rabbit.RabbitMQChannel; + +import com.rabbitmq.client.ConnectionFactory; +/** + * This factory class provides an instance of {@link RabbitMQChannel} or {@link DirectChannel} + * based on the environment property {@link org.hobbit.core.Constants#IS_RABBIT_MQ_ENABLED} + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das + * + */ +public class ChannelFactory { + + /** + * Factory method to get the instance of {@link RabbitMQChannel} or {@link DirectChannel} + * based on the environment configuration {@link Constants#IS_RABBIT_MQ_ENABLED} + * + * @param rabbitMQEnabled an environment variable that returns a boolean value + * @param queue + * @param connectionFactory an instance of {@link ConnectionFactory} + * @return instance of {@link RabbitMQChannel} or {@link DirectChannel} + */ + public Channel getChannel(boolean rabbitMQEnabled, String queue, ConnectionFactory connectionFactory) { + if(rabbitMQEnabled){ + return new RabbitMQChannel(connectionFactory); + } + return new DirectChannel(queue); + } + +} diff --git a/src/main/java/org/hobbit/core/components/communicationfactory/SenderReceiverFactory.java b/src/main/java/org/hobbit/core/components/communicationfactory/SenderReceiverFactory.java new file mode 100644 index 0000000..fb7d9d2 --- /dev/null +++ b/src/main/java/org/hobbit/core/components/communicationfactory/SenderReceiverFactory.java @@ -0,0 +1,49 @@ +package org.hobbit.core.components.communicationfactory; + +import org.hobbit.core.Constants; +import org.hobbit.core.com.DataHandler; +import org.hobbit.core.com.DataReceiver; +import org.hobbit.core.com.DataSender; +import org.hobbit.core.com.java.DirectChannel; +import org.hobbit.core.com.java.DirectReceiverImpl; +import org.hobbit.core.com.java.DirectSenderImpl; +import org.hobbit.core.components.AbstractPlatformConnectorComponent; +import org.hobbit.core.rabbit.DataReceiverImpl; +import org.hobbit.core.rabbit.DataSenderImpl; +import org.hobbit.core.rabbit.RabbitMQChannel; +/** + * This factory class provides the instance of {@link DataSender} and {@link DataReceiver} + * for {@link RabbitMQChannel} or {@link DirectChannel} based on the environment property + * {@link org.hobbit.core.Constants#IS_RABBIT_MQ_ENABLED} + * @author Altafhusen Makandar + * + */ +public class SenderReceiverFactory { + + /** + * Factory method to fetch the instance of {@link DataSenderImpl} or {@link DirectSenderImpl} + * based on the environment configuration {@link Constants#IS_RABBIT_MQ_ENABLED} + */ + public static DataSender getSenderImpl(boolean isRabbitEnabled, String queue, AbstractPlatformConnectorComponent object) throws Exception { + if(isRabbitEnabled) { + return DataSenderImpl.builder().queue(((RabbitMQChannel)object.getFactoryForOutgoingDataQueues()).getCmdQueueFactory(), + queue).build(); + } + return new DirectSenderImpl(queue); + } + + /** + * Factory method to fetch the instance of {@link DataReceiverImpl} or {@link DirectReceiverImpl} + * based on the environment configuration {@link Constants#IS_RABBIT_MQ_ENABLED} + */ + public static DataReceiver getReceiverImpl(boolean isRabbitEnabled, String queue, Object consumer, + int maxParallelProcessedMsgs, AbstractPlatformConnectorComponent object ) throws Exception { + if(isRabbitEnabled) { + return DataReceiverImpl.builder().maxParallelProcessedMsgs(maxParallelProcessedMsgs). + queue(((RabbitMQChannel)object.getFactoryForIncomingDataQueues()).getCmdQueueFactory(), + queue).dataHandler((DataHandler) consumer).build(); + } + return new DirectReceiverImpl(queue, consumer); + } + +} diff --git a/src/main/java/org/hobbit/core/components/utils/SystemResourceUsageRequester.java b/src/main/java/org/hobbit/core/components/utils/SystemResourceUsageRequester.java index 1619add..0431c33 100644 --- a/src/main/java/org/hobbit/core/components/utils/SystemResourceUsageRequester.java +++ b/src/main/java/org/hobbit/core/components/utils/SystemResourceUsageRequester.java @@ -11,6 +11,7 @@ import org.hobbit.core.components.AbstractCommandReceivingComponent; import org.hobbit.core.components.PlatformConnector; import org.hobbit.core.data.usage.ResourceUsageInformation; +import org.hobbit.core.rabbit.RabbitMQChannel; import org.hobbit.core.rabbit.RabbitMQUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,8 +27,8 @@ public class SystemResourceUsageRequester implements Closeable { public static SystemResourceUsageRequester create(PlatformConnector connector, String sessionId) { try { - Channel cmdChannel = connector.getFactoryForOutgoingCmdQueues().createChannel(); - Channel incomingChannel = connector.getFactoryForIncomingDataQueues().createChannel(); + Channel cmdChannel = ((RabbitMQChannel)connector.getFactoryForOutgoingCmdQueues()).getCmdQueueFactory().createChannel(); + Channel incomingChannel = ((RabbitMQChannel)connector.getFactoryForIncomingDataQueues()).getCmdQueueFactory().createChannel(); String responseQueueName = null; // if (responseQueueName == null) { responseQueueName = incomingChannel.queueDeclare().getQueue(); diff --git a/src/main/java/org/hobbit/core/mimic/DockerBasedMimickingAlg.java b/src/main/java/org/hobbit/core/mimic/DockerBasedMimickingAlg.java index 5a9460a..e0080d3 100644 --- a/src/main/java/org/hobbit/core/mimic/DockerBasedMimickingAlg.java +++ b/src/main/java/org/hobbit/core/mimic/DockerBasedMimickingAlg.java @@ -26,6 +26,7 @@ import org.hobbit.core.components.ContainerStateObserver; import org.hobbit.core.components.PlatformConnector; import org.hobbit.core.data.RabbitQueue; +import org.hobbit.core.rabbit.RabbitMQChannel; import org.hobbit.core.rabbit.SimpleFileReceiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,7 @@ public void generateData(String outputDirectory, String[] envVariables) throws E SimpleFileReceiver receiver = null; try { // create the queue to get data from the container - queue = connector.getFactoryForIncomingDataQueues() + queue = ((RabbitMQChannel)connector.getFactoryForIncomingDataQueues()).getCmdQueueFactory() .createDefaultRabbitQueue(UUID.randomUUID().toString().replace("-", "")); // create a receiver that writes incoming data to the files receiver = SimpleFileReceiver.create(queue); diff --git a/src/main/java/org/hobbit/core/rabbit/DataReceiverImpl.java b/src/main/java/org/hobbit/core/rabbit/DataReceiverImpl.java index 24cfc1c..0697e96 100644 --- a/src/main/java/org/hobbit/core/rabbit/DataReceiverImpl.java +++ b/src/main/java/org/hobbit/core/rabbit/DataReceiverImpl.java @@ -6,6 +6,8 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; +import org.hobbit.core.com.DataHandler; +import org.hobbit.core.com.DataReceiver; import org.hobbit.core.data.RabbitQueue; import org.hobbit.utils.TerminatableRunnable; import org.slf4j.Logger; diff --git a/src/main/java/org/hobbit/core/rabbit/DataSenderImpl.java b/src/main/java/org/hobbit/core/rabbit/DataSenderImpl.java index 61298a0..98e15a7 100644 --- a/src/main/java/org/hobbit/core/rabbit/DataSenderImpl.java +++ b/src/main/java/org/hobbit/core/rabbit/DataSenderImpl.java @@ -7,6 +7,7 @@ import java.util.concurrent.Semaphore; import org.apache.commons.io.IOUtils; +import org.hobbit.core.com.DataSender; import org.hobbit.core.data.RabbitQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/hobbit/core/rabbit/RabbitMQChannel.java b/src/main/java/org/hobbit/core/rabbit/RabbitMQChannel.java new file mode 100644 index 0000000..dc7ed0b --- /dev/null +++ b/src/main/java/org/hobbit/core/rabbit/RabbitMQChannel.java @@ -0,0 +1,138 @@ +package org.hobbit.core.rabbit; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.commons.io.IOUtils; +import org.hobbit.core.com.Channel; +import org.hobbit.core.components.AbstractCommandReceivingComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.AMQP.BasicProperties; + +public class RabbitMQChannel implements Channel { + + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQChannel.class); + + protected ConnectionFactory connectionFactory; + + protected String rabbitMQHostName; + + protected RabbitQueueFactory cmdQueueFactory; + + protected com.rabbitmq.client.Channel cmdChannel = null; + + private String queueName; + /** + * Maximum number of retries that are executed to connect to RabbitMQ. + */ + public static final int NUMBER_OF_RETRIES_TO_CONNECT_TO_RABBIT_MQ = 5; + /** + * Time, the system waits before retrying to connect to RabbitMQ. Note that this + * time will be multiplied with the number of already failed tries. + */ + public static final long START_WAITING_TIME_BEFORE_RETRY = 5000; + + public RabbitMQChannel(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + @Override + public void readBytes(Object callback, Object classs, Boolean autoAck, String queue) throws IOException { + if(autoAck == null) { + cmdChannel.basicConsume(queue, (Consumer) callback); + }else { + cmdChannel.basicConsume(queue, autoAck, (Consumer) callback); + } + } + + @Override + public void writeBytes(byte[] data, String exchange, String routingKey, BasicProperties props) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(data.length); + buffer.put(data); + writeBytes(buffer, exchange, routingKey, props); + } + + @Override + public void writeBytes(ByteBuffer buffer, String exchange, String routingKey, BasicProperties props) throws IOException { + cmdChannel.basicPublish(exchange, routingKey, props, buffer.array()); + } + + @Override + public void close() { + IOUtils.closeQuietly(cmdQueueFactory); + } + + protected Connection createConnection() throws Exception { + Connection connection = null; + Exception exception = null; + for (int i = 0; (connection == null) && (i <= NUMBER_OF_RETRIES_TO_CONNECT_TO_RABBIT_MQ); ++i) { + try { + connection = connectionFactory.newConnection(); + } catch (Exception e) { + if (i < NUMBER_OF_RETRIES_TO_CONNECT_TO_RABBIT_MQ) { + long waitingTime = START_WAITING_TIME_BEFORE_RETRY * (i + 1); + LOGGER.warn("Couldn't connect to RabbitMQ with try #" + i + ". Next try in " + waitingTime + "ms."); + exception = e; + try { + Thread.sleep(waitingTime); + } catch (Exception e2) { + LOGGER.warn("Interrupted while waiting before retrying to connect to RabbitMQ.", e2); + } + } + } + } + if (connection == null) { + String msg = "Couldn't connect to RabbitMQ after " + NUMBER_OF_RETRIES_TO_CONNECT_TO_RABBIT_MQ + + " retries."; + LOGGER.error(msg, exception); + throw new Exception(msg, exception); + } + return connection; + } + + @Override + public void createChannel() throws Exception { + cmdQueueFactory = new RabbitQueueFactoryImpl(createConnection()); + cmdChannel = cmdQueueFactory.getConnection().createChannel(); + } + + @Override + public String getQueueName(AbstractCommandReceivingComponent abstractCommandReceivingComponent) throws Exception { + return this.queueName; + } + + @Override + public void exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, + Map arguments) throws IOException { + cmdChannel.exchangeDeclare(exchange, type, durable, autoDelete, arguments); + } + + @Override + public void queueBind(String queue, String exchange, String routingKey) throws IOException { + cmdChannel.queueBind(queue, exchange, routingKey); + } + + @Override + public Object getChannel() { + return cmdChannel; + } + + public RabbitQueueFactory getCmdQueueFactory() { + return cmdQueueFactory; + } + + @Override + public String declareQueue(String queueName) throws IOException { + this.queueName = queueName; + if(queueName != null) { + cmdChannel.queueDeclare(queueName, false, false, true, null); + } else { + this.queueName = cmdChannel.queueDeclare().getQueue(); + } + return this.queueName; + } +} \ No newline at end of file diff --git a/src/main/java/org/hobbit/core/rabbit/SimpleFileSender.java b/src/main/java/org/hobbit/core/rabbit/SimpleFileSender.java index 6c0e905..e2e011a 100644 --- a/src/main/java/org/hobbit/core/rabbit/SimpleFileSender.java +++ b/src/main/java/org/hobbit/core/rabbit/SimpleFileSender.java @@ -23,6 +23,7 @@ import java.util.Arrays; import org.apache.commons.io.IOUtils; +import org.hobbit.core.com.Channel; import org.hobbit.core.data.RabbitQueue; import com.rabbitmq.client.MessageProperties; @@ -46,15 +47,17 @@ public class SimpleFileSender implements Closeable { private static final int DEFAULT_MESSAGE_SIZE = 65536; - public static SimpleFileSender create(RabbitQueueFactory factory, String queueName) throws IOException { - return new SimpleFileSender(factory.createDefaultRabbitQueue(queueName)); + public static SimpleFileSender create(Channel outgoingDataQueuefactory, String queueName) throws IOException { + return new SimpleFileSender(outgoingDataQueuefactory, queueName); } - private RabbitQueue queue; + private Channel channel; + private String queueName; private int messageSize = DEFAULT_MESSAGE_SIZE; - protected SimpleFileSender(RabbitQueue queue) { - this.queue = queue; + protected SimpleFileSender(Channel queue, String queueName) { + this.channel = queue; + this.queueName = queueName; } public void streamData(InputStream is, String name) throws IOException { @@ -71,8 +74,9 @@ public void streamData(InputStream is, String name) throws IOException { buffer.position(messageIdPos); buffer.putInt(messageId); length = is.read(array, dataStartPos, array.length - dataStartPos); - queue.channel.basicPublish("", queue.name, MessageProperties.MINIMAL_PERSISTENT_BASIC, - Arrays.copyOf(array, (length > 0) ? (dataStartPos + length) : dataStartPos)); + channel.declareQueue(queueName); + channel.writeBytes(Arrays.copyOf(array, (length > 0) ? (dataStartPos + length) : dataStartPos), "", + queueName, MessageProperties.MINIMAL_PERSISTENT_BASIC); ++messageId; } while (length > 0); } @@ -83,7 +87,8 @@ public void setMessageSize(int messageSize) { @Override public void close() { - IOUtils.closeQuietly(queue); + channel.close(); + //IOUtils.closeQuietly(queue); } } diff --git a/src/main/java/org/hobbit/core/rabbit/SimpleFileSenderRabbitMQ.java b/src/main/java/org/hobbit/core/rabbit/SimpleFileSenderRabbitMQ.java new file mode 100644 index 0000000..b478999 --- /dev/null +++ b/src/main/java/org/hobbit/core/rabbit/SimpleFileSenderRabbitMQ.java @@ -0,0 +1,56 @@ +package org.hobbit.core.rabbit; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.commons.io.IOUtils; +import org.hobbit.core.data.RabbitQueue; + +import com.rabbitmq.client.MessageProperties; + +public class SimpleFileSenderRabbitMQ implements Closeable { + private static final int DEFAULT_MESSAGE_SIZE = 65536; + + public static SimpleFileSenderRabbitMQ create(RabbitQueueFactory factory, String queueName) throws IOException { + return new SimpleFileSenderRabbitMQ(factory.createDefaultRabbitQueue(queueName)); + } + + private RabbitQueue queue; + private int messageSize = DEFAULT_MESSAGE_SIZE; + + protected SimpleFileSenderRabbitMQ(RabbitQueue queue) { + this.queue = queue; + } + + public void streamData(InputStream is, String name) throws IOException { + int messageId = 0; + int length = 0; + byte[] nameBytes = RabbitMQUtils.writeString(name); + byte[] array = new byte[messageSize + nameBytes.length + 8]; + ByteBuffer buffer = ByteBuffer.wrap(array); + buffer.putInt(nameBytes.length); + buffer.put(nameBytes); + int messageIdPos = buffer.position(); + int dataStartPos = messageIdPos + 4; + do { + buffer.position(messageIdPos); + buffer.putInt(messageId); + length = is.read(array, dataStartPos, array.length - dataStartPos); + queue.channel.basicPublish("", queue.name, MessageProperties.MINIMAL_PERSISTENT_BASIC, + Arrays.copyOf(array, (length > 0) ? (dataStartPos + length) : dataStartPos)); + ++messageId; + } while (length > 0); + } + + public void setMessageSize(int messageSize) { + this.messageSize = messageSize; + } + + @Override + public void close() { + IOUtils.closeQuietly(queue); + } +} diff --git a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java index d631f8f..a5ff082 100644 --- a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java +++ b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java @@ -89,6 +89,7 @@ public void test() throws Exception { // Needed for the generators environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); final DummyPlatformController dummyPlatformController = new DummyPlatformController(sessionId); try { @@ -269,9 +270,8 @@ public void run() { Thread t = new Thread(dataGenExecutor); dataGenThreads.add(t); t.start(); + commonChannel.writeBytes(RabbitMQUtils.writeString(containerId), "", replyTo, replyProps); - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); } else if (startCommandJson.contains(TASK_GEN_IMAGE)) { // Create task generators that are waiting for a random // amount of @@ -308,12 +308,10 @@ public void run() { Thread t = new Thread(taskGenExecutor); taskGenThreads.add(t); t.start(); + commonChannel.writeBytes(RabbitMQUtils.writeString(containerId), "", replyTo, replyProps); - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); } else if (startCommandJson.contains(EVAL_IMAGE)) { - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); + commonChannel.writeBytes(RabbitMQUtils.writeString(containerId), "", replyTo, replyProps); sendToCmdQueue(this.sessionId, Commands.EVAL_STORAGE_READY_SIGNAL, null, null); } else { LOGGER.error("Got unknown start command. Ignoring it."); diff --git a/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java b/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java index 2896915..39b40aa 100644 --- a/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java +++ b/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java @@ -62,6 +62,7 @@ public class ContainerCreationNoCorrelationTest { public void setUp() throws Exception { environmentVariables.set(Constants.RABBIT_MQ_HOST_NAME_KEY, TestConstants.RABBIT_HOST); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, "0"); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); platformController = new DummyPlatformController(HOBBIT_SESSION_ID); DummyComponentExecutor platformExecutor = new DummyComponentExecutor(platformController); @@ -103,10 +104,9 @@ public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.Bas AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder(); propsBuilder.deliveryMode(2); AMQP.BasicProperties replyProps = propsBuilder.build(); + commonChannel.writeBytes(RabbitMQUtils.writeString(containerId), "", props.getReplyTo(), replyProps); - cmdChannel.basicPublish("", props.getReplyTo(), replyProps, - RabbitMQUtils.writeString(containerId)); - } catch (IOException e) { + } catch (Exception e) { LOGGER.error("Exception in receiveCommand", e); } } diff --git a/src/test/java/org/hobbit/core/components/ContainerCreationTest.java b/src/test/java/org/hobbit/core/components/ContainerCreationTest.java index 3dc49a1..b15502b 100644 --- a/src/test/java/org/hobbit/core/components/ContainerCreationTest.java +++ b/src/test/java/org/hobbit/core/components/ContainerCreationTest.java @@ -63,6 +63,7 @@ public class ContainerCreationTest { public void setUp() throws Exception { environmentVariables.set(Constants.RABBIT_MQ_HOST_NAME_KEY, TestConstants.RABBIT_HOST); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, "0"); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); platformController = new DummyPlatformController(HOBBIT_SESSION_ID); DummyComponentExecutor platformExecutor = new DummyComponentExecutor(platformController); @@ -124,10 +125,9 @@ public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.Bas propsBuilder.deliveryMode(2); propsBuilder.correlationId(props.getCorrelationId()); AMQP.BasicProperties replyProps = propsBuilder.build(); + commonChannel.writeBytes(RabbitMQUtils.writeString(containerId), "", props.getReplyTo(), replyProps); - cmdChannel.basicPublish("", props.getReplyTo(), replyProps, - RabbitMQUtils.writeString(containerId)); - } catch (IOException | InterruptedException e) { + } catch (Exception e) { LOGGER.error("Exception in receiveCommand", e); } } diff --git a/src/test/java/org/hobbit/core/components/ContainerEnvironmentTest.java b/src/test/java/org/hobbit/core/components/ContainerEnvironmentTest.java index 6c57921..c47ad4e 100644 --- a/src/test/java/org/hobbit/core/components/ContainerEnvironmentTest.java +++ b/src/test/java/org/hobbit/core/components/ContainerEnvironmentTest.java @@ -112,7 +112,7 @@ public static Collection data() { public void setUp() throws Exception { environmentVariables.set(Constants.RABBIT_MQ_HOST_NAME_KEY, TestConstants.RABBIT_HOST); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, "0"); - + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); component = new DummyCommandReceivingComponent(); component.init(); } diff --git a/src/test/java/org/hobbit/core/components/DataGeneratorTest.java b/src/test/java/org/hobbit/core/components/DataGeneratorTest.java index 3662447..dc8606d 100644 --- a/src/test/java/org/hobbit/core/components/DataGeneratorTest.java +++ b/src/test/java/org/hobbit/core/components/DataGeneratorTest.java @@ -95,6 +95,7 @@ public void test() throws Exception { environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, "0"); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); init(); diff --git a/src/test/java/org/hobbit/core/components/EvaluationModuleTest.java b/src/test/java/org/hobbit/core/components/EvaluationModuleTest.java index c79aa0b..4a9ae12 100644 --- a/src/test/java/org/hobbit/core/components/EvaluationModuleTest.java +++ b/src/test/java/org/hobbit/core/components/EvaluationModuleTest.java @@ -71,6 +71,7 @@ public void test() throws Exception { environmentVariables.set(Constants.RABBIT_MQ_HOST_NAME_KEY, TestConstants.RABBIT_HOST); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, "0"); environmentVariables.set(Constants.HOBBIT_EXPERIMENT_URI_KEY, HobbitExperiments.getExperimentURI("123")); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); // Create the eval store and add some data InMemoryEvaluationStore evalStore = new InMemoryEvaluationStore(); diff --git a/src/test/java/org/hobbit/core/components/SequencingTaskGeneratorTest.java b/src/test/java/org/hobbit/core/components/SequencingTaskGeneratorTest.java index cb8a443..933d0c1 100644 --- a/src/test/java/org/hobbit/core/components/SequencingTaskGeneratorTest.java +++ b/src/test/java/org/hobbit/core/components/SequencingTaskGeneratorTest.java @@ -101,6 +101,7 @@ public void test() throws Exception { environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, "0"); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); // Set the acknowledgement flag to true (read by the evaluation storage) environmentVariables.set(Constants.ACKNOWLEDGEMENT_FLAG_KEY, "true"); diff --git a/src/test/java/org/hobbit/core/components/SystemAdapterTest.java b/src/test/java/org/hobbit/core/components/SystemAdapterTest.java index 8c85a7a..0b08d89 100644 --- a/src/test/java/org/hobbit/core/components/SystemAdapterTest.java +++ b/src/test/java/org/hobbit/core/components/SystemAdapterTest.java @@ -110,6 +110,7 @@ public void test() throws Exception { environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, "0"); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); init(); diff --git a/src/test/java/org/hobbit/core/components/TaskGeneratorSequencingTest.java b/src/test/java/org/hobbit/core/components/TaskGeneratorSequencingTest.java index fd8cf15..1c655fd 100644 --- a/src/test/java/org/hobbit/core/components/TaskGeneratorSequencingTest.java +++ b/src/test/java/org/hobbit/core/components/TaskGeneratorSequencingTest.java @@ -103,6 +103,7 @@ public void test() throws Exception { environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, "0"); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); init(); diff --git a/src/test/java/org/hobbit/core/components/TaskGeneratorTest.java b/src/test/java/org/hobbit/core/components/TaskGeneratorTest.java index 551b6b1..b102245 100644 --- a/src/test/java/org/hobbit/core/components/TaskGeneratorTest.java +++ b/src/test/java/org/hobbit/core/components/TaskGeneratorTest.java @@ -115,6 +115,7 @@ public void test() throws Exception { environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, "0"); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); init(); diff --git a/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java b/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java index 239ace7..607d4ec 100644 --- a/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java +++ b/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java @@ -89,7 +89,7 @@ public void sendToCmdQueue(String address, byte command, byte data[], BasicPrope if (attachData) { buffer.put(data); } - cmdChannel.basicPublish(Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "", props, buffer.array()); + commonChannel.writeBytes(buffer.array(), Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "", props); } @Override diff --git a/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java b/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java index bb62bd7..d350b1d 100644 --- a/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java +++ b/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java @@ -59,6 +59,7 @@ public class DockerBasedMimickingAlgTest extends AbstractPlatformConnectorCompon public void test() throws Exception { environmentVariables.set(Constants.RABBIT_MQ_HOST_NAME_KEY, TestConstants.RABBIT_HOST); environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, HOBBIT_SESSION_ID); + environmentVariables.set(Constants.IS_RABBIT_MQ_ENABLED,"true"); outputDir = generateTempDir(); LOGGER.debug("File will be writte to {}", outputDir.getAbsolutePath()); @@ -159,9 +160,9 @@ public void run() { propsBuilder.deliveryMode(2); propsBuilder.correlationId(props.getCorrelationId()); AMQP.BasicProperties replyProps = propsBuilder.build(); + commonChannel.writeBytes(RabbitMQUtils.writeString(containerId), "", + replyTo, replyProps); - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); } else { LOGGER.error("Got unknown start command. Ignoring it."); } diff --git a/src/test/java/org/hobbit/core/rabbit/FileStreamingTest.java b/src/test/java/org/hobbit/core/rabbit/FileStreamingTest.java index ae2bb68..9ecfee5 100644 --- a/src/test/java/org/hobbit/core/rabbit/FileStreamingTest.java +++ b/src/test/java/org/hobbit/core/rabbit/FileStreamingTest.java @@ -84,7 +84,7 @@ public void run() { receiverThread.start(); System.out.println("Starting sender..."); - SimpleFileSender sender = SimpleFileSender.create(this, queueName); + SimpleFileSenderRabbitMQ sender = SimpleFileSenderRabbitMQ.create(this, queueName); Thread senderThread = new Thread(new Runnable() { @Override public void run() { diff --git a/src/test/java/org/hobbit/core/rabbit/SenderReceiverTest.java b/src/test/java/org/hobbit/core/rabbit/SenderReceiverTest.java index f4ec3bd..21c32e7 100644 --- a/src/test/java/org/hobbit/core/rabbit/SenderReceiverTest.java +++ b/src/test/java/org/hobbit/core/rabbit/SenderReceiverTest.java @@ -11,6 +11,9 @@ import org.apache.commons.io.IOUtils; import org.hobbit.core.TestConstants; +import org.hobbit.core.com.DataHandler; +import org.hobbit.core.com.DataReceiver; +import org.hobbit.core.com.DataSender; import org.hobbit.core.data.RabbitQueue; import org.junit.Assert; import org.junit.Test;