diff --git a/src/main/java/org/hobbit/core/components/AbstractEvaluationModule.java b/src/main/java/org/hobbit/core/components/AbstractEvaluationModule.java index 2cd19e5..7f8d202 100644 --- a/src/main/java/org/hobbit/core/components/AbstractEvaluationModule.java +++ b/src/main/java/org/hobbit/core/components/AbstractEvaluationModule.java @@ -28,7 +28,7 @@ import org.hobbit.core.Commands; import org.hobbit.core.Constants; import org.hobbit.core.data.RabbitQueue; -import org.hobbit.core.rabbit.CustomConsumer; +import org.hobbit.core.rabbit.QueueingConsumer; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.utils.EnvVariables; import org.hobbit.vocab.HOBBIT; @@ -52,7 +52,7 @@ public abstract class AbstractEvaluationModule extends AbstractPlatformConnector /** * Consumer used to receive the responses from the evaluation storage. */ - protected CustomConsumer consumer; + protected QueueingConsumer consumer; /** * Queue to the evaluation storage. */ @@ -65,6 +65,10 @@ public abstract class AbstractEvaluationModule extends AbstractPlatformConnector * The URI of the experiment. */ protected String experimentUri; + /** + * Timeout parameter for delivery queue message poll. + */ + private static final int QUEUEPOLLTIMEOUT=600000; public AbstractEvaluationModule() { defaultContainerType = Constants.CONTAINER_TYPE_BENCHMARK; @@ -82,7 +86,7 @@ public void init() throws Exception { evalStore2EvalModuleQueue = getFactoryForIncomingDataQueues() .createDefaultRabbitQueue(generateSessionQueueName(Constants.EVAL_STORAGE_2_EVAL_MODULE_DEFAULT_QUEUE_NAME)); - consumer = new CustomConsumer(evalStore2EvalModuleQueue.channel); + consumer = new QueueingConsumer(evalStore2EvalModuleQueue.channel); evalStore2EvalModuleQueue.channel.basicConsume(evalStore2EvalModuleQueue.name, consumer); } @@ -117,9 +121,15 @@ protected void collectResponses() throws Exception { // request next response pair props = new BasicProperties.Builder().deliveryMode(2).replyTo(evalStore2EvalModuleQueue.name).build(); evalModule2EvalStoreQueue.channel.basicPublish("", evalModule2EvalStoreQueue.name, props, requestBody); - Delivery delivery = consumer.getDeliveryQueue().poll(300, TimeUnit.MILLISECONDS); + //Wait for delivery message + Delivery delivery = consumer.getDeliveryQueue().poll(QUEUEPOLLTIMEOUT, TimeUnit.MILLISECONDS); // parse the response - buffer = ByteBuffer.wrap(delivery.getBody()); + try + { + buffer = ByteBuffer.wrap(delivery.getBody()); + + + // if the response is empty if (buffer.remaining() == 0) { LOGGER.error("Got a completely empty response from the evaluation storage."); @@ -139,7 +149,13 @@ protected void collectResponses() throws Exception { responseReceivedTimestamp = data.length > 0 ? RabbitMQUtils.readLong(data) : 0; receivedData = RabbitMQUtils.readByteArray(buffer); + evaluateResponse(expectedData, receivedData, taskSentTimestamp, responseReceivedTimestamp); + }catch(NullPointerException e) + + { + LOGGER.error("No Message Received after waiting for ten minutes"); + } } } diff --git a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java index 9fee50b..89671b4 100644 --- a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java +++ b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java @@ -22,7 +22,7 @@ import org.apache.commons.io.IOUtils; import org.hobbit.core.Commands; import org.hobbit.core.Constants; -import org.hobbit.core.rabbit.CustomConsumer; +import org.hobbit.core.rabbit.QueueingConsumer; import org.hobbit.core.rabbit.DataHandler; import org.hobbit.core.rabbit.DataReceiver; import org.hobbit.core.rabbit.DataReceiverImpl; @@ -88,8 +88,8 @@ public abstract class AbstractTaskGenerator extends AbstractPlatformConnectorCom protected DataSender sender2System; protected DataSender sender2EvalStore; protected DataReceiver dataGenReceiver; - - protected CustomConsumer consumer; + @Deprecated + protected QueueingConsumer consumer; protected boolean runFlag; /** diff --git a/src/test/java/org/hobbit/core/rabbit/SenderReceiverTest.java b/src/test/java/org/hobbit/core/rabbit/SenderReceiverTest.java index 373b0bd..9adf7a2 100644 --- a/src/test/java/org/hobbit/core/rabbit/SenderReceiverTest.java +++ b/src/test/java/org/hobbit/core/rabbit/SenderReceiverTest.java @@ -213,7 +213,7 @@ public void run() { @SuppressWarnings("unused") private void receiveMsgsSequentielly(RabbitQueue queue) throws Exception { - CustomConsumer consumer = new CustomConsumer(queue.channel); + QueueingConsumer consumer = new QueueingConsumer(queue.channel); queue.channel.basicConsume(queue.name, true, consumer); Delivery delivery = null; while ((terminationMutex.availablePermits() == 0) || (queue.messageCount() > 0) || (delivery != null)) { @@ -226,7 +226,7 @@ private void receiveMsgsSequentielly(RabbitQueue queue) throws Exception { @SuppressWarnings("unused") private void receiveMsgsInParallel(RabbitQueue queue, ExecutorService executor) throws Exception { - CustomConsumer consumer = new CustomConsumer(queue.channel); + QueueingConsumer consumer = new QueueingConsumer(queue.channel); queue.channel.basicConsume(queue.name, true, consumer); Delivery delivery = null; while ((terminationMutex.availablePermits() == 0) || (queue.messageCount() > 0) || (delivery != null)) {