Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Communication channel transparency #60

Open
wants to merge 25 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8c3e109
Channel transparency, base implementation.
altafhusen-mr May 3, 2020
763d5c9
Channel transparency, base implementation.
altafhusen-mr May 9, 2020
f5bc5c1
Channel transparency, base implementation.
altafhusen-mr May 10, 2020
580fc27
Channel transparency, base implementation.
altafhusen-mr May 11, 2020
c92f462
Channel transparency : DirectChannel implementation
altafhusen-mr May 17, 2020
d61f495
Channel transparency : DirectChannel implementation
sourabhpoddar404 May 18, 2020
027807e
Package refactotization
altafhusen-mr May 20, 2020
56564b4
Changes to SenderReceiverFactory
sourabhpoddar404 May 20, 2020
fe47b1a
Including ExecutorService for DirectChannel and ReadByteChannel, othe…
altafhusen-mr May 24, 2020
595aa23
Fix for Tests failing due to data over writing during DirectSenderImpl
altafhusen-mr May 24, 2020
c2adfb6
changes to AbstractEvaluationStorage component
sourabhpoddar404 May 25, 2020
62e88f9
Changes to AbstractEvaluationStorage
sourabhpoddar404 May 25, 2020
feed030
DataGereratorTest fix for multiple test runs, TaskGeneratorTest changes,
altafhusen-mr May 31, 2020
6092dc0
Implementing RabbitMQChannel, DataGenerator test working implementation
altafhusen-mr Jun 7, 2020
1b6069a
ContainerCreationNoCorrelationTest and ContainerCreationTest for
altafhusen-mr Jun 7, 2020
5ebdaf3
Remove comment from catch block
altafhusen-mr Jun 7, 2020
0416688
BenchmarkControllingTest for RabbitMQChannel
altafhusen-mr Jun 9, 2020
21c4c40
Fixing All the tests except SequencingTaskGeneratorTest for
altafhusen-mr Jun 21, 2020
ec5cba3
Fix for SequencingTaskGeneratorTest for RabbitMQChannel
altafhusen-mr Jun 21, 2020
36ace5f
Code documentation
altafhusen-mr Jun 21, 2020
6523996
Merge branch 'develop' of https://github.com/hobbit-project/core into…
altafhusen-mr Jun 24, 2020
89d32df
Revert test cases changes and resolve conflict
altafhusen-mr Jun 24, 2020
8a8c30f
Moving classes to specified packages, code format, unnecessary exception
altafhusen-mr Jun 28, 2020
3cb1680
Added Javadocs for the components.
melissadas Jul 5, 2020
585f811
Rename CommonChannel to Channel as per the feedback
altafhusen-mr Jul 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/org/hobbit/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ private Constants() {
}

// =============== ENVIRONMENT CONSTANTS ===============
public static final String IS_RABBIT_MQ_ENABLED = "IS_RABBIT_MQ_ENABLED";

public static final String HOBBIT_SESSION_ID_KEY = "HOBBIT_SESSION_ID";

Expand Down
73 changes: 73 additions & 0 deletions src/main/java/org/hobbit/core/com/Channel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.hobbit.core.com;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.hobbit.core.components.AbstractCommandReceivingComponent;

import com.rabbitmq.client.AMQP.BasicProperties;
import org.hobbit.core.rabbit.RabbitMQChannel;
import org.hobbit.core.com.java.DirectChannel;

/**
* An interface with several methods that is implemented by {@link RabbitMQChannel} and {@link DirectChannel}.
*
* @author Altafhusen Makandar
* @author Sourabh Poddar
* @author Yamini Punetha
* @author Melissa Das
*
*/
public interface Channel {

/**
* Method to accept messages from the channel.
*/
public void readBytes(Object consumerCallback, Object classs, Boolean autoAck, String queue) throws IOException;

/**
* Method to publish a message of type Byte array the channel.
*/
public void writeBytes(byte data[], String exchange, String routingKey, BasicProperties props) throws IOException;

/**
* Method to publish a message of type ByteBuffer to the channel.
*/
public void writeBytes(ByteBuffer buffer, String exchange, String routingKey, BasicProperties props) throws IOException;

/**
* Method to close a channel.
*/
public void close();

/**
* Method to create a new channel.
*/
public void createChannel() throws Exception;

/**
* Method to get the queue name.
*/
public String getQueueName(AbstractCommandReceivingComponent abstractCommandReceivingComponent) throws Exception;

/**
* Method to create an exchange for the data transfer for a broker
*/
public void exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

/**
* Method to bind a queue with an exchange. This allows an exchange to publish the messages
* to the queues that are bound to this exchange
*/
public void queueBind(String queue, String exchange, String routingKey) throws IOException;

/**
* Method to return the channel instance.
*/
public Object getChannel();

public String declareQueue(String queueName) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.hobbit.core.rabbit;
package org.hobbit.core.com;

public interface DataHandler {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.hobbit.core.rabbit;
package org.hobbit.core.com;

import java.io.Closeable;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.hobbit.core.rabbit;
package org.hobbit.core.com;

import java.io.Closeable;
import java.io.IOException;
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/org/hobbit/core/com/java/DirectCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.hobbit.core.com.java;

import java.util.List;

import org.hobbit.core.com.Channel;

import com.rabbitmq.client.AMQP.BasicProperties;

/**
* This class is used by the DirectChannel implementation
* for a callback function as a consumer callback
* @author altaf, sourabh, yamini, melisa
*
*/
public abstract class DirectCallback {

protected Channel channel;
protected String queue;
protected BasicProperties props;

public DirectCallback() {}

public DirectCallback(Channel channel, String queue, BasicProperties props) {
this.channel = channel;
this.queue = queue;
this.props = props;
}

public abstract void callback(byte[] data, List<Object> classs,BasicProperties props);

}
117 changes: 117 additions & 0 deletions src/main/java/org/hobbit/core/com/java/DirectChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package org.hobbit.core.com.java;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.hobbit.core.com.Channel;
import org.hobbit.core.components.AbstractCommandReceivingComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP.BasicProperties;
/**
* This class implements the necessary functionality for message sharing
* without using RabbitMQ. DirectChannel uses Java NIO Pipe for message queuing
* implementation
*
* @author Altafhusen Makandar
* @author Sourabh Poddar
* @author Yamini Punetha
* @author Melissa Das
*
*/
public class DirectChannel implements Channel {

private static final Logger LOGGER = LoggerFactory.getLogger(DirectChannel.class);

static Map<String, PipeChannel> pipes = new HashMap<>();

PipeChannel pipeChannel;
private ExecutorService threadPool = Executors.newCachedThreadPool();
public DirectChannel(){}
public DirectChannel(String queue){
try {
if(pipes.get(queue) == null) {
pipeChannel = new PipeChannel(Pipe.open());
pipes.put(queue, pipeChannel);
}
} catch (IOException e) {
LOGGER.error("Error creating pipe ",e);
}
}


@Override
public void readBytes(Object callback, Object classs, Boolean autoAck, String queue) {
threadPool.execute(new ReadByteChannel(pipes.get(queue), callback, classs));
}

@Override
public void writeBytes(byte[] data, String exchange, String routingKey, BasicProperties props) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(data.length);
buffer.put(data);
writeBytes(buffer, exchange, routingKey, props);
}

@Override
public void writeBytes(ByteBuffer buffer, String exchange, String routingKey, BasicProperties props) throws IOException {
String queue = StringUtils.isEmpty(exchange) ? routingKey : exchange;
if(!pipes.isEmpty()) {
pipes.get(queue).setProps(props);
String replyQueue = queue;
if(props != null && StringUtils.isNotBlank(props.getReplyTo())) {
replyQueue = props.getReplyTo();
}
if(pipes.get(replyQueue).getPipe().sink().isOpen()) {
buffer.flip();
while (buffer.hasRemaining())
pipes.get(replyQueue).getPipe().sink().write(buffer);
buffer.clear();
}
}
}

@Override
public void close() {
/*if(ReadByteChannel.classes != null && ReadByteChannel.classes.size() > 0) {
ReadByteChannel.classes.clear();
}
pipes.clear();*/
}

@Override
public void createChannel() throws Exception {

}

@Override
public String getQueueName(AbstractCommandReceivingComponent abstractCommandReceivingComponent) throws Exception {
return null;
}

@Override
public void exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException {

}

@Override
public void queueBind(String queue, String exchange, String routingKey) throws IOException {

}

@Override
public Object getChannel() {
return null;
}

@Override
public String declareQueue(String queueName) throws IOException {
return queueName;
}
}
65 changes: 65 additions & 0 deletions src/main/java/org/hobbit/core/com/java/DirectReceiverImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.hobbit.core.com.java;

import java.io.IOException;

import org.hobbit.core.com.Channel;
import org.hobbit.core.com.DataHandler;
import org.hobbit.core.com.DataReceiver;
import org.hobbit.core.components.communicationfactory.ChannelFactory;
import org.hobbit.core.data.RabbitQueue;

/**
* This class implements the methods of DataReceiver for {@link DirectChannel}
*
* @author Altafhusen Makander
* @author Sourabh Poddar
* @author Yamini Punetha
* @author Melissa Das
*
*/
public class DirectReceiverImpl implements DataReceiver {

public DirectReceiverImpl(String queue, Object consumer) {

Channel channel = new ChannelFactory().getChannel(false, queue, null);
try {
channel.readBytes(consumer, this, null, queue);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void close() throws IOException {
// TODO Auto-generated method stub

}

@Override
public int getErrorCount() {
return 0;
}

@Override
public void closeWhenFinished() {
// TODO Auto-generated method stub

}

@Override
public void increaseErrorCount() {
// TODO Auto-generated method stub

}

@Override
public DataHandler getDataHandler() {
return null;
}

@Override
public RabbitQueue getQueue() {
return null;
}

}
60 changes: 60 additions & 0 deletions src/main/java/org/hobbit/core/com/java/DirectSenderImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.hobbit.core.com.java;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.hobbit.core.com.Channel;
import org.hobbit.core.com.DataSender;
import org.hobbit.core.components.communicationfactory.ChannelFactory;
/**
* DataSender implementation for DirectChannel
* @author altaf, sourabh, yamini, melisa
*
*/
public class DirectSenderImpl implements DataSender {

Channel senderChannel;
String queue;

public DirectSenderImpl(String queue){
this.queue = queue;
senderChannel = new ChannelFactory().getChannel(false, queue, null);
}


@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}

@Override
public void sendData(byte[] data) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(data.length);
buffer.put(data);
sleep(100000);
senderChannel.writeBytes(buffer, null, this.queue, null);
/*try {
Thread.sleep(0, 100);
sleep(100000);
senderChannel.writeBytes(buffer, null, this.queue, null);
} catch (Exception e) {
LOGGER.error("Error waiting during send data", e);
}*/
}

private void sleep(long interval) {
long start = System.nanoTime();
long totalTime = start + interval;
long end = 0;
do {
end = System.nanoTime();
}while(totalTime >= end);
}


@Override
public void closeWhenFinished() {
// TODO Auto-generated method stub
}

}
Loading