From 843327fae6947886a3d08e0e1434ce3852ad5594 Mon Sep 17 00:00:00 2001 From: RufoJ Date: Mon, 27 Aug 2018 10:54:24 +0200 Subject: [PATCH 1/5] Added feature diffusion computation and utils --- .../examples/DiffusionMasterCompute.java | 201 ++++++++++++++++++ .../DiffusionSimulationComputation.java | 116 ++++++++++ .../datastructures/DiffusionVertexValue.java | 117 ++++++++++ .../datastructures/LabelingVertexValue.java | 79 +++++++ ...ListNoEdgeValueTextVertexOutputFormat.java | 66 ++++++ .../io/DiffusionVertexInputFormat.java | 66 ++++++ .../io/LabelingInputFormat.java | 62 ++++++ .../DegreeLabelingSimulationComputation.java | 27 +++ ...FlatTimeLabelingSimulationComputation.java | 26 +++ .../KCoreLabelingSimulationComputation.java | 77 +++++++ ...PageRankLabelingSimulationComputation.java | 45 ++++ 11 files changed, 882 insertions(+) create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMasterCompute.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionSimulationComputation.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/LabelingVertexValue.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingSimulationComputation.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingSimulationComputation.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingSimulationComputation.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingSimulationComputation.java diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMasterCompute.java new file mode 100644 index 000000000..c180818b5 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMasterCompute.java @@ -0,0 +1,201 @@ +package org.apache.giraph.examples; + + +import org.apache.giraph.aggregators.BooleanOrAggregator; +import org.apache.giraph.aggregators.LongMaxAggregator; +import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.master.DefaultMasterCompute; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.log4j.Logger; + + +public class DiffusionMasterCompute extends DefaultMasterCompute { + + protected Logger LOG = Logger.getLogger(DiffusionMasterCompute.class); + + public static final String convincedVerticesAggregator = "CONV_AGG_DIFF"; + public static final String usingVerticesAggregator = "AGG_DIFF"; + public static final String deadVerticesAggregator = "AGG_DIFF_DEAD"; + public static final String latestActivationsAggregator = "AGG_ACTIVATED_LAST"; + public static final String activatedVerticesCounterGroup = "Diffusion Counters"; + public static final String convincedVerticesCounter = "Convinced_Vertices "; + public static final String usingVerticesCounter = "Using_Vertices "; + public static final String deadVerticesCounter = "Dead_Vertices "; + public static final String diffusionDeltaOption = "diffusion.delta"; + public static final double diffusionDeltaOptionDefault = 0.005; + public static final String diffusionListenOption = "diffusion.listenWhileUnactive"; + public static final String hesitantVerticesAggregator="hesitantVerticesAggregator "; + + + public static final String byLabelOption="by_label"; + + //for KCORE (or general label) based algorithm + public static final String invitedVerticesAggregator="Invited_Vertices "; + public static final String almostConvincedVerticesAggregator="AlmostConvinced_Vertices "; + public static final String currentLabel="Label_active "; //label da analizzare nello specifico superstep + public static final String nextLabel="Next_label "; //ogni volta riceve tutte le label ancora da eseguire + public static final String timeToSwitch="is_time_to_switch"; + public double KSwitchTreshold; + + //for MIN_NUMBER based algorithm + public boolean byLabel; + public int minNumber; + public static final String potentialVerticesAggregator="Potential_invited_vertices"; + public static final String oldInvitedVerticesAggregator="Old_invited_vertices"; + public static final String oldConvincedVerticesAggregator="Old_convinced_vertices"; + public static final String oldDeadVerticesAggregator="Old_dead_vertices"; + public static final String oldUsingVerticesAggregator="Old_using_vertices"; + public static final String oldHesitantVerticesAggregator="Old_hesitant_vertices"; + public static final String oldAlmostConvincedVerticesAggregator="Old_almostConvinced_vertices"; + public static final String justChangedTimeToSwitch="Just_changed_timeToSwitch_value"; + + + @Override + public void compute() { + //super.compute(); + long convincedVertices = ((LongWritable)getAggregatedValue(convincedVerticesAggregator)).get(); + long usingVertices = ((LongWritable)getAggregatedValue(usingVerticesAggregator)).get(); + long deadVertices = ((LongWritable)getAggregatedValue(deadVerticesAggregator)).get(); + long invitedVertices=((LongWritable)getAggregatedValue(invitedVerticesAggregator)).get(); + long almostConvincedVertices=((LongWritable)getAggregatedValue(almostConvincedVerticesAggregator)).get(); + long activeLabel=(int)((LongWritable)getAggregatedValue(currentLabel)).get(); + long hesitantVerticesAggregatorVal=((LongWritable)getAggregatedValue(hesitantVerticesAggregator)).get(); + + //This avoid having counters' value "0" when it's timeToSwitch and so the computation based on MIN_NUMBER is "paused" + if(!byLabel && ((BooleanWritable)getAggregatedValue(timeToSwitch)).get()) { + almostConvincedVertices=((LongWritable)getAggregatedValue(oldAlmostConvincedVerticesAggregator)).get(); + invitedVertices=((LongWritable)getAggregatedValue(oldInvitedVerticesAggregator)).get(); + usingVertices=((LongWritable)getAggregatedValue(oldUsingVerticesAggregator)).get(); + deadVertices=((LongWritable)getAggregatedValue(oldDeadVerticesAggregator)).get(); + convincedVertices=((LongWritable)getAggregatedValue(oldConvincedVerticesAggregator)).get(); + hesitantVerticesAggregatorVal=((LongWritable)getAggregatedValue(oldHesitantVerticesAggregator)).get(); + } + + getContext().getCounter(activatedVerticesCounterGroup,hesitantVerticesAggregator+superstep()).setValue(hesitantVerticesAggregatorVal); + getContext().getCounter(activatedVerticesCounterGroup, usingVerticesCounter + superstep()).setValue(usingVertices); + getContext().getCounter(activatedVerticesCounterGroup, convincedVerticesCounter + superstep()).setValue(convincedVertices); + getContext().getCounter(activatedVerticesCounterGroup, deadVerticesCounter + superstep()).setValue(deadVertices); + getContext().getCounter(activatedVerticesCounterGroup,invitedVerticesAggregator + superstep()).setValue(invitedVertices); + getContext().getCounter(activatedVerticesCounterGroup,almostConvincedVerticesAggregator + superstep()).setValue(almostConvincedVertices); + getContext().getCounter(activatedVerticesCounterGroup,currentLabel + superstep()).setValue(activeLabel); + + + + //test purpose + if(superstep()==0) { + System.out.println("InitProb,"+getConf().getStrings("InitialProbability","0.02")[0]); + System.out.println("Delta,"+getConf().getStrings("Delta","0.005")[0]); + }else { + System.out.println("InvitedVertices,"+(getSuperstep()-1)+","+invitedVertices); + System.out.println("ConvincedVertices,"+(getSuperstep()-1)+","+convincedVertices); + System.out.println("DeadVertices,"+(getSuperstep()-1)+","+deadVertices); + System.out.println("AlmostConvincedVertices,"+(getSuperstep()-1)+","+almostConvincedVertices); + System.out.println("UsingVertices,"+(getSuperstep()-1)+","+usingVertices); + System.out.println("LabelReached,"+(getSuperstep()-1)+","+activeLabel); + System.out.println("HesitantVertices,"+(getSuperstep()-1)+","+hesitantVerticesAggregatorVal); + } + + + if(getSuperstep() > 0 && getTotalNumVertices()==(deadVertices+convincedVertices+hesitantVerticesAggregatorVal )) + haltComputation(); + + if(byLabel) {//Kcore or similar + if(getSuperstep()>0) { + if ( ((BooleanWritable)getAggregatedValue(timeToSwitch)).get() ) + setAggregatedValue(timeToSwitch, new BooleanWritable(false)); + if ( superstep()==1) { + setAggregatedValue(currentLabel, (LongWritable)getAggregatedValue(nextLabel)); + setAggregatedValue(nextLabel, new LongWritable(-1)); + setAggregatedValue(timeToSwitch, new BooleanWritable(true)); + }else if (activeLabel!=1){//if we haven't reached the lowest coreness + long almostConvicedVal=((LongWritable) getAggregatedValue(DiffusionMasterCompute.almostConvincedVerticesAggregator)).get(); + long invitedVal=((LongWritable) getAggregatedValue(DiffusionMasterCompute.invitedVerticesAggregator)).get(); + //if the threshold is reached or all the invited vertices are dead, convinced or hesitant + if(((double)almostConvicedVal)/invitedVal>KSwitchTreshold || invitedVertices==(deadVertices+convincedVertices+hesitantVerticesAggregatorVal) ) { + setAggregatedValue(timeToSwitch, new BooleanWritable(true)); + setAggregatedValue(currentLabel, (LongWritable)getAggregatedValue(nextLabel)); + setAggregatedValue(nextLabel, new LongWritable(-1)); + } + } + } + }else {//degree, pagerank or other similar where the label does not represent a group of vertices + if ( superstep()==0) { + setAggregatedValue(currentLabel, new LongWritable(Long.MAX_VALUE)); + setAggregatedValue(timeToSwitch, new BooleanWritable(true)); + setAggregatedValue(oldInvitedVerticesAggregator, new LongWritable(0)); + } + if(superstep()>0) { + + if ( ! ((BooleanWritable)getAggregatedValue(timeToSwitch)).get() ) { + if(activeLabel>0) { + long almostConvicedVal=((LongWritable) getAggregatedValue(DiffusionMasterCompute.almostConvincedVerticesAggregator)).get(); + long invitedVal=((LongWritable) getAggregatedValue(DiffusionMasterCompute.invitedVerticesAggregator)).get(); + //if the threshold is reached or all the invited vertices are dead, convinced or hesitant + if(((double)almostConvicedVal)/invitedVal>KSwitchTreshold || invitedVertices==(deadVertices+convincedVertices+hesitantVerticesAggregatorVal)) { + setAggregatedValue(timeToSwitch, new BooleanWritable(true)); + setAggregatedValue(oldInvitedVerticesAggregator, (LongWritable)getAggregatedValue(invitedVerticesAggregator)); + setAggregatedValue(oldConvincedVerticesAggregator, (LongWritable)getAggregatedValue(convincedVerticesAggregator)); + setAggregatedValue(oldAlmostConvincedVerticesAggregator, (LongWritable)getAggregatedValue(almostConvincedVerticesAggregator)); + setAggregatedValue(oldDeadVerticesAggregator, (LongWritable)getAggregatedValue(deadVerticesAggregator)); + setAggregatedValue(oldHesitantVerticesAggregator, (LongWritable)getAggregatedValue(hesitantVerticesAggregator)); + setAggregatedValue(oldUsingVerticesAggregator, (LongWritable)getAggregatedValue(usingVerticesAggregator)); + } + } + }else { //it's time to switch: let's scan some label until we find at least vertices more than now + long old = ((LongWritable)getAggregatedValue(oldInvitedVerticesAggregator)).get(); + long actual = ((LongWritable)getAggregatedValue(potentialVerticesAggregator)).get(); + if (actual-old>minNumber) {//reached a label which increment the invited vertices by MIN_NUMBER at least + setAggregatedValue(timeToSwitch, new BooleanWritable(false)); + setAggregatedValue(justChangedTimeToSwitch, new BooleanWritable(true)); + }else if( ((LongWritable)getAggregatedValue(nextLabel)).get()<0 && superstep()>10 ){//reached the lowest label without finding at least MIN_NUMBER vertices + setAggregatedValue(timeToSwitch, new BooleanWritable(false)); + setAggregatedValue(justChangedTimeToSwitch, new BooleanWritable(true)); + setAggregatedValue(currentLabel, new LongWritable(0)); + }else {//continue to scan + setAggregatedValue(currentLabel, (LongWritable)getAggregatedValue(nextLabel)); + setAggregatedValue(nextLabel, new LongWritable(-1)); + } + } + } + } + + + } + + @Override + public void initialize() throws InstantiationException, IllegalAccessException { + super.initialize(); + KSwitchTreshold = Double.parseDouble(getConf().getStrings("KSwitchThreshold", "0.7")[0]); + registerAggregator(convincedVerticesAggregator, LongSumAggregator.class); + registerAggregator(usingVerticesAggregator, LongSumAggregator.class); + registerAggregator(deadVerticesAggregator, LongSumAggregator.class); + registerAggregator(invitedVerticesAggregator, LongSumAggregator.class); + registerAggregator(almostConvincedVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(nextLabel, LongMaxAggregator.class); + registerPersistentAggregator(currentLabel, LongMaxAggregator.class); + registerPersistentAggregator(timeToSwitch, BooleanOrAggregator.class); + registerAggregator(hesitantVerticesAggregator, LongSumAggregator.class); + + byLabel=Boolean.parseBoolean(getConf().getStrings("ByLabel", "true")[0]); + registerPersistentAggregator(byLabelOption, BooleanOrAggregator.class); + setAggregatedValue(byLabelOption, new BooleanWritable(byLabel)); + if(!byLabel) { + minNumber = Integer.parseInt(getConf().getStrings("minNumber", "200")[0]); + registerAggregator(potentialVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldInvitedVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldConvincedVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldDeadVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldUsingVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldHesitantVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldAlmostConvincedVerticesAggregator, LongSumAggregator.class); + registerAggregator(justChangedTimeToSwitch, BooleanOrAggregator.class); + + } + } + + protected long superstep() { + return getSuperstep(); + } + +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionSimulationComputation.java new file mode 100644 index 000000000..c8a71f950 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionSimulationComputation.java @@ -0,0 +1,116 @@ +package org.apache.giraph.examples; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.log4j.Logger; + +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.DiffusionVertexValue; + +public class DiffusionSimulationComputation extends BasicComputation { + + Logger LOG = Logger.getLogger(this.getClass()); + + @Override + public void compute(Vertex vertex, Iterable msgs) + throws IOException { + + DiffusionVertexValue value = vertex.getValue(); + if(getSuperstep()==0) { //First superstep just to know the first label to analyze + setup(value); + }else { + boolean byLabel = ((BooleanWritable)getAggregatedValue(DiffusionMasterCompute.byLabelOption)).get(); + int currentLabel = (int)((LongWritable)getAggregatedValue(DiffusionMasterCompute.currentLabel)).get(); + // aggregators must be analyzed after first superstep + boolean timeToSwitch = ((BooleanWritable)getAggregatedValue(DiffusionMasterCompute.timeToSwitch)).get(); + if (timeToSwitch)//time to switch label? + if(value.getLabel() msgs, DiffusionVertexValue value) { + Iterator it = msgs.iterator(); + int activeNeighbors = 0; + while(it.hasNext()) + activeNeighbors += it.next().get(); + if(activeNeighbors > value.getVertexThreshold()) + value.modifyCurrentActivationProbability(1); + else if(activeNeighbors < value.getVertexThreshold()) + value.modifyCurrentActivationProbability(-1); + return activeNeighbors; + } + + private void aggregateVerticesBasedOnProbability(DiffusionVertexValue value) { + if(value.isVertexConvinced()) + aggregate(DiffusionMasterCompute.convincedVerticesAggregator, new LongWritable(1)); + //LOG.info("I'm with a probability " + value.getCurrentActivationProbability()); + if(value.isVertexDead()) { //Dead aggregator update + aggregate(DiffusionMasterCompute.deadVerticesAggregator, new LongWritable(1)); + } + if(value.isAlmostConvinced()){ + aggregate(DiffusionMasterCompute.almostConvincedVerticesAggregator,new LongWritable(1)); + } + aggregate(DiffusionMasterCompute.invitedVerticesAggregator, new LongWritable(1)); + + } + +} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java new file mode 100644 index 000000000..bbf739205 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java @@ -0,0 +1,117 @@ +package org.apache.giraph.examples.feature_diffusion_utils.datastructures; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +// DEBUG +//import java.util.LinkedList; +import java.util.LinkedList; + +public class DiffusionVertexValue implements Writable { + + protected int vertexThreshold; + protected int label; + protected double currentActivationProbability=0.2; + protected double delta=0.05; + protected double almostConvincedTreshold=0.7; + + public DiffusionVertexValue() { + this.vertexThreshold=1; + this.label=1; + } + + public DiffusionVertexValue(int label) { + this.vertexThreshold=1; + this.label=label; + } + + public DiffusionVertexValue( int vertexThreshold, int label) { + this.vertexThreshold=vertexThreshold; + this.label=label; + } + + public void readFields(DataInput in) throws IOException { + vertexThreshold = in.readInt(); + label = in.readInt(); + currentActivationProbability = in.readDouble(); + } + + public void write(DataOutput out) throws IOException { + out.writeInt(vertexThreshold); + out.writeInt(label); + out.writeDouble(currentActivationProbability); + } + + public double getCurrentActivationProbability() { + return currentActivationProbability; + } + + public void modifyCurrentActivationProbability(int sign) { + BigDecimal tmpcurrentActivationProbability = new BigDecimal(currentActivationProbability).add(new BigDecimal(sign*delta)).setScale(5, RoundingMode.HALF_UP); + if(tmpcurrentActivationProbability.doubleValue() > 1) + currentActivationProbability = 1; + else + currentActivationProbability = tmpcurrentActivationProbability.doubleValue(); + if(tmpcurrentActivationProbability.doubleValue() <= 0) + currentActivationProbability = 0; + } + + public boolean isVertexInvited(int currentLabel) { + return this.label >= currentLabel; + + } + + public boolean isVertexDead() { + return new BigDecimal(currentActivationProbability).setScale(2, RoundingMode.HALF_DOWN).floatValue() == 0; + } + + public boolean isVertexConvinced() { + return new BigDecimal(currentActivationProbability).setScale(2, RoundingMode.HALF_DOWN).floatValue() == 1; + } + + public void setVertexThreshold(int threshold) { + this.vertexThreshold=threshold; + } + + public int getVertexThreshold() { + return vertexThreshold; + } + + public long getLabel() { + return this.label; + } + + public boolean rollActivationDice() { + return Math.random() <= currentActivationProbability; + } + + public void setlabel(int coreness) { + this.label=coreness; + } + + public boolean isAlmostConvinced(){ + return currentActivationProbability>almostConvincedTreshold; + } + + // used at ss=0 in case of differences from default 0.2 + public void setInitialActivationProbability(double initialActivationProbability){ + this.currentActivationProbability=initialActivationProbability; + } + + public void setAlmostConvincedTreshold(double almostConvincedTreshold){ + this.almostConvincedTreshold=almostConvincedTreshold; + } + + public void setDelta(double delta) { + this.delta = delta; + } + + public String toString() { + return ""+label+","+currentActivationProbability; + } + +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/LabelingVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/LabelingVertexValue.java new file mode 100644 index 000000000..7bab062af --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/LabelingVertexValue.java @@ -0,0 +1,79 @@ +package org.apache.giraph.examples.feature_diffusion_utils.datastructures; + + + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; + +public class LabelingVertexValue implements Writable { + + + protected long label; + protected int treshold; + protected boolean changed = false; + protected double temp; + protected HashMap neighboorsLabels=new HashMap(); + + public LabelingVertexValue(){ + this.treshold=1; + } + + public LabelingVertexValue(int treshold){ + this.treshold=treshold; + } + + public void readFields(DataInput in) throws IOException { + label=in.readLong(); + treshold=in.readInt(); + changed=in.readBoolean(); + temp=in.readDouble(); + } + + public void write(DataOutput out) throws IOException { + out.writeLong(label); + out.writeInt(treshold); + out.writeBoolean(changed); + out.writeDouble(temp); + } + + public Long getLabel() {return label;} + + public HashMap getNeighboorsLabel() {return neighboorsLabels;} + + public boolean isChanged() {return changed;} + + public void setLabel(long label) { + this.label=label; + this.changed=true; + } + + public void setChanged(boolean newChanged) { + this.changed = newChanged; + } + + + public void updateNeighboorLabel(long id,long label) { + if(!neighboorsLabels.containsKey(id)) + neighboorsLabels.put(id, label); + else + if(neighboorsLabels.get(id) > label) + neighboorsLabels.put(id, label); + } + + public double getTemp() { + return temp; + } + + public void setTemp(double temp) { + this.temp=temp; + } + public String toString() { + return ""+label+","+treshold; + } + + +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java new file mode 100644 index 000000000..3534356a0 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java @@ -0,0 +1,66 @@ +package org.apache.giraph.examples.feature_diffusion_utils.io; + + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.TextVertexOutputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** +* OutputFormat to write out the graph nodes as text, value-separated (by +* tabs, by default). With the default delimiter, a vertex is written out as: +* +* []+ +* +* @param Vertex index value +* @param Vertex value +* @param Edge value +*/ +@SuppressWarnings("rawtypes") +public class AdjacencyListNoEdgeValueTextVertexOutputFormat extends TextVertexOutputFormat { + + /** Split delimiter */ + public static final String LINE_TOKENIZE_VALUE = "output.delimiter"; + /** Default split delimiter */ + public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; + + @Override + public AdjacencyListTextVertexWriter createVertexWriter(TaskAttemptContext context) { + return new AdjacencyListTextVertexWriter(); + } + + /** + * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}. + */ + protected class AdjacencyListTextVertexWriter extends TextVertexWriterToEachLine { + /** Cached split delimeter */ + private String delimiter; + + @Override + public void initialize(TaskAttemptContext context) throws IOException, InterruptedException { + super.initialize(context); + delimiter = getConf().get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); + } + + @Override + public Text convertVertexToLine(Vertex vertex) throws IOException { + StringBuffer sb = new StringBuffer(vertex.getId().toString()); + sb.append(delimiter); + sb.append(vertex.getValue()); + sb.append(delimiter); + + for (Edge edge : vertex.getEdges()) { + sb.append(edge.getTargetVertexId()).append(","); + //sb.append(delimiter).append(edge.getValue()); + } + sb.setLength(sb.length()-1); + return new Text(sb.toString()); + } + } + +} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java new file mode 100644 index 000000000..416bac274 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java @@ -0,0 +1,66 @@ +package org.apache.giraph.examples.feature_diffusion_utils.io; + +import com.google.common.collect.Lists; + +import datastructures.DiffusionVertexValue; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.formats.TextVertexInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +public class DiffusionVertexInputFormat extends TextVertexInputFormat { + + @Override + public TextVertexReader createVertexReader( + InputSplit arg0, TaskAttemptContext arg1) throws IOException { + return new DiffusionVertexReader(); + } + + protected class DiffusionVertexReader extends TextVertexReaderFromEachLine{ + + @Override + protected Iterable> getEdges(Text line) throws IOException { + String[] fA = line.toString().split("\t"); + String[] edgeArray = fA[fA.length-1].split(","); + List> edges = Lists.newArrayList(); + int i; + for (i = 0; i < edgeArray.length; ++i) { + long neighborId = Long.parseLong(edgeArray[i]); + edges.add(EdgeFactory.create(new LongWritable(neighborId), + NullWritable.get())); + } + return edges; + } + + @Override + protected LongWritable getId(Text line) throws IOException { + return new LongWritable(Long.parseLong(line.toString().split("\t")[0])); + } + + @Override + protected DiffusionVertexValue getValue(Text line) throws IOException { + String[] split = line.toString().split("\t"); + String value=split[1]; + String [] reSplit=value.split(","); + if(reSplit.length==2) { + int treshold= Integer.parseInt(reSplit[1]); + int label=Integer.parseInt(reSplit[0]); + return new DiffusionVertexValue(treshold,label); + + }else { + int label=Integer.parseInt(reSplit[0]); + return new DiffusionVertexValue(label); + + } + } + } + +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java new file mode 100644 index 000000000..9ad30179a --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java @@ -0,0 +1,62 @@ +package org.apache.giraph.examples.feature_diffusion_utils.io; + +import com.google.common.collect.Lists; + +import datastructures.LabelingVertexValue; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.formats.TextVertexInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +public class LabelingInputFormat extends TextVertexInputFormat { + + @Override + public TextVertexReader createVertexReader( + InputSplit arg0, TaskAttemptContext arg1) throws IOException { + return new DiffusionVertexReader(); + } + + protected class DiffusionVertexReader extends TextVertexReaderFromEachLine{ + + @Override + protected Iterable> getEdges(Text line) throws IOException { + String[] fA = line.toString().split("\t"); + String[] edgeArray = fA[fA.length-1].split(","); + List> edges = Lists.newArrayList(); + int i; + for (i = 0; i < edgeArray.length; ++i) { + long neighborId = Long.parseLong(edgeArray[i]); + edges.add(EdgeFactory.create(new LongWritable(neighborId), + NullWritable.get())); + } + return edges; + } + + @Override + protected LongWritable getId(Text line) throws IOException { + return new LongWritable(Long.parseLong(line.toString().split("\t")[0])); + } + + @Override + protected LabelingVertexValue getValue(Text line) throws IOException { + String[] split = line.toString().split("\t"); + if (split.length==2){ + return new LabelingVertexValue(); + }else { + String treshold = split[split.length-2]; + return new LabelingVertexValue(Integer.parseInt(treshold)); + } + + } + + } + +} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingSimulationComputation.java new file mode 100644 index 000000000..039325e04 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingSimulationComputation.java @@ -0,0 +1,27 @@ +package org.apache.giraph.examples.feature_diffusion_utils.labeling; + +import java.io.IOException; + +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; + +public class DegreeLabelingSimulationComputation extends BasicComputation { + + + Logger LOG = Logger.getLogger(this.getClass()); + + @Override + public void compute(Vertex vertex, Iterable msgs) + throws IOException { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(vertex.getNumEdges()); + vertex.voteToHalt(); + } + +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingSimulationComputation.java new file mode 100644 index 000000000..75767e013 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingSimulationComputation.java @@ -0,0 +1,26 @@ +package org.apache.giraph.examples.feature_diffusion_utils.labeling; + +import java.io.IOException; + +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; + +public class FlatTimeLabelingSimulationComputation extends BasicComputation { + + Logger LOG = Logger.getLogger(this.getClass()); + + @Override + public void compute(Vertex vertex, Iterable msgs) + throws IOException { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(1); + vertex.voteToHalt(); + } + +} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingSimulationComputation.java new file mode 100644 index 000000000..6ff7bde60 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingSimulationComputation.java @@ -0,0 +1,77 @@ +package org.apache.giraph.examples.feature_diffusion_utils.labeling; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.WorkerClientRequestProcessor; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; + + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; + +@SuppressWarnings("unused") +public class KCoreLabelingSimulationComputation extends BasicComputation { + + Logger LOG = Logger.getLogger(this.getClass()); + + @Override + public void compute(Vertex vertex, Iterable msgs) + throws IOException { + //delta=Double.parseDouble(getConf().getStrings("Delta", "0.005")[0]); + LabelingVertexValue value = vertex.getValue(); + if(getSuperstep()==0) { + value.setLabel(Integer.max(vertex.getNumEdges(),1)); + //value.setNeighboorsLabel(vertex.getNumEdges()); + sendMessageToAllEdges(vertex, new Text(""+vertex.getId().get()+" "+value.getLabel())); + value.setChanged(false); + }else { + + for(Text msg: msgs) { + long id = Long.parseLong(msg.toString().split(" ")[0]); + int coreness = Integer.parseInt(msg.toString().split(" ")[1]); + value.updateNeighboorLabel(id, coreness); + } + + int tempLabel = computeIndex(value.getNeighboorsLabel(),value.getLabel()); + if (tempLabel neighboorsLabel, long coreness) { + int[] corenessCount = new int[(int) coreness]; + for (int i = 0 ; i pair: neighboorsLabel.entrySet()) { + long corenessCandidate =Long.min( pair.getValue() , coreness); + corenessCount[(int)corenessCandidate-1]++; + } + for (int i=(int) (coreness-1); i>0 ; i--) + corenessCount[i-1]+=corenessCount[i]; + int i = (int) coreness; + while(i>1 && corenessCount[i-1] { + + + Logger LOG = Logger.getLogger(this.getClass()); + + @Override + public void compute(Vertex vertex, Iterable msgs) + throws IOException { + //delta=Double.parseDouble(getConf().getStrings("Delta", "0.005")[0]); + LabelingVertexValue value = vertex.getValue(); + if (getSuperstep() >= 1) { + double sum = 0; + for (Text message : msgs) { + sum += Double.parseDouble(message.toString()); + } + double pr=((0.15f / getTotalNumVertices()) + 0.85f * sum); + value.setTemp(pr);//to change, removing inty + + System.out.println("SIamo al SS "+getSuperstep()+" sono il vertice "+vertex.getId()+" e ho pr "+pr); + + + } + if (getSuperstep() < 50) { + sendMessageToAllEdges(vertex, new Text( ""+ (value.getTemp() / vertex.getNumEdges()) ) ); + } else { + int cif= (int)(Math.log10(getTotalNumVertices())+2); + value.setLabel((long)(value.getTemp()*Math.pow(10, cif))); + vertex.voteToHalt(); + } + } + +} \ No newline at end of file From 27eed7e5ad6f86aa6813e23a82357ccce3ab23e9 Mon Sep 17 00:00:00 2001 From: RufoJ Date: Thu, 18 Oct 2018 09:53:38 +0200 Subject: [PATCH 2/5] Feature diffusion computation implemented in Blocks-Framework through Migration utils --- .../DiffusionMigrationBlockFactory.java | 39 ++++++++++++++++ ...a => DiffusionMigrationMasterCompute.java} | 14 +++--- ...fusionMigrationSimulationComputation.java} | 46 ++++++++++++------- 3 files changed, 75 insertions(+), 24 deletions(-) create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationBlockFactory.java rename giraph-examples/src/main/java/org/apache/giraph/examples/{DiffusionMasterCompute.java => DiffusionMigrationMasterCompute.java} (95%) rename giraph-examples/src/main/java/org/apache/giraph/examples/{DiffusionSimulationComputation.java => DiffusionMigrationSimulationComputation.java} (64%) diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationBlockFactory.java new file mode 100644 index 000000000..1d43099f3 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationBlockFactory.java @@ -0,0 +1,39 @@ +package org.apache.giraph.examples; + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.migration.MigrationFullBlockFactory; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.DiffusionVertexValue; + + +public class DiffusionMigrationBlockFactory extends MigrationFullBlockFactory { + + public Block createBlock(GiraphConfiguration conf) { + return createMigrationAppBlock( + DiffusionMigrationSimulationComputation.class, + new DiffusionMigrationMasterCompute(), + IntWritable.class, + null, + conf); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return DiffusionVertexValue.class; + } + +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationMasterCompute.java similarity index 95% rename from giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMasterCompute.java rename to giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationMasterCompute.java index c180818b5..aea061100 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMasterCompute.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationMasterCompute.java @@ -4,15 +4,15 @@ import org.apache.giraph.aggregators.BooleanOrAggregator; import org.apache.giraph.aggregators.LongMaxAggregator; import org.apache.giraph.aggregators.LongSumAggregator; -import org.apache.giraph.master.DefaultMasterCompute; +import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.LongWritable; import org.apache.log4j.Logger; -public class DiffusionMasterCompute extends DefaultMasterCompute { +public class DiffusionMigrationMasterCompute extends MigrationFullMasterCompute { - protected Logger LOG = Logger.getLogger(DiffusionMasterCompute.class); + protected Logger LOG = Logger.getLogger(DiffusionMigrationMasterCompute.class); public static final String convincedVerticesAggregator = "CONV_AGG_DIFF"; public static final String usingVerticesAggregator = "AGG_DIFF"; @@ -109,8 +109,8 @@ public void compute() { setAggregatedValue(nextLabel, new LongWritable(-1)); setAggregatedValue(timeToSwitch, new BooleanWritable(true)); }else if (activeLabel!=1){//if we haven't reached the lowest coreness - long almostConvicedVal=((LongWritable) getAggregatedValue(DiffusionMasterCompute.almostConvincedVerticesAggregator)).get(); - long invitedVal=((LongWritable) getAggregatedValue(DiffusionMasterCompute.invitedVerticesAggregator)).get(); + long almostConvicedVal=((LongWritable) getAggregatedValue(DiffusionMigrationMasterCompute .almostConvincedVerticesAggregator)).get(); + long invitedVal=((LongWritable) getAggregatedValue(DiffusionMigrationMasterCompute .invitedVerticesAggregator)).get(); //if the threshold is reached or all the invited vertices are dead, convinced or hesitant if(((double)almostConvicedVal)/invitedVal>KSwitchTreshold || invitedVertices==(deadVertices+convincedVertices+hesitantVerticesAggregatorVal) ) { setAggregatedValue(timeToSwitch, new BooleanWritable(true)); @@ -129,8 +129,8 @@ public void compute() { if ( ! ((BooleanWritable)getAggregatedValue(timeToSwitch)).get() ) { if(activeLabel>0) { - long almostConvicedVal=((LongWritable) getAggregatedValue(DiffusionMasterCompute.almostConvincedVerticesAggregator)).get(); - long invitedVal=((LongWritable) getAggregatedValue(DiffusionMasterCompute.invitedVerticesAggregator)).get(); + long almostConvicedVal=((LongWritable) getAggregatedValue(DiffusionMigrationMasterCompute .almostConvincedVerticesAggregator)).get(); + long invitedVal=((LongWritable) getAggregatedValue(DiffusionMigrationMasterCompute .invitedVerticesAggregator)).get(); //if the threshold is reached or all the invited vertices are dead, convinced or hesitant if(((double)almostConvicedVal)/invitedVal>KSwitchTreshold || invitedVertices==(deadVertices+convincedVertices+hesitantVerticesAggregatorVal)) { setAggregatedValue(timeToSwitch, new BooleanWritable(true)); diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java similarity index 64% rename from giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionSimulationComputation.java rename to giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java index c8a71f950..03f5483e7 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionSimulationComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java @@ -3,7 +3,7 @@ import java.io.IOException; import java.util.Iterator; -import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.IntWritable; @@ -13,10 +13,22 @@ import org.apache.giraph.examples.feature_diffusion_utils.datastructures.DiffusionVertexValue; -public class DiffusionSimulationComputation extends BasicComputation { +public class DiffusionMigrationSimulationComputation extends MigrationFullBasicComputation { Logger LOG = Logger.getLogger(this.getClass()); + + + /*public void initialize(GraphState graphState, + WorkerClientRequestProcessor workerClientRequestProcessor, + CentralizedServiceWorker serviceWorker, + WorkerGlobalCommUsage workerGlobalCommUsage) { + super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); + delta = getConf().getDouble(DiffusionMigrationMasterCompute.diffusionDeltaOption, DiffusionMigrationMasterCompute.diffusionDeltaOptionDefault); + modelSwitch = getConf().getBoolean(DiffusionMigrationMasterCompute.diffusionListenOption, false); + + }*/ + @Override public void compute(Vertex vertex, Iterable msgs) throws IOException { @@ -25,42 +37,42 @@ public void compute(Vertex ver if(getSuperstep()==0) { //First superstep just to know the first label to analyze setup(value); }else { - boolean byLabel = ((BooleanWritable)getAggregatedValue(DiffusionMasterCompute.byLabelOption)).get(); - int currentLabel = (int)((LongWritable)getAggregatedValue(DiffusionMasterCompute.currentLabel)).get(); + boolean byLabel = ((BooleanWritable)getAggregatedValue(DiffusionMigrationMasterCompute.byLabelOption)).get(); + int currentLabel = (int)((LongWritable)getAggregatedValue(DiffusionMigrationMasterCompute.currentLabel)).get(); // aggregators must be analyzed after first superstep - boolean timeToSwitch = ((BooleanWritable)getAggregatedValue(DiffusionMasterCompute.timeToSwitch)).get(); + boolean timeToSwitch = ((BooleanWritable)getAggregatedValue(DiffusionMigrationMasterCompute.timeToSwitch)).get(); if (timeToSwitch)//time to switch label? if(value.getLabel() Date: Fri, 19 Oct 2018 15:36:32 +0200 Subject: [PATCH 3/5] Labeling computation implemented in Blocks-Framework through Migration utils --- .../io/DiffusionVertexInputFormat.java | 2 +- .../io/LabelingInputFormat.java | 2 +- ...abelingMigrationSimulationComputation.java | 38 +++++++++++++++++ .../DegreeLabelingSimulationComputation.java | 27 ------------ ...abelingMigrationSimulationComputation.java | 36 ++++++++++++++++ ...FlatTimeLabelingSimulationComputation.java | 26 ------------ ...belingMigrationSimulationComputation.java} | 21 +++++++--- .../LabelingMigrationBlockFactory.java | 42 +++++++++++++++++++ ...belingMigrationSimulationComputation.java} | 15 ++++++- 9 files changed, 147 insertions(+), 62 deletions(-) create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingMigrationSimulationComputation.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingSimulationComputation.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingMigrationSimulationComputation.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingSimulationComputation.java rename giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/{KCoreLabelingSimulationComputation.java => KCoreLabelingMigrationSimulationComputation.java} (69%) create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/LabelingMigrationBlockFactory.java rename giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/{PageRankLabelingSimulationComputation.java => PageRankLabelingMigrationSimulationComputation.java} (61%) diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java index 416bac274..2ffa2d11e 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java @@ -2,7 +2,7 @@ import com.google.common.collect.Lists; -import datastructures.DiffusionVertexValue; +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.DiffusionVertexValue; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.EdgeFactory; diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java index 9ad30179a..91471c01b 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java @@ -2,7 +2,7 @@ import com.google.common.collect.Lists; -import datastructures.LabelingVertexValue; +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.EdgeFactory; diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingMigrationSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingMigrationSimulationComputation.java new file mode 100644 index 000000000..502be2a0c --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingMigrationSimulationComputation.java @@ -0,0 +1,38 @@ +package org.apache.giraph.examples.feature_diffusion_utils.labeling; + +import java.io.IOException; + +import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; + +public class DegreeLabelingMigrationSimulationComputation extends MigrationFullBasicComputation { + + + Logger LOG = Logger.getLogger(this.getClass()); + + + /*public void initialize(GraphState graphState, + WorkerClientRequestProcessor workerClientRequestProcessor, + CentralizedServiceWorker serviceWorker, + WorkerGlobalCommUsage workerGlobalCommUsage) { + super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); + delta = getConf().getDouble(DiffusionMasterCompute.diffusionDeltaOption, DiffusionMasterCompute.diffusionDeltaOptionDefault); + modelSwitch = getConf().getBoolean(DiffusionMasterCompute.diffusionListenOption, false); + + }*/ + + @Override + public void compute(Vertex vertex, Iterable msgs) + throws IOException { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(vertex.getNumEdges()); + vertex.voteToHalt(); + } + +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingSimulationComputation.java deleted file mode 100644 index 039325e04..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingSimulationComputation.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.labeling; - -import java.io.IOException; - -import org.apache.giraph.graph.BasicComputation; -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; - -public class DegreeLabelingSimulationComputation extends BasicComputation { - - - Logger LOG = Logger.getLogger(this.getClass()); - - @Override - public void compute(Vertex vertex, Iterable msgs) - throws IOException { - LabelingVertexValue value = vertex.getValue(); - value.setLabel(vertex.getNumEdges()); - vertex.voteToHalt(); - } - -} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingMigrationSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingMigrationSimulationComputation.java new file mode 100644 index 000000000..c0ea7432e --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingMigrationSimulationComputation.java @@ -0,0 +1,36 @@ +package org.apache.giraph.examples.feature_diffusion_utils.labeling; + +import java.io.IOException; + +import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; + +public class FlatTimeLabelingMigrationSimulationComputation extends MigrationFullBasicComputation { + + Logger LOG = Logger.getLogger(this.getClass()); + + /*public void initialize(GraphState graphState, + WorkerClientRequestProcessor workerClientRequestProcessor, + CentralizedServiceWorker serviceWorker, + WorkerGlobalCommUsage workerGlobalCommUsage) { + super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); + delta = getConf().getDouble(DiffusionMasterCompute.diffusionDeltaOption, DiffusionMasterCompute.diffusionDeltaOptionDefault); + modelSwitch = getConf().getBoolean(DiffusionMasterCompute.diffusionListenOption, false); + + }*/ + + @Override + public void compute(Vertex vertex, Iterable msgs) + throws IOException { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(1); + vertex.voteToHalt(); + } + +} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingSimulationComputation.java deleted file mode 100644 index 75767e013..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingSimulationComputation.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.labeling; - -import java.io.IOException; - -import org.apache.giraph.graph.BasicComputation; -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; - -public class FlatTimeLabelingSimulationComputation extends BasicComputation { - - Logger LOG = Logger.getLogger(this.getClass()); - - @Override - public void compute(Vertex vertex, Iterable msgs) - throws IOException { - LabelingVertexValue value = vertex.getValue(); - value.setLabel(1); - vertex.voteToHalt(); - } - -} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java similarity index 69% rename from giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingSimulationComputation.java rename to giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java index 6ff7bde60..fce7280b7 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingSimulationComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java @@ -1,8 +1,8 @@ package org.apache.giraph.examples.feature_diffusion_utils.labeling; +import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.WorkerClientRequestProcessor; -import org.apache.giraph.graph.BasicComputation; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.Vertex; import org.apache.giraph.worker.WorkerGlobalCommUsage; @@ -20,20 +20,31 @@ import java.util.Iterator; import java.util.Map.Entry; -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.*; @SuppressWarnings("unused") -public class KCoreLabelingSimulationComputation extends BasicComputation { +public class KCoreLabelingMigrationSimulationComputation extends MigrationFullBasicComputation { Logger LOG = Logger.getLogger(this.getClass()); + + /*public void initialize(GraphState graphState, + WorkerClientRequestProcessor workerClientRequestProcessor, + CentralizedServiceWorker serviceWorker, + WorkerGlobalCommUsage workerGlobalCommUsage) { + super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); + delta = getConf().getDouble(DiffusionMasterCompute.diffusionDeltaOption, DiffusionMasterCompute.diffusionDeltaOptionDefault); + modelSwitch = getConf().getBoolean(DiffusionMasterCompute.diffusionListenOption, false); + + }*/ + @Override public void compute(Vertex vertex, Iterable msgs) throws IOException { //delta=Double.parseDouble(getConf().getStrings("Delta", "0.005")[0]); LabelingVertexValue value = vertex.getValue(); if(getSuperstep()==0) { - value.setLabel(Integer.max(vertex.getNumEdges(),1)); + value.setLabel(Math.max(vertex.getNumEdges(),1)); //value.setNeighboorsLabel(vertex.getNumEdges()); sendMessageToAllEdges(vertex, new Text(""+vertex.getId().get()+" "+value.getLabel())); value.setChanged(false); @@ -62,7 +73,7 @@ private int computeIndex(HashMap neighboorsLabel, long coreness) { for (int i = 0 ; i pair: neighboorsLabel.entrySet()) { - long corenessCandidate =Long.min( pair.getValue() , coreness); + long corenessCandidate =Math.min( pair.getValue() , coreness); corenessCount[(int)corenessCandidate-1]++; } for (int i=(int) (coreness-1); i>0 ; i--) diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/LabelingMigrationBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/LabelingMigrationBlockFactory.java new file mode 100644 index 000000000..f5049375b --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/LabelingMigrationBlockFactory.java @@ -0,0 +1,42 @@ +package org.apache.giraph.examples.feature_diffusion_utils.labeling; + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; +import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute; +import org.apache.giraph.block_app.migration.MigrationFullBlockFactory; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; + +import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; + +public class LabelingMigrationBlockFactory extends MigrationFullBlockFactory{ + + public Block createBlock(GiraphConfiguration conf) { + @SuppressWarnings("unchecked") + Class> computationClass=(Class>)conf.getClass("giraph.typesHolder", KCoreLabelingMigrationSimulationComputation.class); + return createMigrationAppBlock( + computationClass, + new MigrationFullMasterCompute(), + Text.class, + null, + conf); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return LabelingVertexValue.class; + } + +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingMigrationSimulationComputation.java similarity index 61% rename from giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingSimulationComputation.java rename to giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingMigrationSimulationComputation.java index 8999ab851..390c4216c 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingSimulationComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingMigrationSimulationComputation.java @@ -2,7 +2,7 @@ import java.io.IOException; -import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; @@ -11,11 +11,22 @@ import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; -public class PageRankLabelingSimulationComputation extends BasicComputation { +public class PageRankLabelingMigrationSimulationComputation extends MigrationFullBasicComputation { Logger LOG = Logger.getLogger(this.getClass()); + + /*public void initialize(GraphState graphState, + WorkerClientRequestProcessor workerClientRequestProcessor, + CentralizedServiceWorker serviceWorker, + WorkerGlobalCommUsage workerGlobalCommUsage) { + super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); + delta = getConf().getDouble(DiffusionMasterCompute.diffusionDeltaOption, DiffusionMasterCompute.diffusionDeltaOptionDefault); + modelSwitch = getConf().getBoolean(DiffusionMasterCompute.diffusionListenOption, false); + + }*/ + @Override public void compute(Vertex vertex, Iterable msgs) throws IOException { From 92a51669925f1fe3484a88594cb0a1a85d7066ea Mon Sep 17 00:00:00 2001 From: RufoJ Date: Wed, 28 Nov 2018 17:37:53 +0100 Subject: [PATCH 4/5] -Fix a bug causing the initial vertices to have lowered init probablity -Minor changes --- ...ffusionMigrationSimulationComputation.java | 2 +- .../datastructures/DiffusionVertexValue.java | 6 ++--- ...abelingMigrationSimulationComputation.java | 8 +++---- ...abelingMigrationSimulationComputation.java | 22 ++++++++----------- 4 files changed, 16 insertions(+), 22 deletions(-) diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java index 03f5483e7..424c2c173 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java @@ -46,7 +46,7 @@ public void compute(Vertex ver aggregate(DiffusionMigrationMasterCompute.nextLabel, new LongWritable(value.getLabel())); if(value.isVertexInvited(currentLabel)) { if (byLabel) { - if(!value.isVertexDead()) { //Update the using probability, if not dead + if(!value.isVertexDead() && getSuperstep()!=1) { //Update the using probability, if not dead int activeNeighbors = checkMsgsAndUpdateProbability(msgs, value); if(activeNeighbors==value.getVertexThreshold()) aggregate(DiffusionMigrationMasterCompute.hesitantVerticesAggregator,new LongWritable(1)); diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java index bbf739205..2edb4a6c2 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java @@ -23,7 +23,7 @@ public DiffusionVertexValue() { this.vertexThreshold=1; this.label=1; } - + public DiffusionVertexValue(int label) { this.vertexThreshold=1; this.label=label; @@ -45,7 +45,7 @@ public void write(DataOutput out) throws IOException { out.writeInt(label); out.writeDouble(currentActivationProbability); } - + public double getCurrentActivationProbability() { return currentActivationProbability; } @@ -76,7 +76,7 @@ public boolean isVertexConvinced() { public void setVertexThreshold(int threshold) { this.vertexThreshold=threshold; } - + public int getVertexThreshold() { return vertexThreshold; } diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java index fce7280b7..4686706ec 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java @@ -40,22 +40,20 @@ public class KCoreLabelingMigrationSimulationComputation extends MigrationFullBa @Override public void compute(Vertex vertex, Iterable msgs) - throws IOException { - //delta=Double.parseDouble(getConf().getStrings("Delta", "0.005")[0]); + throws IOException { LabelingVertexValue value = vertex.getValue(); if(getSuperstep()==0) { value.setLabel(Math.max(vertex.getNumEdges(),1)); - //value.setNeighboorsLabel(vertex.getNumEdges()); sendMessageToAllEdges(vertex, new Text(""+vertex.getId().get()+" "+value.getLabel())); value.setChanged(false); }else { - + for(Text msg: msgs) { long id = Long.parseLong(msg.toString().split(" ")[0]); int coreness = Integer.parseInt(msg.toString().split(" ")[1]); value.updateNeighboorLabel(id, coreness); } - + int tempLabel = computeIndex(value.getNeighboorsLabel(),value.getLabel()); if (tempLabel { @@ -29,20 +29,16 @@ public class PageRankLabelingMigrationSimulationComputation extends MigrationFul @Override public void compute(Vertex vertex, Iterable msgs) - throws IOException { - //delta=Double.parseDouble(getConf().getStrings("Delta", "0.005")[0]); + throws IOException { LabelingVertexValue value = vertex.getValue(); if (getSuperstep() >= 1) { - double sum = 0; - for (Text message : msgs) { - sum += Double.parseDouble(message.toString()); - } - double pr=((0.15f / getTotalNumVertices()) + 0.85f * sum); - value.setTemp(pr);//to change, removing inty - - System.out.println("SIamo al SS "+getSuperstep()+" sono il vertice "+vertex.getId()+" e ho pr "+pr); - - + double sum = 0; + for (Text message : msgs) { + sum += Double.parseDouble(message.toString()); + } + double pr=((0.15f / getTotalNumVertices()) + 0.85f * sum); + value.setTemp(pr);//to change, removing + } if (getSuperstep() < 50) { sendMessageToAllEdges(vertex, new Text( ""+ (value.getTemp() / vertex.getNumEdges()) ) ); From 3a4ab482816ecb5fbeb3b9747e88e275034f3213 Mon Sep 17 00:00:00 2001 From: RufoJ Date: Fri, 18 Jan 2019 18:18:03 +0100 Subject: [PATCH 5/5] Full implementation in Block Framework API --- .../DiffusionMigrationBlockFactory.java | 39 -- ...ffusionMigrationSimulationComputation.java | 128 ------ .../datastructures/DiffusionVertexValue.java | 134 ++++++ .../datastructures/LabelingVertexValue.java | 77 ++++ .../diffusion/DiffusionConstants.java | 40 ++ .../factory/DiffusionBlockFactory.java | 98 +++++ .../piece/DiffusionComputationPiece.java | 387 ++++++++++++++++++ ...ListNoEdgeValueTextVertexOutputFormat.java | 64 +++ .../io/DiffusionVertexInputFormat.java | 64 +++ .../io/LabelingInputFormat.java | 59 +++ .../factory/DegreeLabelingBlockFactory.java | 48 +++ .../factory/FlatTimeLabelingBlockFactory.java | 49 +++ .../factory/KCoreLabelingBlockFactory.java | 130 ++++++ .../factory/PageRankLabelingBlockFactory.java | 84 ++++ .../datastructures/DiffusionVertexValue.java | 117 ------ .../datastructures/LabelingVertexValue.java | 79 ---- ...ListNoEdgeValueTextVertexOutputFormat.java | 66 --- .../io/DiffusionVertexInputFormat.java | 66 --- .../io/LabelingInputFormat.java | 62 --- ...abelingMigrationSimulationComputation.java | 38 -- ...abelingMigrationSimulationComputation.java | 36 -- ...abelingMigrationSimulationComputation.java | 86 ---- .../LabelingMigrationBlockFactory.java | 42 -- ...abelingMigrationSimulationComputation.java | 52 --- 24 files changed, 1234 insertions(+), 811 deletions(-) delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationBlockFactory.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/DiffusionVertexValue.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/LabelingVertexValue.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/DiffusionConstants.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/factory/DiffusionBlockFactory.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/piece/DiffusionComputationPiece.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/DiffusionVertexInputFormat.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/LabelingInputFormat.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/DegreeLabelingBlockFactory.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/FlatTimeLabelingBlockFactory.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/KCoreLabelingBlockFactory.java create mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/PageRankLabelingBlockFactory.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/LabelingVertexValue.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingMigrationSimulationComputation.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingMigrationSimulationComputation.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/LabelingMigrationBlockFactory.java delete mode 100644 giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingMigrationSimulationComputation.java diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationBlockFactory.java deleted file mode 100644 index 1d43099f3..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationBlockFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.apache.giraph.examples; - -import org.apache.giraph.block_app.framework.block.Block; -import org.apache.giraph.block_app.migration.MigrationFullBlockFactory; -import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.DiffusionVertexValue; - - -public class DiffusionMigrationBlockFactory extends MigrationFullBlockFactory { - - public Block createBlock(GiraphConfiguration conf) { - return createMigrationAppBlock( - DiffusionMigrationSimulationComputation.class, - new DiffusionMigrationMasterCompute(), - IntWritable.class, - null, - conf); - } - - @Override - protected Class getEdgeValueClass(GiraphConfiguration arg0) { - return NullWritable.class; - } - - @Override - protected Class getVertexIDClass(GiraphConfiguration arg0) { - return LongWritable.class; - } - - @Override - protected Class getVertexValueClass(GiraphConfiguration arg0) { - return DiffusionVertexValue.class; - } - -} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java deleted file mode 100644 index 424c2c173..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationSimulationComputation.java +++ /dev/null @@ -1,128 +0,0 @@ -package org.apache.giraph.examples; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.log4j.Logger; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.DiffusionVertexValue; - -public class DiffusionMigrationSimulationComputation extends MigrationFullBasicComputation { - - Logger LOG = Logger.getLogger(this.getClass()); - - - - /*public void initialize(GraphState graphState, - WorkerClientRequestProcessor workerClientRequestProcessor, - CentralizedServiceWorker serviceWorker, - WorkerGlobalCommUsage workerGlobalCommUsage) { - super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); - delta = getConf().getDouble(DiffusionMigrationMasterCompute.diffusionDeltaOption, DiffusionMigrationMasterCompute.diffusionDeltaOptionDefault); - modelSwitch = getConf().getBoolean(DiffusionMigrationMasterCompute.diffusionListenOption, false); - - }*/ - - @Override - public void compute(Vertex vertex, Iterable msgs) - throws IOException { - - DiffusionVertexValue value = vertex.getValue(); - if(getSuperstep()==0) { //First superstep just to know the first label to analyze - setup(value); - }else { - boolean byLabel = ((BooleanWritable)getAggregatedValue(DiffusionMigrationMasterCompute.byLabelOption)).get(); - int currentLabel = (int)((LongWritable)getAggregatedValue(DiffusionMigrationMasterCompute.currentLabel)).get(); - // aggregators must be analyzed after first superstep - boolean timeToSwitch = ((BooleanWritable)getAggregatedValue(DiffusionMigrationMasterCompute.timeToSwitch)).get(); - if (timeToSwitch)//time to switch label? - if(value.getLabel() msgs, DiffusionVertexValue value) { - Iterator it = msgs.iterator(); - int activeNeighbors = 0; - while(it.hasNext()) - activeNeighbors += it.next().get(); - if(activeNeighbors > value.getVertexThreshold()) - value.modifyCurrentActivationProbability(1); - else if(activeNeighbors < value.getVertexThreshold()) - value.modifyCurrentActivationProbability(-1); - return activeNeighbors; - } - - private void aggregateVerticesBasedOnProbability(DiffusionVertexValue value) { - if(value.isVertexConvinced()) - aggregate(DiffusionMigrationMasterCompute.convincedVerticesAggregator, new LongWritable(1)); - //LOG.info("I'm with a probability " + value.getCurrentActivationProbability()); - if(value.isVertexDead()) { //Dead aggregator update - aggregate(DiffusionMigrationMasterCompute.deadVerticesAggregator, new LongWritable(1)); - } - if(value.isAlmostConvinced()){ - aggregate(DiffusionMigrationMasterCompute.almostConvincedVerticesAggregator,new LongWritable(1)); - } - aggregate(DiffusionMigrationMasterCompute.invitedVerticesAggregator, new LongWritable(1)); - - } - -} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/DiffusionVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/DiffusionVertexValue.java new file mode 100644 index 000000000..6a4c1baa2 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/DiffusionVertexValue.java @@ -0,0 +1,134 @@ +package org.apache.giraph.examples.feature_diffusion.datastructures; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; + +public class DiffusionVertexValue implements Writable { + + protected int vertexThreshold; + protected int label; + protected double currentActivationProbability; + protected double delta; + protected double almostConvincedTreshold; + + protected int activeNeighbors = 0; + + public DiffusionVertexValue() { + this.vertexThreshold = 1; + this.label = 1; + } + + public DiffusionVertexValue(int label) { + this.vertexThreshold = 1; + this.label = label; + } + + public DiffusionVertexValue(int vertexThreshold, int label) { + this.vertexThreshold = vertexThreshold; + this.label = label; + } + + public void readFields(DataInput in) throws IOException { + vertexThreshold = in.readInt(); + label = in.readInt(); + currentActivationProbability = in.readDouble(); + activeNeighbors = in.readInt(); + } + + public void write(DataOutput out) throws IOException { + out.writeInt(vertexThreshold); + out.writeInt(label); + out.writeDouble(currentActivationProbability); + out.writeInt(activeNeighbors); + } + + public double getCurrentActivationProbability() { + return currentActivationProbability; + } + + public void modifyCurrentActivationProbability(int sign) { + BigDecimal tmpcurrentActivationProbability = + new BigDecimal(currentActivationProbability) + .add(new BigDecimal(sign * delta)) + .setScale(5, RoundingMode.HALF_UP); + if (tmpcurrentActivationProbability.doubleValue() > 1) currentActivationProbability = 1; + else currentActivationProbability = tmpcurrentActivationProbability.doubleValue(); + if (tmpcurrentActivationProbability.doubleValue() <= 0) currentActivationProbability = 0; + } + + public boolean isVertexInvited(long currentLabel) { + return this.label >= currentLabel; + } + + public boolean isVertexDead() { + return new BigDecimal(currentActivationProbability) + .setScale(2, RoundingMode.HALF_DOWN) + .floatValue() + == 0; + } + + public boolean isVertexConvinced() { + return new BigDecimal(currentActivationProbability) + .setScale(2, RoundingMode.HALF_DOWN) + .floatValue() + == 1; + } + + public void setVertexThreshold(int threshold) { + this.vertexThreshold = threshold; + } + + public int getVertexThreshold() { + return vertexThreshold; + } + + public long getLabel() { + return this.label; + } + + public boolean rollActivationDice() { + return Math.random() <= currentActivationProbability; + } + + public void setlabel(int coreness) { + this.label = coreness; + } + + public boolean isAlmostConvinced() { + return currentActivationProbability > almostConvincedTreshold; + } + + // used at ss=0 in case of differences from default 0.2 + public void setInitialActivationProbability(double initialActivationProbability) { + this.currentActivationProbability = initialActivationProbability; + } + + public void setAlmostConvincedTreshold(double almostConvincedTreshold) { + this.almostConvincedTreshold = almostConvincedTreshold; + } + + public void setDelta(double delta) { + this.delta = delta; + } + + public int getActiveNeighbors() { + return activeNeighbors; + } + + public void setActiveNeighbors(int activeNeighbors) { + this.activeNeighbors = activeNeighbors; + } + + public void reset() { // Method to reset temporary data structures + activeNeighbors = 0; + } + + public String toString() { + return "" + label + "," + currentActivationProbability; + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/LabelingVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/LabelingVertexValue.java new file mode 100644 index 000000000..376a21175 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/LabelingVertexValue.java @@ -0,0 +1,77 @@ +package org.apache.giraph.examples.feature_diffusion.datastructures; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; + +public class LabelingVertexValue implements Writable { + + protected long label; + protected int threshold; + protected boolean labelJustChanged = false; + protected double temporaryValue; + protected HashMap neighborsLabels = new HashMap(); + + public LabelingVertexValue() { + this.threshold = 1; + } + + public LabelingVertexValue(int threshold) { + this.threshold = threshold; + } + + public void readFields(DataInput in) throws IOException { + label = in.readLong(); + threshold = in.readInt(); + labelJustChanged = in.readBoolean(); + temporaryValue = in.readDouble(); + } + + public void write(DataOutput out) throws IOException { + out.writeLong(label); + out.writeInt(threshold); + out.writeBoolean(labelJustChanged); + out.writeDouble(temporaryValue); + } + + public long getLabel() { + return label; + } + + public HashMap getNeighborsLabel() { + return neighborsLabels; + } + + public boolean isChanged() { + return labelJustChanged; + } + + public void setLabel(long label) { + this.label = label; + this.labelJustChanged = true; + } + + public void setChanged(boolean newChanged) { + this.labelJustChanged = newChanged; + } + + public void updateNeighboorLabel(long id, long label) { + if (!neighborsLabels.containsKey(id)) neighborsLabels.put(id, label); + else if (neighborsLabels.get(id) > label) neighborsLabels.put(id, label); + } + + public double getTemp() { + return temporaryValue; + } + + public void setTemp(double temp) { + this.temporaryValue = temp; + } + + public String toString() { + return "" + label + "," + threshold; + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/DiffusionConstants.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/DiffusionConstants.java new file mode 100644 index 000000000..b53575454 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/DiffusionConstants.java @@ -0,0 +1,40 @@ +package org.apache.giraph.examples.feature_diffusion.diffusion; + +public class DiffusionConstants { + + // BROADCAST STRINGS + public static final String superstepBroadcast = "CURR_SST"; + public static final String convincedVerticesAggregator = "CONV_AGG_DIFF"; + public static final String usingVerticesAggregator = "AGG_DIFF"; + public static final String deadVerticesAggregator = "AGG_DIFF_DEAD"; + public static final String latestActivationsAggregator = "AGG_ACTIVATED_LAST"; + public static final String potentialVerticesAggregator = "Potential_invited_vertices"; + public static final String oldInvitedVerticesAggregator = "Old_invited_vertices"; + public static final String oldConvincedVerticesAggregator = "Old_convinced_vertices"; + public static final String oldDeadVerticesAggregator = "Old_dead_vertices"; + public static final String oldUsingVerticesAggregator = "Old_using_vertices"; + public static final String oldHesitantVerticesAggregator = "Old_hesitant_vertices"; + public static final String oldAlmostConvincedVerticesAggregator = "Old_almostConvinced_vertices"; + public static final String justChangedTimeToSwitch = "Just_changed_timeToSwitch_value"; + public static final String timeToSwitchBroadcast = "is_time_to_switch"; + + // GROUPS + + public static final String activatedVerticesCounterGroup = "Diffusion Counters"; + public static final String convincedVerticesCounter = "Convinced_Vertices "; + public static final String usingVerticesCounter = "Using_Vertices "; + public static final String deadVerticesCounter = "Dead_Vertices "; + public static final String hesitantVerticesAggregator = "hesitantVerticesAggregator "; + public static final String invitedVerticesAggregator = "Invited_Vertices "; + public static final String almostConvincedVerticesAggregator = "AlmostConvinced_Vertices "; + public static final String currentLabel = + "Label_active "; // label da analizzare nello specifico superstep + + // OPTIONS + public static final String diffusionDeltaOption = "diffusion.delta"; + public static final double diffusionDeltaOptionDefault = 0.005; + public static final String diffusionListenOption = "diffusion.listenWhileUnactive"; + public static final String byLabelOption = "by_label"; + + public double KSwitchTreshold; +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/factory/DiffusionBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/factory/DiffusionBlockFactory.java new file mode 100644 index 000000000..8866a42d7 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/factory/DiffusionBlockFactory.java @@ -0,0 +1,98 @@ +package org.apache.giraph.examples.feature_diffusion.diffusion.block_app.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.TypesHolder; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.reducers.impl.MaxReduce; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.DiffusionVertexValue; +import org.apache.giraph.examples.feature_diffusion.diffusion.block_app.piece.DiffusionComputationPiece; + +public class DiffusionBlockFactory extends AbstractBlockFactory + implements TypesHolder { + + ObjectTransfer firstMaxLabel; + ObjectTransfer stoppingCondition; + + double delta; + String thresholdType; + double initialActivationProbability; + double almostConvincedThreshold; + + public DiffusionBlockFactory() { + firstMaxLabel = new ObjectTransfer(); + stoppingCondition = new ObjectTransfer(); + } + + @Override + public Block createBlock(GiraphConfiguration conf) { + delta = (double) conf.getFloat("Delta", (float) 0.005); + thresholdType = conf.getStrings("ThresholdType", "")[0]; + + initialActivationProbability = (double) conf.getFloat("InitialProbability", (float) 0.02); + almostConvincedThreshold = (double) conf.getFloat("AlmostConvincedTreshold", (float) 0.7); + + DiffusionComputationPiece dcp = new DiffusionComputationPiece(firstMaxLabel, stoppingCondition); + + return new SequenceBlock( + Pieces.reduce( + "Setup block", + new MaxReduce(LongTypeOps.INSTANCE), + (vertex) -> { + return new LongWritable(setup(vertex.getValue())); + }, + (value) -> { + firstMaxLabel.apply(value); + }), + new RepeatUntilBlock( + GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.get(conf), dcp, stoppingCondition)); + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return DiffusionVertexValue.class; + } + + /** + * Set the initial values for some vertex parameters + * + * @param value the vertex value on which operate + * @return the vertex label + */ + private long setup(DiffusionVertexValue value) { + value.setDelta(delta); + value.setInitialActivationProbability(initialActivationProbability); + value.setAlmostConvincedTreshold(almostConvincedThreshold); + if (thresholdType.compareTo("1") == 0) value.setVertexThreshold(1); + else if (thresholdType.compareTo("Prop") == 0) { + value.setVertexThreshold((int) value.getLabel() / 20); + } + return value.getLabel(); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/piece/DiffusionComputationPiece.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/piece/DiffusionComputationPiece.java new file mode 100644 index 000000000..3386227eb --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/piece/DiffusionComputationPiece.java @@ -0,0 +1,387 @@ +package org.apache.giraph.examples.feature_diffusion.diffusion.block_app.piece; + +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.combiner.SumMessageCombiner; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.reducers.impl.MaxReduce; +import org.apache.giraph.reducers.impl.OrReduce; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.giraph.types.ops.IntTypeOps; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.DiffusionVertexValue; +import org.apache.giraph.examples.feature_diffusion.diffusion.DiffusionConstants; + +public class DiffusionComputationPiece + extends Piece { + + // ADDED CONSTANTS + ReducerHandle superstepReducerHandle; + + // EXISTING CONSTANTS + + ReducerHandle convincedVerticesHandle; + ReducerHandle usingVerticesHandle; + ReducerHandle deadVerticesHandle; + ReducerHandle latestActivationsHandle; + ReducerHandle hesitantVerticesHandle; + + // for KCORE (or general label) based algorithm + ReducerHandle invitedVerticesHandle; + ReducerHandle almostConvincedVerticesHandle; + ReducerHandle + currentLabelHandle; // label da analizzare nello specifico superstep + ReducerHandle + nextLabelHandle; // ogni volta riceve tutte le label ancora da eseguire + ReducerHandle timeToSwitchHandle; + + // for MIN_NUMBER based algorithm + ReducerHandle potentialVerticesHandle; + ReducerHandle oldInvitedVerticesHandle; + ReducerHandle oldConvincedVerticesHandle; + ReducerHandle oldDeadVerticesHandle; + ReducerHandle oldUsingVerticesHandle; + ReducerHandle oldHesitantVerticesHandle; + ReducerHandle oldAlmostConvincedVerticesHandle; + ReducerHandle justChangedTimeToSwitchHandle; + + // GLOBAL VARIABLES + long superstep = 0; + + long currentLabel; + boolean byLabel; + boolean timeToSwitch; + long nextLabel; + + int minNumber; + double kSwitchThreshold; + + long convincedVertices; + long usingVertices; + long deadVertices; + long invitedVertices; + long almostConvincedVertices; + long hesitantVertices; + long oldInvitedVertices; + private long oldConvincedVertices; + private long oldDeadVertices; + private long oldHesitantVertices; + private long oldUsingVertices; + private long oldAlmostConvincedVertices; + + ObjectTransfer setupMaxLabel; + ObjectTransfer stoppingCondition; + + boolean oldMaxLabelRetrieved = false; + + private boolean justChangedTimeToSwitch; + + public DiffusionComputationPiece( + ObjectTransfer oldMaxLabel, ObjectTransfer stoppingCondition) { + this.setupMaxLabel = oldMaxLabel; + this.stoppingCondition = stoppingCondition; + } + + @Override + public void registerReducers(CreateReducersApi reduceApi, Object executionStage) { + super.registerReducers(reduceApi, executionStage); + + byLabel = reduceApi.getConf().getBoolean("ByLabel", true); + kSwitchThreshold = (double) reduceApi.getConf().getFloat("KSwitchThreshold", (float) 0.7); + + convincedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + usingVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + deadVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + invitedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + almostConvincedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + nextLabelHandle = + reduceApi.createLocalReducer(new MaxReduce(LongTypeOps.INSTANCE)); + currentLabelHandle = + reduceApi.createLocalReducer(new MaxReduce(LongTypeOps.INSTANCE)); + timeToSwitchHandle = reduceApi.createLocalReducer(new OrReduce()); + hesitantVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + + if (!byLabel) { + minNumber = reduceApi.getConf().getInt("minNumber", 200); + potentialVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldInvitedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldConvincedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldDeadVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldUsingVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldHesitantVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldAlmostConvincedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + justChangedTimeToSwitchHandle = reduceApi.createLocalReducer(new OrReduce()); + } + } + + @Override + public void masterCompute(BlockMasterApi master, Object executionStage) { + super.masterCompute(master, executionStage); + + convincedVertices = convincedVerticesHandle.getReducedValue(master).get(); + usingVertices = usingVerticesHandle.getReducedValue(master).get(); + deadVertices = deadVerticesHandle.getReducedValue(master).get(); + invitedVertices = invitedVerticesHandle.getReducedValue(master).get(); + almostConvincedVertices = almostConvincedVerticesHandle.getReducedValue(master).get(); + hesitantVertices = hesitantVerticesHandle.getReducedValue(master).get(); + + if (oldMaxLabelRetrieved) { // First time the new label is transferred from another piece + if (timeToSwitch) nextLabel = nextLabelHandle.getReducedValue(master).get(); + } else { + nextLabel = setupMaxLabel.get().get(); + oldMaxLabelRetrieved = true; + } + + // This avoid having counters' value "0" when it's timeToSwitch and so the computation based on + // MIN_NUMBER is "paused" + if (!byLabel && timeToSwitch) { + almostConvincedVertices = oldAlmostConvincedVerticesHandle.getReducedValue(master).get(); + invitedVertices = oldInvitedVerticesHandle.getReducedValue(master).get(); + usingVertices = oldUsingVerticesHandle.getReducedValue(master).get(); + deadVertices = oldDeadVerticesHandle.getReducedValue(master).get(); + convincedVertices = oldConvincedVerticesHandle.getReducedValue(master).get(); + hesitantVertices = oldHesitantVerticesHandle.getReducedValue(master).get(); + } + + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.hesitantVerticesAggregator + superstep) + .setValue(hesitantVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.usingVerticesCounter + superstep) + .setValue(usingVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.convincedVerticesCounter + superstep) + .setValue(convincedVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.deadVerticesCounter + superstep) + .setValue(deadVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.invitedVerticesAggregator + superstep) + .setValue(invitedVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.almostConvincedVerticesAggregator + superstep) + .setValue(almostConvincedVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.currentLabel + superstep) + .setValue(currentLabel); + + if (byLabel) { // Kcore or similar + if (superstep >= 0) { + if (timeToSwitch) timeToSwitch = false; + if (superstep == 0) { + currentLabel = nextLabel; + nextLabel = -1; + timeToSwitch = true; + } else if (currentLabel != 1) { // if we haven't reached the lowest coreness + long almostConvicedVal = almostConvincedVerticesHandle.getReducedValue(master).get(); + long invitedVal = invitedVerticesHandle.getReducedValue(master).get(); + // if the threshold is reached or all the invited vertices are dead, convinced or hesitant + if (((double) almostConvicedVal) / invitedVal > kSwitchThreshold + || invitedVertices == (deadVertices + convincedVertices + hesitantVertices)) { + timeToSwitch = true; + currentLabel = nextLabel; + nextLabel = -1; + } + } + } + } else { // degree, pagerank or other similar where the label does not represent a group of + // vertices + justChangedTimeToSwitch = false; + if (superstep == 0) { + currentLabel = nextLabel; + timeToSwitch = true; + oldInvitedVertices = oldInvitedVerticesHandle.getReducedValue(master).get(); + } + if (superstep > 0) { + + if (!timeToSwitch) { + if (currentLabel > 0) { + // if the threshold is reached or all the invited vertices are dead, convinced or + // hesitant + if (((double) convincedVertices) / invitedVertices > kSwitchThreshold + || invitedVertices == (deadVertices + convincedVertices + hesitantVertices)) { + timeToSwitch = true; + oldInvitedVertices = invitedVertices; + oldConvincedVertices = convincedVertices; + oldAlmostConvincedVertices = almostConvincedVertices; + oldDeadVertices = deadVertices; + oldHesitantVertices = hesitantVertices; + oldUsingVertices = usingVertices; + } + } + } else { // it's time to switch: let's scan some label until we find at least + // vertices more than now + long old = oldInvitedVertices; + long actual = potentialVerticesHandle.getReducedValue(master).get(); + if (actual - old > minNumber) { + timeToSwitch = false; + justChangedTimeToSwitch = true; + } else if (nextLabelHandle.getReducedValue(master).get() < 0 + && superstep + > 10) { // reached the lowest label without finding at least MIN_NUMBER vertices + timeToSwitch = false; + justChangedTimeToSwitch = true; + currentLabel = 0; + } else { // continue to scan + currentLabel = nextLabel; + nextLabel = -1; + } + } + } + } + + superstep++; + + stoppingCondition.apply( + superstep > 0 + && master.getTotalNumVertices() + == (deadVertices + convincedVertices + hesitantVertices)); + } + + @Override + public VertexSender getVertexSender( + BlockWorkerSendApi workerApi, + Object executionStage) { + return (vertex) -> { + DiffusionVertexValue value = vertex.getValue(); + if (timeToSwitch && superstep > 0) + if (value.getLabel() < currentLabel) + nextLabelHandle.reduce(new LongWritable(value.getLabel())); + if (value.isVertexInvited(currentLabel) && superstep > 1) { + int activeNeighbors = value.getActiveNeighbors(); + if (byLabel) { + if (!value.isVertexDead() + && superstep != 1 + && activeNeighbors == value.getVertexThreshold()) + hesitantVerticesHandle.reduce(new LongWritable(activeNeighbors)); + aggregateVerticesBasedOnProbability(value); + } else { + if (!timeToSwitch) { + if (!value.isVertexDead() + && !justChangedTimeToSwitch + && activeNeighbors == value.getVertexThreshold()) + hesitantVerticesHandle.reduce(new LongWritable(1)); + aggregateVerticesBasedOnProbability(value); + } else potentialVerticesHandle.reduce(new LongWritable(1)); + } + } + value.reset(); + + if (value.isVertexInvited(currentLabel) && value.rollActivationDice() && superstep > 0) + if (byLabel || (!byLabel && !timeToSwitch)) { + usingVerticesHandle.reduce(new LongWritable(1)); + workerApi.sendMessageToAllEdges(vertex, new IntWritable(1)); + } + }; + } + + @Override + public VertexReceiver + getVertexReceiver(BlockWorkerReceiveApi workerApi, Object executionStage) { + return (vertex, messages) -> { + DiffusionVertexValue value = vertex.getValue(); + + if (value.isVertexInvited(currentLabel)) { + if (byLabel) { + if (!value.isVertexDead() + && superstep != 1) { // Update the using probability, if not dead + value.setActiveNeighbors(checkMsgsAndUpdateProbability(messages, value)); + } + } else { + if (!timeToSwitch) { + // Update the using probability if not dead and the computation has not just became + // active + // (because we don't have old messages sent so it would wrongly decrease the + // probability) + if (!value.isVertexDead() && !justChangedTimeToSwitch) { + value.setActiveNeighbors(checkMsgsAndUpdateProbability(messages, value)); + } + } + } + } + }; + } + + /** + * Check all the messages received by the vertex and update its probability with respect to the + * its threshold + * + * @param msgs the list of messages received by the vertex + * @param value the vertex value + * @return the number of neighbors using the feature (invited and active) + */ + private int checkMsgsAndUpdateProbability( + Iterable msgs, DiffusionVertexValue value) { + Iterator it = msgs.iterator(); + int activeNeighbors = 0; + while (it.hasNext()) activeNeighbors += it.next().get(); + if (activeNeighbors > value.getVertexThreshold()) value.modifyCurrentActivationProbability(1); + else if (activeNeighbors < value.getVertexThreshold()) + value.modifyCurrentActivationProbability(-1); + return activeNeighbors; + } + + /** + * Basing on the vertex current activation probability, update the relative reducers + * + * @param value the vertex value, containing the current activation probability + */ + private void aggregateVerticesBasedOnProbability(DiffusionVertexValue value) { + if (value.isVertexConvinced()) convincedVerticesHandle.reduce(new LongWritable(1)); + if (value.isVertexDead()) { // Dead aggregator update + deadVerticesHandle.reduce(new LongWritable(1)); + } + if (value.isAlmostConvinced()) { + almostConvincedVerticesHandle.reduce(new LongWritable(1)); + } + invitedVerticesHandle.reduce(new LongWritable(1)); + } + + @Override + public MessageCombiner getMessageCombiner( + ImmutableClassesGiraphConfiguration conf) { + return new SumMessageCombiner(IntTypeOps.INSTANCE); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java new file mode 100644 index 000000000..23e5ad311 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java @@ -0,0 +1,64 @@ +package org.apache.giraph.examples.feature_diffusion.io; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.TextVertexOutputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * OutputFormat to write out the graph nodes as text, value-separated (by tabs, by default). With + * the default delimiter, a vertex is written out as: + * + *

[]+ + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ +@SuppressWarnings("rawtypes") +public class AdjacencyListNoEdgeValueTextVertexOutputFormat< + I extends WritableComparable, V extends Writable, E extends Writable> + extends TextVertexOutputFormat { + + /** Split delimiter */ + public static final String LINE_TOKENIZE_VALUE = "output.delimiter"; + /** Default split delimiter */ + public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; + + @Override + public AdjacencyListTextVertexWriter createVertexWriter(TaskAttemptContext context) { + return new AdjacencyListTextVertexWriter(); + } + + /** Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}. */ + protected class AdjacencyListTextVertexWriter extends TextVertexWriterToEachLine { + /** Cached split delimeter */ + private String delimiter; + + @Override + public void initialize(TaskAttemptContext context) throws IOException, InterruptedException { + super.initialize(context); + delimiter = getConf().get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); + } + + @Override + public Text convertVertexToLine(Vertex vertex) throws IOException { + StringBuffer sb = new StringBuffer(vertex.getId().toString()); + sb.append(delimiter); + sb.append(vertex.getValue()); + sb.append(delimiter); + + for (Edge edge : vertex.getEdges()) { + sb.append(edge.getTargetVertexId()).append(","); + // sb.append(delimiter).append(edge.getValue()); + } + sb.setLength(sb.length() - 1); + return new Text(sb.toString()); + } + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/DiffusionVertexInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/DiffusionVertexInputFormat.java new file mode 100644 index 000000000..1e82ba4dc --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/DiffusionVertexInputFormat.java @@ -0,0 +1,64 @@ +package org.apache.giraph.examples.feature_diffusion.io; + +import com.google.common.collect.Lists; + +import org.apache.giraph.examples.feature_diffusion.datastructures.DiffusionVertexValue; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.formats.TextVertexInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +public class DiffusionVertexInputFormat + extends TextVertexInputFormat { + + @Override + public TextVertexReader createVertexReader(InputSplit arg0, TaskAttemptContext arg1) + throws IOException { + return new DiffusionVertexReader(); + } + + protected class DiffusionVertexReader extends TextVertexReaderFromEachLine { + + @Override + protected Iterable> getEdges(Text line) throws IOException { + String[] fA = line.toString().split("\t"); + String[] edgeArray = fA[fA.length - 1].split(","); + List> edges = Lists.newArrayList(); + int i; + for (i = 0; i < edgeArray.length; ++i) { + long neighborId = Long.parseLong(edgeArray[i]); + edges.add(EdgeFactory.create(new LongWritable(neighborId), NullWritable.get())); + } + return edges; + } + + @Override + protected LongWritable getId(Text line) throws IOException { + return new LongWritable(Long.parseLong(line.toString().split("\t")[0])); + } + + @Override + protected DiffusionVertexValue getValue(Text line) throws IOException { + String[] split = line.toString().split("\t"); + String value = split[1]; + String[] reSplit = value.split(","); + if (reSplit.length == 2) { + int treshold = Integer.parseInt(reSplit[1]); + int label = Integer.parseInt(reSplit[0]); + return new DiffusionVertexValue(treshold, label); + + } else { + int label = Integer.parseInt(reSplit[0]); + return new DiffusionVertexValue(label); + } + } + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/LabelingInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/LabelingInputFormat.java new file mode 100644 index 000000000..46e8f317f --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/LabelingInputFormat.java @@ -0,0 +1,59 @@ +package org.apache.giraph.examples.feature_diffusion.io; + +import com.google.common.collect.Lists; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.formats.TextVertexInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +public class LabelingInputFormat + extends TextVertexInputFormat { + + @Override + public TextVertexReader createVertexReader(InputSplit arg0, TaskAttemptContext arg1) + throws IOException { + return new DiffusionVertexReader(); + } + + protected class DiffusionVertexReader extends TextVertexReaderFromEachLine { + + @Override + protected Iterable> getEdges(Text line) throws IOException { + String[] fA = line.toString().split("\t"); + String[] edgeArray = fA[fA.length - 1].split(","); + List> edges = Lists.newArrayList(); + int i; + for (i = 0; i < edgeArray.length; ++i) { + long neighborId = Long.parseLong(edgeArray[i]); + edges.add(EdgeFactory.create(new LongWritable(neighborId), NullWritable.get())); + } + return edges; + } + + @Override + protected LongWritable getId(Text line) throws IOException { + return new LongWritable(Long.parseLong(line.toString().split("\t")[0])); + } + + @Override + protected LabelingVertexValue getValue(Text line) throws IOException { + String[] split = line.toString().split("\t"); + if (split.length == 2) { + return new LabelingVertexValue(); + } else { + String treshold = split[split.length - 2]; + return new LabelingVertexValue(Integer.parseInt(treshold)); + } + } + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/DegreeLabelingBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/DegreeLabelingBlockFactory.java new file mode 100644 index 000000000..d313e356a --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/DegreeLabelingBlockFactory.java @@ -0,0 +1,48 @@ +package org.apache.giraph.examples.feature_diffusion.labeling.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.TypesHolder; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +public class DegreeLabelingBlockFactory extends AbstractBlockFactory + implements TypesHolder< + LongWritable, LabelingVertexValue, NullWritable, NullWritable, NullWritable> { + + long superstep = 0; + + @Override + public Block createBlock(GiraphConfiguration arg0) { + return Pieces.forAllVertices( + "Degree Labeling", + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(vertex.getNumEdges()); + }); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return LabelingVertexValue.class; + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/FlatTimeLabelingBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/FlatTimeLabelingBlockFactory.java new file mode 100644 index 000000000..2b38e0118 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/FlatTimeLabelingBlockFactory.java @@ -0,0 +1,49 @@ +package org.apache.giraph.examples.feature_diffusion.labeling.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.TypesHolder; +import org.apache.giraph.writable.tuple.LongLongWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +public class FlatTimeLabelingBlockFactory extends AbstractBlockFactory + implements TypesHolder< + LongWritable, LabelingVertexValue, NullWritable, LongLongWritable, LongLongWritable> { + + long superstep = 0; + + @Override + public Block createBlock(GiraphConfiguration arg0) { + return Pieces.forAllVertices( + "Degree Labeling", + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(1); + }); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return LabelingVertexValue.class; + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/KCoreLabelingBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/KCoreLabelingBlockFactory.java new file mode 100644 index 000000000..ca3cc750e --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/KCoreLabelingBlockFactory.java @@ -0,0 +1,130 @@ +package org.apache.giraph.examples.feature_diffusion.labeling.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.TypesHolder; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.reducers.impl.AndReduce; +import org.apache.giraph.writable.tuple.LongLongWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import java.util.HashMap; +import java.util.Map.Entry; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +@SuppressWarnings("unused") +public class KCoreLabelingBlockFactory extends AbstractBlockFactory + implements TypesHolder< + LongWritable, LabelingVertexValue, NullWritable, LongLongWritable, LongLongWritable> { + + ObjectTransfer stoppingCondition; + + public static ConsumerWithVertex< + LongWritable, LabelingVertexValue, NullWritable, Iterable> + kCoreConsumer = + (vertex, messages) -> { + LabelingVertexValue value = vertex.getValue(); + for (LongLongWritable msg : messages) { + long id = msg.getLeft().get(); + int coreness = (int) msg.getRight().get(); + value.updateNeighboorLabel(id, coreness); + } + }; + + @Override + public Block createBlock(GiraphConfiguration conf) { + stoppingCondition = new ObjectTransfer(false); + return new SequenceBlock( + Pieces + . + sendMessageToNeighbors( + "KCore Labeling First run", + LongLongWritable.class, + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(Integer.max(vertex.getNumEdges(), 1)); + value.setChanged(false); + return new LongLongWritable(vertex.getId().get(), value.getLabel()); + }, + kCoreConsumer), + new RepeatUntilBlock( + GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.get(conf), + new SequenceBlock( + Pieces + . + sendMessageToNeighbors( + "KCore Labeling", + LongLongWritable.class, + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + int tempLabel = + computeIndex(value.getNeighborsLabel(), value.getLabel()); + if (tempLabel < value.getLabel()) value.setLabel(tempLabel); + if (value.isChanged()) { + // value.setChanged(false); + return new LongLongWritable(vertex.getId().get(), value.getLabel()); + } + return null; + }, + kCoreConsumer), + Pieces + . + reduce( + "Reducing stopping Condition", + AndReduce.INSTANCE, + (vertex) -> { + boolean isLabelChanged = vertex.getValue().isChanged(); + vertex.getValue().setChanged(false); + return new BooleanWritable(!isLabelChanged); + }, + (value) -> { + stoppingCondition.apply(value.get()); + })), + stoppingCondition)); + } + + private int computeIndex(HashMap neighborsLabel, long coreness) { + int[] corenessCount = new int[(int) coreness]; + for (int i = 0; i < coreness; i++) corenessCount[i] = 0; + for (Entry pair : neighborsLabel.entrySet()) { + long corenessCandidate = Long.min(pair.getValue(), coreness); + corenessCount[(int) corenessCandidate - 1]++; + } + for (int i = (int) (coreness - 1); i > 0; i--) corenessCount[i - 1] += corenessCount[i]; + int i = (int) coreness; + while (i > 1 && corenessCount[i - 1] < i) { + i--; + } + return i; + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return LabelingVertexValue.class; + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/PageRankLabelingBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/PageRankLabelingBlockFactory.java new file mode 100644 index 000000000..2b03c1efd --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/PageRankLabelingBlockFactory.java @@ -0,0 +1,84 @@ +package org.apache.giraph.examples.feature_diffusion.labeling.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.TypesHolder; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +public class PageRankLabelingBlockFactory extends AbstractBlockFactory + implements TypesHolder< + LongWritable, LabelingVertexValue, NullWritable, DoubleWritable, DoubleWritable> { + + long numVerticesTotal; + + private ConsumerWithVertex< + LongWritable, LabelingVertexValue, NullWritable, Iterable> + pageRankConsumer = + (vertex, messages) -> { + LabelingVertexValue value = vertex.getValue(); + double sum = 0; + for (DoubleWritable message : messages) { + if (message.get() >= 0) sum += message.get(); + } + double pr = ((0.15f / numVerticesTotal) + 0.85f * sum); + value.setTemp(pr); // to change, removing + }; + + @Override + public Block createBlock(GiraphConfiguration conf) { + return new SequenceBlock( + Pieces.masterCompute( + "Master preprocess", + (master) -> { + numVerticesTotal = master.getTotalNumVertices(); + }), + new RepeatBlock( + conf.getInt("labeling.pagerank.iterations", 49), + Pieces + . + sendMessageToNeighbors( + "PageRank labeling", + DoubleWritable.class, + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + return new DoubleWritable(value.getTemp() / vertex.getNumEdges()); + }, + pageRankConsumer)), + Pieces.forAllVertices( + "Closing PageRank", + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + int cif = (int) (Math.log10(numVerticesTotal) + 2); + value.setLabel((long) (value.getTemp() * Math.pow(10, cif))); + })); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return LabelingVertexValue.class; + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java deleted file mode 100644 index 2edb4a6c2..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/DiffusionVertexValue.java +++ /dev/null @@ -1,117 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.datastructures; - -import org.apache.hadoop.io.Writable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigDecimal; -import java.math.RoundingMode; -// DEBUG -//import java.util.LinkedList; -import java.util.LinkedList; - -public class DiffusionVertexValue implements Writable { - - protected int vertexThreshold; - protected int label; - protected double currentActivationProbability=0.2; - protected double delta=0.05; - protected double almostConvincedTreshold=0.7; - - public DiffusionVertexValue() { - this.vertexThreshold=1; - this.label=1; - } - - public DiffusionVertexValue(int label) { - this.vertexThreshold=1; - this.label=label; - } - - public DiffusionVertexValue( int vertexThreshold, int label) { - this.vertexThreshold=vertexThreshold; - this.label=label; - } - - public void readFields(DataInput in) throws IOException { - vertexThreshold = in.readInt(); - label = in.readInt(); - currentActivationProbability = in.readDouble(); - } - - public void write(DataOutput out) throws IOException { - out.writeInt(vertexThreshold); - out.writeInt(label); - out.writeDouble(currentActivationProbability); - } - - public double getCurrentActivationProbability() { - return currentActivationProbability; - } - - public void modifyCurrentActivationProbability(int sign) { - BigDecimal tmpcurrentActivationProbability = new BigDecimal(currentActivationProbability).add(new BigDecimal(sign*delta)).setScale(5, RoundingMode.HALF_UP); - if(tmpcurrentActivationProbability.doubleValue() > 1) - currentActivationProbability = 1; - else - currentActivationProbability = tmpcurrentActivationProbability.doubleValue(); - if(tmpcurrentActivationProbability.doubleValue() <= 0) - currentActivationProbability = 0; - } - - public boolean isVertexInvited(int currentLabel) { - return this.label >= currentLabel; - - } - - public boolean isVertexDead() { - return new BigDecimal(currentActivationProbability).setScale(2, RoundingMode.HALF_DOWN).floatValue() == 0; - } - - public boolean isVertexConvinced() { - return new BigDecimal(currentActivationProbability).setScale(2, RoundingMode.HALF_DOWN).floatValue() == 1; - } - - public void setVertexThreshold(int threshold) { - this.vertexThreshold=threshold; - } - - public int getVertexThreshold() { - return vertexThreshold; - } - - public long getLabel() { - return this.label; - } - - public boolean rollActivationDice() { - return Math.random() <= currentActivationProbability; - } - - public void setlabel(int coreness) { - this.label=coreness; - } - - public boolean isAlmostConvinced(){ - return currentActivationProbability>almostConvincedTreshold; - } - - // used at ss=0 in case of differences from default 0.2 - public void setInitialActivationProbability(double initialActivationProbability){ - this.currentActivationProbability=initialActivationProbability; - } - - public void setAlmostConvincedTreshold(double almostConvincedTreshold){ - this.almostConvincedTreshold=almostConvincedTreshold; - } - - public void setDelta(double delta) { - this.delta = delta; - } - - public String toString() { - return ""+label+","+currentActivationProbability; - } - -} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/LabelingVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/LabelingVertexValue.java deleted file mode 100644 index 7bab062af..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/datastructures/LabelingVertexValue.java +++ /dev/null @@ -1,79 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.datastructures; - - - -import org.apache.hadoop.io.Writable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.HashMap; - -public class LabelingVertexValue implements Writable { - - - protected long label; - protected int treshold; - protected boolean changed = false; - protected double temp; - protected HashMap neighboorsLabels=new HashMap(); - - public LabelingVertexValue(){ - this.treshold=1; - } - - public LabelingVertexValue(int treshold){ - this.treshold=treshold; - } - - public void readFields(DataInput in) throws IOException { - label=in.readLong(); - treshold=in.readInt(); - changed=in.readBoolean(); - temp=in.readDouble(); - } - - public void write(DataOutput out) throws IOException { - out.writeLong(label); - out.writeInt(treshold); - out.writeBoolean(changed); - out.writeDouble(temp); - } - - public Long getLabel() {return label;} - - public HashMap getNeighboorsLabel() {return neighboorsLabels;} - - public boolean isChanged() {return changed;} - - public void setLabel(long label) { - this.label=label; - this.changed=true; - } - - public void setChanged(boolean newChanged) { - this.changed = newChanged; - } - - - public void updateNeighboorLabel(long id,long label) { - if(!neighboorsLabels.containsKey(id)) - neighboorsLabels.put(id, label); - else - if(neighboorsLabels.get(id) > label) - neighboorsLabels.put(id, label); - } - - public double getTemp() { - return temp; - } - - public void setTemp(double temp) { - this.temp=temp; - } - public String toString() { - return ""+label+","+treshold; - } - - -} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java deleted file mode 100644 index 3534356a0..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.io; - - -import org.apache.giraph.edge.Edge; -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.io.formats.TextVertexOutputFormat; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; - -/** -* OutputFormat to write out the graph nodes as text, value-separated (by -* tabs, by default). With the default delimiter, a vertex is written out as: -* -* []+ -* -* @param Vertex index value -* @param Vertex value -* @param Edge value -*/ -@SuppressWarnings("rawtypes") -public class AdjacencyListNoEdgeValueTextVertexOutputFormat extends TextVertexOutputFormat { - - /** Split delimiter */ - public static final String LINE_TOKENIZE_VALUE = "output.delimiter"; - /** Default split delimiter */ - public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; - - @Override - public AdjacencyListTextVertexWriter createVertexWriter(TaskAttemptContext context) { - return new AdjacencyListTextVertexWriter(); - } - - /** - * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}. - */ - protected class AdjacencyListTextVertexWriter extends TextVertexWriterToEachLine { - /** Cached split delimeter */ - private String delimiter; - - @Override - public void initialize(TaskAttemptContext context) throws IOException, InterruptedException { - super.initialize(context); - delimiter = getConf().get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); - } - - @Override - public Text convertVertexToLine(Vertex vertex) throws IOException { - StringBuffer sb = new StringBuffer(vertex.getId().toString()); - sb.append(delimiter); - sb.append(vertex.getValue()); - sb.append(delimiter); - - for (Edge edge : vertex.getEdges()) { - sb.append(edge.getTargetVertexId()).append(","); - //sb.append(delimiter).append(edge.getValue()); - } - sb.setLength(sb.length()-1); - return new Text(sb.toString()); - } - } - -} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java deleted file mode 100644 index 2ffa2d11e..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/DiffusionVertexInputFormat.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.io; - -import com.google.common.collect.Lists; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.DiffusionVertexValue; - -import org.apache.giraph.edge.Edge; -import org.apache.giraph.edge.EdgeFactory; -import org.apache.giraph.io.formats.TextVertexInputFormat; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; -import java.util.List; - -public class DiffusionVertexInputFormat extends TextVertexInputFormat { - - @Override - public TextVertexReader createVertexReader( - InputSplit arg0, TaskAttemptContext arg1) throws IOException { - return new DiffusionVertexReader(); - } - - protected class DiffusionVertexReader extends TextVertexReaderFromEachLine{ - - @Override - protected Iterable> getEdges(Text line) throws IOException { - String[] fA = line.toString().split("\t"); - String[] edgeArray = fA[fA.length-1].split(","); - List> edges = Lists.newArrayList(); - int i; - for (i = 0; i < edgeArray.length; ++i) { - long neighborId = Long.parseLong(edgeArray[i]); - edges.add(EdgeFactory.create(new LongWritable(neighborId), - NullWritable.get())); - } - return edges; - } - - @Override - protected LongWritable getId(Text line) throws IOException { - return new LongWritable(Long.parseLong(line.toString().split("\t")[0])); - } - - @Override - protected DiffusionVertexValue getValue(Text line) throws IOException { - String[] split = line.toString().split("\t"); - String value=split[1]; - String [] reSplit=value.split(","); - if(reSplit.length==2) { - int treshold= Integer.parseInt(reSplit[1]); - int label=Integer.parseInt(reSplit[0]); - return new DiffusionVertexValue(treshold,label); - - }else { - int label=Integer.parseInt(reSplit[0]); - return new DiffusionVertexValue(label); - - } - } - } - -} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java deleted file mode 100644 index 91471c01b..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/io/LabelingInputFormat.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.io; - -import com.google.common.collect.Lists; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; - -import org.apache.giraph.edge.Edge; -import org.apache.giraph.edge.EdgeFactory; -import org.apache.giraph.io.formats.TextVertexInputFormat; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; -import java.util.List; - -public class LabelingInputFormat extends TextVertexInputFormat { - - @Override - public TextVertexReader createVertexReader( - InputSplit arg0, TaskAttemptContext arg1) throws IOException { - return new DiffusionVertexReader(); - } - - protected class DiffusionVertexReader extends TextVertexReaderFromEachLine{ - - @Override - protected Iterable> getEdges(Text line) throws IOException { - String[] fA = line.toString().split("\t"); - String[] edgeArray = fA[fA.length-1].split(","); - List> edges = Lists.newArrayList(); - int i; - for (i = 0; i < edgeArray.length; ++i) { - long neighborId = Long.parseLong(edgeArray[i]); - edges.add(EdgeFactory.create(new LongWritable(neighborId), - NullWritable.get())); - } - return edges; - } - - @Override - protected LongWritable getId(Text line) throws IOException { - return new LongWritable(Long.parseLong(line.toString().split("\t")[0])); - } - - @Override - protected LabelingVertexValue getValue(Text line) throws IOException { - String[] split = line.toString().split("\t"); - if (split.length==2){ - return new LabelingVertexValue(); - }else { - String treshold = split[split.length-2]; - return new LabelingVertexValue(Integer.parseInt(treshold)); - } - - } - - } - -} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingMigrationSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingMigrationSimulationComputation.java deleted file mode 100644 index 502be2a0c..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/DegreeLabelingMigrationSimulationComputation.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.labeling; - -import java.io.IOException; - -import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; - -public class DegreeLabelingMigrationSimulationComputation extends MigrationFullBasicComputation { - - - Logger LOG = Logger.getLogger(this.getClass()); - - - /*public void initialize(GraphState graphState, - WorkerClientRequestProcessor workerClientRequestProcessor, - CentralizedServiceWorker serviceWorker, - WorkerGlobalCommUsage workerGlobalCommUsage) { - super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); - delta = getConf().getDouble(DiffusionMasterCompute.diffusionDeltaOption, DiffusionMasterCompute.diffusionDeltaOptionDefault); - modelSwitch = getConf().getBoolean(DiffusionMasterCompute.diffusionListenOption, false); - - }*/ - - @Override - public void compute(Vertex vertex, Iterable msgs) - throws IOException { - LabelingVertexValue value = vertex.getValue(); - value.setLabel(vertex.getNumEdges()); - vertex.voteToHalt(); - } - -} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingMigrationSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingMigrationSimulationComputation.java deleted file mode 100644 index c0ea7432e..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/FlatTimeLabelingMigrationSimulationComputation.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.labeling; - -import java.io.IOException; - -import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.LabelingVertexValue; - -public class FlatTimeLabelingMigrationSimulationComputation extends MigrationFullBasicComputation { - - Logger LOG = Logger.getLogger(this.getClass()); - - /*public void initialize(GraphState graphState, - WorkerClientRequestProcessor workerClientRequestProcessor, - CentralizedServiceWorker serviceWorker, - WorkerGlobalCommUsage workerGlobalCommUsage) { - super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); - delta = getConf().getDouble(DiffusionMasterCompute.diffusionDeltaOption, DiffusionMasterCompute.diffusionDeltaOptionDefault); - modelSwitch = getConf().getBoolean(DiffusionMasterCompute.diffusionListenOption, false); - - }*/ - - @Override - public void compute(Vertex vertex, Iterable msgs) - throws IOException { - LabelingVertexValue value = vertex.getValue(); - value.setLabel(1); - vertex.voteToHalt(); - } - -} \ No newline at end of file diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java deleted file mode 100644 index 4686706ec..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/KCoreLabelingMigrationSimulationComputation.java +++ /dev/null @@ -1,86 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.labeling; - -import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.WorkerClientRequestProcessor; -import org.apache.giraph.graph.GraphState; -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.worker.WorkerGlobalCommUsage; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.log4j.Logger; - - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map.Entry; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.*; - -@SuppressWarnings("unused") -public class KCoreLabelingMigrationSimulationComputation extends MigrationFullBasicComputation { - - Logger LOG = Logger.getLogger(this.getClass()); - - - /*public void initialize(GraphState graphState, - WorkerClientRequestProcessor workerClientRequestProcessor, - CentralizedServiceWorker serviceWorker, - WorkerGlobalCommUsage workerGlobalCommUsage) { - super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); - delta = getConf().getDouble(DiffusionMasterCompute.diffusionDeltaOption, DiffusionMasterCompute.diffusionDeltaOptionDefault); - modelSwitch = getConf().getBoolean(DiffusionMasterCompute.diffusionListenOption, false); - - }*/ - - @Override - public void compute(Vertex vertex, Iterable msgs) - throws IOException { - LabelingVertexValue value = vertex.getValue(); - if(getSuperstep()==0) { - value.setLabel(Math.max(vertex.getNumEdges(),1)); - sendMessageToAllEdges(vertex, new Text(""+vertex.getId().get()+" "+value.getLabel())); - value.setChanged(false); - }else { - - for(Text msg: msgs) { - long id = Long.parseLong(msg.toString().split(" ")[0]); - int coreness = Integer.parseInt(msg.toString().split(" ")[1]); - value.updateNeighboorLabel(id, coreness); - } - - int tempLabel = computeIndex(value.getNeighboorsLabel(),value.getLabel()); - if (tempLabel neighboorsLabel, long coreness) { - int[] corenessCount = new int[(int) coreness]; - for (int i = 0 ; i pair: neighboorsLabel.entrySet()) { - long corenessCandidate =Math.min( pair.getValue() , coreness); - corenessCount[(int)corenessCandidate-1]++; - } - for (int i=(int) (coreness-1); i>0 ; i--) - corenessCount[i-1]+=corenessCount[i]; - int i = (int) coreness; - while(i>1 && corenessCount[i-1]> computationClass=(Class>)conf.getClass("giraph.typesHolder", KCoreLabelingMigrationSimulationComputation.class); - return createMigrationAppBlock( - computationClass, - new MigrationFullMasterCompute(), - Text.class, - null, - conf); - } - - @Override - protected Class getEdgeValueClass(GiraphConfiguration arg0) { - return NullWritable.class; - } - - @Override - protected Class getVertexIDClass(GiraphConfiguration arg0) { - return LongWritable.class; - } - - @Override - protected Class getVertexValueClass(GiraphConfiguration arg0) { - return LabelingVertexValue.class; - } - -} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingMigrationSimulationComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingMigrationSimulationComputation.java deleted file mode 100644 index 2b4c8b51d..000000000 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion_utils/labeling/PageRankLabelingMigrationSimulationComputation.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.giraph.examples.feature_diffusion_utils.labeling; - -import java.io.IOException; - -import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation; -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -import org.apache.giraph.examples.feature_diffusion_utils.datastructures.*; - -public class PageRankLabelingMigrationSimulationComputation extends MigrationFullBasicComputation { - - - Logger LOG = Logger.getLogger(this.getClass()); - - - /*public void initialize(GraphState graphState, - WorkerClientRequestProcessor workerClientRequestProcessor, - CentralizedServiceWorker serviceWorker, - WorkerGlobalCommUsage workerGlobalCommUsage) { - super.initialize(graphState, workerClientRequestProcessor, serviceWorker, workerGlobalCommUsage); - delta = getConf().getDouble(DiffusionMasterCompute.diffusionDeltaOption, DiffusionMasterCompute.diffusionDeltaOptionDefault); - modelSwitch = getConf().getBoolean(DiffusionMasterCompute.diffusionListenOption, false); - - }*/ - - @Override - public void compute(Vertex vertex, Iterable msgs) - throws IOException { - LabelingVertexValue value = vertex.getValue(); - if (getSuperstep() >= 1) { - double sum = 0; - for (Text message : msgs) { - sum += Double.parseDouble(message.toString()); - } - double pr=((0.15f / getTotalNumVertices()) + 0.85f * sum); - value.setTemp(pr);//to change, removing - - } - if (getSuperstep() < 50) { - sendMessageToAllEdges(vertex, new Text( ""+ (value.getTemp() / vertex.getNumEdges()) ) ); - } else { - int cif= (int)(Math.log10(getTotalNumVertices())+2); - value.setLabel((long)(value.getTemp()*Math.pow(10, cif))); - vertex.voteToHalt(); - } - } - -} \ No newline at end of file