Skip to content

Commit

Permalink
Code Changes for the feedback on issue hobbit-project#45
Browse files Browse the repository at this point in the history
  • Loading branch information
sourabhpoddar404 committed Nov 23, 2019
1 parent 67f46f1 commit c5c57ad
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.hobbit.core.Commands;
import org.hobbit.core.Constants;
import org.hobbit.core.data.RabbitQueue;
import org.hobbit.core.rabbit.CustomConsumer;
import org.hobbit.core.rabbit.QueueingConsumer;
import org.hobbit.core.rabbit.RabbitMQUtils;
import org.hobbit.utils.EnvVariables;
import org.hobbit.vocab.HOBBIT;
Expand All @@ -52,7 +52,7 @@ public abstract class AbstractEvaluationModule extends AbstractPlatformConnector
/**
* Consumer used to receive the responses from the evaluation storage.
*/
protected CustomConsumer consumer;
protected QueueingConsumer consumer;
/**
* Queue to the evaluation storage.
*/
Expand All @@ -65,6 +65,10 @@ public abstract class AbstractEvaluationModule extends AbstractPlatformConnector
* The URI of the experiment.
*/
protected String experimentUri;
/**
* Timeout parameter for delivery queue message poll.
*/
private static final int QUEUEPOLLTIMEOUT=600000;

public AbstractEvaluationModule() {
defaultContainerType = Constants.CONTAINER_TYPE_BENCHMARK;
Expand All @@ -82,7 +86,7 @@ public void init() throws Exception {
evalStore2EvalModuleQueue = getFactoryForIncomingDataQueues()
.createDefaultRabbitQueue(generateSessionQueueName(Constants.EVAL_STORAGE_2_EVAL_MODULE_DEFAULT_QUEUE_NAME));

consumer = new CustomConsumer(evalStore2EvalModuleQueue.channel);
consumer = new QueueingConsumer(evalStore2EvalModuleQueue.channel);
evalStore2EvalModuleQueue.channel.basicConsume(evalStore2EvalModuleQueue.name, consumer);
}

Expand Down Expand Up @@ -117,9 +121,15 @@ protected void collectResponses() throws Exception {
// request next response pair
props = new BasicProperties.Builder().deliveryMode(2).replyTo(evalStore2EvalModuleQueue.name).build();
evalModule2EvalStoreQueue.channel.basicPublish("", evalModule2EvalStoreQueue.name, props, requestBody);
Delivery delivery = consumer.getDeliveryQueue().poll(300, TimeUnit.MILLISECONDS);
//Wait for delivery message
Delivery delivery = consumer.getDeliveryQueue().poll(QUEUEPOLLTIMEOUT, TimeUnit.MILLISECONDS);
// parse the response
buffer = ByteBuffer.wrap(delivery.getBody());
try
{
buffer = ByteBuffer.wrap(delivery.getBody());



// if the response is empty
if (buffer.remaining() == 0) {
LOGGER.error("Got a completely empty response from the evaluation storage.");
Expand All @@ -139,7 +149,13 @@ protected void collectResponses() throws Exception {
responseReceivedTimestamp = data.length > 0 ? RabbitMQUtils.readLong(data) : 0;
receivedData = RabbitMQUtils.readByteArray(buffer);


evaluateResponse(expectedData, receivedData, taskSentTimestamp, responseReceivedTimestamp);
}catch(NullPointerException e)

{
LOGGER.error("No Message Received after waiting for ten minutes");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.commons.io.IOUtils;
import org.hobbit.core.Commands;
import org.hobbit.core.Constants;
import org.hobbit.core.rabbit.CustomConsumer;
import org.hobbit.core.rabbit.QueueingConsumer;
import org.hobbit.core.rabbit.DataHandler;
import org.hobbit.core.rabbit.DataReceiver;
import org.hobbit.core.rabbit.DataReceiverImpl;
Expand Down Expand Up @@ -88,8 +88,8 @@ public abstract class AbstractTaskGenerator extends AbstractPlatformConnectorCom
protected DataSender sender2System;
protected DataSender sender2EvalStore;
protected DataReceiver dataGenReceiver;

protected CustomConsumer consumer;
@Deprecated
protected QueueingConsumer consumer;
protected boolean runFlag;

/**
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/hobbit/core/rabbit/SenderReceiverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void run() {

@SuppressWarnings("unused")
private void receiveMsgsSequentielly(RabbitQueue queue) throws Exception {
CustomConsumer consumer = new CustomConsumer(queue.channel);
QueueingConsumer consumer = new QueueingConsumer(queue.channel);
queue.channel.basicConsume(queue.name, true, consumer);
Delivery delivery = null;
while ((terminationMutex.availablePermits() == 0) || (queue.messageCount() > 0) || (delivery != null)) {
Expand All @@ -226,7 +226,7 @@ private void receiveMsgsSequentielly(RabbitQueue queue) throws Exception {

@SuppressWarnings("unused")
private void receiveMsgsInParallel(RabbitQueue queue, ExecutorService executor) throws Exception {
CustomConsumer consumer = new CustomConsumer(queue.channel);
QueueingConsumer consumer = new QueueingConsumer(queue.channel);
queue.channel.basicConsume(queue.name, true, consumer);
Delivery delivery = null;
while ((terminationMutex.availablePermits() == 0) || (queue.messageCount() > 0) || (delivery != null)) {
Expand Down

0 comments on commit c5c57ad

Please sign in to comment.