Skip to content

Commit

Permalink
Changes for AbstractEvaluationModule Issue hobbit-project#45
Browse files Browse the repository at this point in the history
  • Loading branch information
sourabhpoddar404 committed Nov 10, 2019
1 parent d86827f commit d8f4edd
Showing 1 changed file with 39 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.IOUtils;
import org.apache.jena.rdf.model.Model;
Expand All @@ -27,14 +29,18 @@
import org.hobbit.core.Commands;
import org.hobbit.core.Constants;
import org.hobbit.core.data.RabbitQueue;
import org.hobbit.core.rabbit.CustomConsumer;
import org.hobbit.core.rabbit.RabbitMQUtils;
import org.hobbit.utils.EnvVariables;
import org.hobbit.vocab.HOBBIT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;

/**
* This abstract class implements basic functions that can be used to implement
Expand All @@ -50,7 +56,7 @@ public abstract class AbstractEvaluationModule extends AbstractPlatformConnector
/**
* Consumer used to receive the responses from the evaluation storage.
*/
protected QueueingConsumer consumer;
protected CustomConsumer consumer;
/**
* Queue to the evaluation storage.
*/
Expand All @@ -63,6 +69,7 @@ public abstract class AbstractEvaluationModule extends AbstractPlatformConnector
* The URI of the experiment.
*/
protected String experimentUri;


public AbstractEvaluationModule() {
defaultContainerType = Constants.CONTAINER_TYPE_BENCHMARK;
Expand All @@ -80,7 +87,7 @@ public void init() throws Exception {
evalStore2EvalModuleQueue = getFactoryForIncomingDataQueues()
.createDefaultRabbitQueue(generateSessionQueueName(Constants.EVAL_STORAGE_2_EVAL_MODULE_DEFAULT_QUEUE_NAME));

consumer = new QueueingConsumer(evalStore2EvalModuleQueue.channel);
consumer = new CustomConsumer(evalStore2EvalModuleQueue.channel);
evalStore2EvalModuleQueue.channel.basicConsume(evalStore2EvalModuleQueue.name, consumer);
}

Expand Down Expand Up @@ -113,33 +120,39 @@ protected void collectResponses() throws Exception {

while (true) {
// request next response pair
Delivery delivery=null;
props = new BasicProperties.Builder().deliveryMode(2).replyTo(evalStore2EvalModuleQueue.name).build();
evalModule2EvalStoreQueue.channel.basicPublish("", evalModule2EvalStoreQueue.name, props, requestBody);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
// parse the response
buffer = ByteBuffer.wrap(delivery.getBody());
// if the response is empty
if (buffer.remaining() == 0) {
LOGGER.error("Got a completely empty response from the evaluation storage.");
return;
}
requestBody[0] = buffer.get();
delivery = consumer.deliveryQueue.poll(300,TimeUnit.MILLISECONDS);
//if(!deliveryQueue.isEmpty()) {

// parse the response
buffer = ByteBuffer.wrap(delivery.getBody());
// if the response is empty
if (buffer.remaining() == 0) {
LOGGER.error("Got a completely empty response from the evaluation storage.");
return;
}
requestBody[0] = buffer.get();

// if the response is empty
if (buffer.remaining() == 0) {
return;
}
byte[] data = RabbitMQUtils.readByteArray(buffer);
taskSentTimestamp = data.length > 0 ? RabbitMQUtils.readLong(data) : 0;
expectedData = RabbitMQUtils.readByteArray(buffer);

data = RabbitMQUtils.readByteArray(buffer);
responseReceivedTimestamp = data.length > 0 ? RabbitMQUtils.readLong(data) : 0;
receivedData = RabbitMQUtils.readByteArray(buffer);

evaluateResponse(expectedData, receivedData, taskSentTimestamp, responseReceivedTimestamp);

// if the response is empty
if (buffer.remaining() == 0) {
return;
}
byte[] data = RabbitMQUtils.readByteArray(buffer);
taskSentTimestamp = data.length > 0 ? RabbitMQUtils.readLong(data) : 0;
expectedData = RabbitMQUtils.readByteArray(buffer);

data = RabbitMQUtils.readByteArray(buffer);
responseReceivedTimestamp = data.length > 0 ? RabbitMQUtils.readLong(data) : 0;
receivedData = RabbitMQUtils.readByteArray(buffer);

evaluateResponse(expectedData, receivedData, taskSentTimestamp, responseReceivedTimestamp);
}
}

}
//}

/**
* Evaluates the given response pair.
Expand Down

0 comments on commit d8f4edd

Please sign in to comment.