From 248810873e6cfae66cb8b84f0b81bc1dfcbe5fdc Mon Sep 17 00:00:00 2001 From: ASvyatkovskiy Date: Wed, 13 Jul 2016 11:37:00 -0400 Subject: [PATCH 1/8] Add flag to enable switch between in-out state and in-out & in-in. Not propagated to the code yet --- src/main/resources/makeCartesian.conf | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/resources/makeCartesian.conf b/src/main/resources/makeCartesian.conf index d394b85..2b5b912 100644 --- a/src/main/resources/makeCartesian.conf +++ b/src/main/resources/makeCartesian.conf @@ -1,9 +1,10 @@ makeCartesian { nPartitions = 30, - docVersion = "Enacted", - inputFile = "file:///scratch/network/alexeys/bills/lexs/bills_metadata_3b.json", - outputFile = "/user/alexeys/valid_pairs", - use_strict = true, + docVersion = "Introduced", + inOutOnly = false, + inputFile = "file:///scratch/network/alexeys/bills/lexs/bills_metadata_50000.json", + outputFile = "/user/alexeys/valid_pairs_50000test", + use_strict = false, strict_state = 5, strict_docid = "HB1001", strict_year = 1991 From 75e7f2e51b7a655625fc17a586bf2c9aa9258f65 Mon Sep 17 00:00:00 2001 From: ASvyatkovskiy Date: Wed, 13 Jul 2016 14:18:32 -0400 Subject: [PATCH 2/8] Section parser: save work --- dataformat/secformat_for_df.py | 53 ++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 dataformat/secformat_for_df.py diff --git a/dataformat/secformat_for_df.py b/dataformat/secformat_for_df.py new file mode 100644 index 0000000..03e0188 --- /dev/null +++ b/dataformat/secformat_for_df.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +import simplejson +import re + +#switch between primary key as a long integer and primary key as a string +use_cryptic_pk = True + +finput = open("/scratch/network/alexeys/bills/lexs/bills_50000.json","r") + +#data first +foutput = open("/scratch/network/alexeys/bills/lexs/sectioned_bills_50000.json","wa") + +#step through JSON. Tokenize "content" string +sec_output_dicts = list() +for line in finput.readlines(): + output_dict = simplejson.loads(line) + for j,section in enumerate(re.split("SECTION \d|section \d",output_dict['content'])): + sec_output_dict = {'primary_key':'', 'content':None} + + sec_output_dict['content'] = section + sec_output_dict['primary_key'] = str(j)+"_"+output_dict['primary_key'] + sec_output_dicts.append(sec_output_dict) + +for i, sec_output_dict in enumerate(sec_output_dicts): + if not use_cryptic_pk: sec_output_dict['primary_key'] = i + simplejson.dump(sec_output_dict, foutput) + foutput.write('\n') + +#metadata +#FIXME yes... +finput = open("/scratch/network/alexeys/bills/lexs/bills_50000.json","r") +finput_meta = open("/scratch/network/alexeys/bills/lexs/bills_metadata_50000.json","r") +foutput_meta = open("/scratch/network/alexeys/bills/lexs/sectioned_bills_metadata_50000.json","wa") + +#step through JSON. 
Tokenize "content" string +sec_output_dicts_meta = list() +for line_meta, line in zip(finput_meta.readlines(),finput.readlines()): + output_dict_meta = simplejson.loads(line_meta) + output_dict = simplejson.loads(line) + for j in range(len(re.split("SECTION \d|section \d",output_dict['content']))): + sec_output_dict_meta = {'year':None, 'state':'','docid':'', 'docversion':'', 'primary_key':''} + sec_output_dict_meta['primary_key'] = str(j)+"_"+output_dict_meta['primary_key'] + sec_output_dict_meta['year'] = output_dict_meta['year'] + sec_output_dict_meta['state'] = output_dict_meta['state'] + sec_output_dict_meta['docid'] = output_dict_meta['docid'] + sec_output_dict_meta['docversion'] = output_dict_meta['docversion'] + sec_output_dicts_meta.append(sec_output_dict_meta) + +for i, sec_output_dict_meta in enumerate(sec_output_dicts_meta): + if not use_cryptic_pk: sec_output_dict_meta['primary_key'] = i + simplejson.dump(sec_output_dict_meta, foutput_meta) + foutput_meta.write('\n') From d5290f5073725240c9ab085e0ab8470291a5b562 Mon Sep 17 00:00:00 2001 From: ASvyatkovskiy Date: Thu, 14 Jul 2016 11:10:09 -0400 Subject: [PATCH 3/8] Add in-out flag --- src/main/resources/makeCartesian.conf | 6 +++--- src/main/scala/MakeCartesian.scala | 25 +++++++++++++++++-------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/main/resources/makeCartesian.conf b/src/main/resources/makeCartesian.conf index 2b5b912..d6cc4fa 100644 --- a/src/main/resources/makeCartesian.conf +++ b/src/main/resources/makeCartesian.conf @@ -1,9 +1,9 @@ makeCartesian { nPartitions = 30, docVersion = "Introduced", - inOutOnly = false, - inputFile = "file:///scratch/network/alexeys/bills/lexs/bills_metadata_50000.json", - outputFile = "/user/alexeys/valid_pairs_50000test", + onlyInOut = false, + inputFile = "file:///scratch/network/alexeys/bills/lexs/sectioned_bills_metadata_50000.json", + outputFile = "/user/alexeys/valid_section_pairs_50000test", use_strict = false, strict_state = 5, strict_docid = "HB1001", diff --git a/src/main/scala/MakeCartesian.scala b/src/main/scala/MakeCartesian.scala index 0f71b4a..1b8ccd4 100644 --- a/src/main/scala/MakeCartesian.scala +++ b/src/main/scala/MakeCartesian.scala @@ -4,6 +4,7 @@ Application: MakeCartesian, produce all the pairs of primary keys of the documen Following parameters need to be filled in the resources/makeCartesian.conf file: docVersion: document version: consider document pairs having a specific version. E.g. Introduced, Enacted... 
nPartitions: number of partitions in bills_meta RDD + onlyInOut: a switch between in-out of state and using both in-out and in-in state pairs use_strict: boolean, yes or no to consider strict parameters strict_state: specify state (a long integer from 1 to 50) for one-against-all user selection (strict) strict_docid: specify document ID for one-against-all user selection (strict) @@ -33,7 +34,7 @@ import scala.collection.mutable.WrappedArray object MakeCartesian { - def pairup (document: MetaDocument, thewholething: org.apache.spark.broadcast.Broadcast[Array[MetaDocument]], strict_params: Tuple4[Boolean, Int, java.lang.String, Int]) : (MetaDocument, Array[CartesianPair]) = { + def pairup (document: MetaDocument, thewholething: org.apache.spark.broadcast.Broadcast[Array[MetaDocument]], strict_params: Tuple4[Boolean, Int, java.lang.String, Int], onlyInOut: Bool) : (MetaDocument, Array[CartesianPair]) = { val documents = thewholething.value @@ -62,12 +63,20 @@ object MakeCartesian { } } } else { - //simple condition - if (pk1 < pk2 && istate != jstate) { - var output: CartesianPair = CartesianPair(pk1,pk2) - output_arr += output - } - } + //simple condition + if (onlyInOut) { + if (pk1 < pk2 && istate != jstate) { + var output: CartesianPair = CartesianPair(pk1,pk2) + output_arr += output + } + } else { + //in-out and in-in + if (pk1 < pk2) { + var output: CartesianPair = CartesianPair(pk1,pk2) + output_arr += output + } + } + } } (document,output_arr.toArray) } @@ -106,7 +115,7 @@ object MakeCartesian { //will be array of tuples, but the keys are unique var cartesian_pairs = bills_meta.rdd.repartition(params.getInt("makeCartesian.nPartitions")) - .map(x => pairup(x,bills_meta_bcast, strict_params)) + .map(x => pairup(x,bills_meta_bcast, strict_params, params.getInt("makeCartesian.onlyInOut"))) .filter({case (dd,ll) => (ll.length > 0)}) .map({case(k,v) => v}).flatMap(x => x) //.groupByKey() From 8239d01cb67283e9e3390414b46cdf1beda8b5a4 Mon Sep 17 00:00:00 2001 From: ASvyatkovskiy Date: Thu, 14 Jul 2016 11:18:30 -0400 Subject: [PATCH 4/8] onlyInOut argument type is Boolean --- src/main/scala/MakeCartesian.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/MakeCartesian.scala b/src/main/scala/MakeCartesian.scala index 1b8ccd4..8da6b0b 100644 --- a/src/main/scala/MakeCartesian.scala +++ b/src/main/scala/MakeCartesian.scala @@ -34,7 +34,7 @@ import scala.collection.mutable.WrappedArray object MakeCartesian { - def pairup (document: MetaDocument, thewholething: org.apache.spark.broadcast.Broadcast[Array[MetaDocument]], strict_params: Tuple4[Boolean, Int, java.lang.String, Int], onlyInOut: Bool) : (MetaDocument, Array[CartesianPair]) = { + def pairup (document: MetaDocument, thewholething: org.apache.spark.broadcast.Broadcast[Array[MetaDocument]], strict_params: Tuple4[Boolean, Int, java.lang.String, Int], onlyInOut: Boolean) : (MetaDocument, Array[CartesianPair]) = { val documents = thewholething.value @@ -115,7 +115,7 @@ object MakeCartesian { //will be array of tuples, but the keys are unique var cartesian_pairs = bills_meta.rdd.repartition(params.getInt("makeCartesian.nPartitions")) - .map(x => pairup(x,bills_meta_bcast, strict_params, params.getInt("makeCartesian.onlyInOut"))) + .map(x => pairup(x,bills_meta_bcast, strict_params, params.getBoolean("makeCartesian.onlyInOut"))) .filter({case (dd,ll) => (ll.length > 0)}) .map({case(k,v) => v}).flatMap(x => x) //.groupByKey() From fe047f9d6212d78c1a02617bd19e501ffccf5a45 Mon Sep 17 00:00:00 2001 From: 
ASvyatkovskiy Date: Fri, 15 Jul 2016 15:18:43 -0400 Subject: [PATCH 5/8] Add a possibility to include n-gram features with arbitrary n. Provide a switch --- src/main/scala/AdhocAnalyzer.scala | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/main/scala/AdhocAnalyzer.scala b/src/main/scala/AdhocAnalyzer.scala index aba5256..3752a3d 100644 --- a/src/main/scala/AdhocAnalyzer.scala +++ b/src/main/scala/AdhocAnalyzer.scala @@ -2,6 +2,8 @@ Following parameters need to be filled in the resources/adhocAnalyzer.conf file: numTextFeatures: Number of text features to keep in hashingTF + addNGramFeatures: Boolean flag to indicate whether to add n-gram features + nGramGranularity: granularity of a rolling n-gram measureName: Similarity measure used inputBillsFile: Bill input file, one JSON per line inputPairsFile: CartesianPairs object input file @@ -33,9 +35,6 @@ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors object AdhocAnalyzer { - /* - Experimental - */ def converted(row: scala.collection.Seq[Any]) : Tuple2[String,SparseVector] = { val ret = row.asInstanceOf[WrappedArray[Any]] val first = ret(0).asInstanceOf[String] @@ -59,6 +58,10 @@ object AdhocAnalyzer { println("Elapsed time: " + (t1 - t0)/1000000000 + "s") } + def appendFeature(a: WrappedArray[String], b: WrappedArray[String]) : WrappedArray[String] = { + a ++ b + } + def run(params: Config) { val conf = new SparkConf().setAppName("AdhocAnalyzer") @@ -85,24 +88,31 @@ object AdhocAnalyzer { //remove stopwords var remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered") - val filtered_df = remover.transform(tokenized_df).drop("words") + var prefeaturized_df = remover.transform(tokenized_df).drop("words") + + if (params.getBoolean("adhocAnalyzer.addNGramFeatures")) { - //ngram = NGram(n=2, inputCol="filtered", outputCol="ngram") - //ngram_df = ngram.transform(tokenized_df) + val ngram = new NGram().setN(params.getInt("adhocAnalyzer.nGramGranularity")).setInputCol("filtered").setOutputCol("ngram") + val ngram_df = ngram.transform(prefeaturized_df) + + def appendFeature_udf = udf(appendFeature _) + prefeaturized_df = ngram_df.withColumn("combined", appendFeature_udf(col("filtered"),col("ngram"))).drop("filtered").drop("ngram").drop("cleaned") + } else { + prefeaturized_df = prefeaturized_df.select(col("primary_key"),col("filtered").alias("combined")) + prefeaturized_df.printSchema() + } //hashing - var hashingTF = new HashingTF().setInputCol("filtered").setOutputCol("rawFeatures").setNumFeatures(params.getInt("adhocAnalyzer.numTextFeatures")) - val featurized_df = hashingTF.transform(filtered_df).drop("filtered") + var hashingTF = new HashingTF().setInputCol("combined").setOutputCol("rawFeatures").setNumFeatures(params.getInt("adhocAnalyzer.numTextFeatures")) + val featurized_df = hashingTF.transform(prefeaturized_df) var idf = new IDF().setInputCol("rawFeatures").setOutputCol("pre_features") //val Array(train, cv) = featurized_df.randomSplit(Array(0.7, 0.3)) var idfModel = idf.fit(featurized_df) val rescaled_df = idfModel.transform(featurized_df).drop("rawFeatures") - rescaled_df.show(5) val hashed_bills = featurized_df.select("primary_key","rawFeatures").rdd.map(row => converted(row.toSeq)) - //Experimental //First, run the hashing step here val cartesian_pairs = spark.objectFile[CartesianPair](params.getString("adhocAnalyzer.inputPairsFile")).map(pp => (pp.pk1,pp.pk2)) From 383b9bf4362b9bff542272270741c2b7c75d9a08 Mon Sep 17 00:00:00 2001 
From: ASvyatkovskiy Date: Fri, 15 Jul 2016 15:19:33 -0400 Subject: [PATCH 6/8] Add parameters nGramGranularity and addNGramFeatures to control n-gram features --- src/main/resources/adhocAnalyzer.conf | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/resources/adhocAnalyzer.conf b/src/main/resources/adhocAnalyzer.conf index 0909b0d..8c00c9f 100644 --- a/src/main/resources/adhocAnalyzer.conf +++ b/src/main/resources/adhocAnalyzer.conf @@ -1,7 +1,9 @@ adhocAnalyzer { - numTextFeatures = 5000, + numTextFeatures = 16384, + addNGramFeatures = false, + nGramGranularity = 2, measureName = "cosine", - inputBillsFile = "file:///scratch/network/alexeys/bills/lexs/bills_3b.json", - inputPairsFile = "/user/alexeys/valid_pairs", - outputMainFile = "/user/alexeys/test_main_output" + inputBillsFile = "file:///scratch/network/alexeys/bills/lexs/sectioned_bills_50000.json", + inputPairsFile = "/user/alexeys/valid_section_pairs_50000test", + outputMainFile = "/user/alexeys/sectioned_output_cosine_50000test" } From c76c9fe75a8ba41ce086aedf9549da7e437fd852 Mon Sep 17 00:00:00 2001 From: ASvyatkovskiy Date: Fri, 15 Jul 2016 15:21:49 -0400 Subject: [PATCH 7/8] Introduce a cut on a minimum section size to remove unphysical sections --- dataformat/secformat_for_df.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dataformat/secformat_for_df.py b/dataformat/secformat_for_df.py index 03e0188..28d7304 100644 --- a/dataformat/secformat_for_df.py +++ b/dataformat/secformat_for_df.py @@ -15,7 +15,9 @@ sec_output_dicts = list() for line in finput.readlines(): output_dict = simplejson.loads(line) - for j,section in enumerate(re.split("SECTION \d|section \d",output_dict['content'])): + sections = re.split("SECTION \d|section \d",output_dict['content']) + filtered = filter(lambda x: len(x) > 170,sections) + for j,section in enumerate(filtered): sec_output_dict = {'primary_key':'', 'content':None} sec_output_dict['content'] = section @@ -38,7 +40,9 @@ for line_meta, line in zip(finput_meta.readlines(),finput.readlines()): output_dict_meta = simplejson.loads(line_meta) output_dict = simplejson.loads(line) - for j in range(len(re.split("SECTION \d|section \d",output_dict['content']))): + sections = re.split("SECTION \d|section \d",output_dict['content']) + filtered = filter(lambda x: len(x) > 170,sections) + for j in range(len(filtered)): sec_output_dict_meta = {'year':None, 'state':'','docid':'', 'docversion':'', 'primary_key':''} sec_output_dict_meta['primary_key'] = str(j)+"_"+output_dict_meta['primary_key'] sec_output_dict_meta['year'] = output_dict_meta['year'] From 9ef27ac856007af1c62b6921d05869d5ee0c6cda Mon Sep 17 00:00:00 2001 From: ASvyatkovskiy Date: Fri, 15 Jul 2016 15:29:09 -0400 Subject: [PATCH 8/8] Section level analysis is currently perfromed using the same classes, but starting off of different inputs. 
See dataformat/secformat_for_df.py for more details --- src/main/scala/AdhocSectionAnalyzer.scala | 160 ---------------------- src/main/scala/SectionAnalyzer.scala | 52 ------- src/main/scala/SectionLevelUtils.scala | 37 ----- 3 files changed, 249 deletions(-) delete mode 100644 src/main/scala/AdhocSectionAnalyzer.scala delete mode 100644 src/main/scala/SectionAnalyzer.scala delete mode 100644 src/main/scala/SectionLevelUtils.scala diff --git a/src/main/scala/AdhocSectionAnalyzer.scala b/src/main/scala/AdhocSectionAnalyzer.scala deleted file mode 100644 index 6ac71d3..0000000 --- a/src/main/scala/AdhocSectionAnalyzer.scala +++ /dev/null @@ -1,160 +0,0 @@ -/*AdhocSectionAnalyzer: an app. that performs document or section similarity searches starting off CartesianPairs - -Following parameters need to be filled in the resources/adhocAnalyzer.conf file: - nPartitions: Number of partitions in bills_meta RDD - numTextFeatures: Number of text features to keep in hashingTF - measureName: Similarity measure used - inputBillsFile: Bill input file, one JSON per line - inputPairsFile: CartesianPairs object input file - outputMainFile: key-key pairs and corresponding similarities, as Tuple2[Tuple2[String,String],Double] - outputFilteredFile: CartesianPairs passing similarity threshold -*/ - -import com.typesafe.config._ - -import org.apache.spark.{SparkConf, SparkContext, SparkFiles} -import org.apache.spark.SparkContext._ -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.functions._ - -import org.apache.spark.ml.feature.{HashingTF, IDF} -import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer} -import org.apache.spark.ml.feature.NGram -import org.apache.spark.ml.feature.StopWordsRemover - -//import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator} - -import org.apache.spark.sql.Row -import org.apache.spark.sql.types._ -import org.apache.spark.sql.functions.explode - -import scala.collection.mutable.WrappedArray - -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} - - -object AdhocSectionAnalyzer { - - /* - Experimental - */ - def converted(row: scala.collection.Seq[Any]) : Tuple2[String,SparseVector] = { - val ret = row.asInstanceOf[WrappedArray[Any]] - val first = ret(0).asInstanceOf[String] - val second = ret(1).asInstanceOf[Vector] - Tuple2(first,second.toSparse) - } - - //get type of var utility - def manOf[T: Manifest](t: T): Manifest[T] = manifest[T] - - def main(args: Array[String]) { - - println(s"\nExample submit command: spark-submit --class AdhocSectionAnalyzer --master yarn-client --num-executors 30 --executor-cores 3 --executor-memory 10g target/scala-2.10/BillAnalysis-assembly-1.0.jar\n") - - val t0 = System.nanoTime() - - val params = ConfigFactory.load("adhocAnalyzer") - run(params) - - val t1 = System.nanoTime() - println("Elapsed time: " + (t1 - t0)/1000000000 + "s") - } - - def run(params: Config) { - - val conf = new SparkConf().setAppName("AdhocSectionAnalyzer") - .set("spark.dynamicAllocation.enabled","true") - .set("spark.shuffle.service.enabled","true") - - val spark = new SparkContext(conf) - val sqlContext = new org.apache.spark.sql.SQLContext(spark) - import sqlContext.implicits._ - - val input = sqlContext.read.json(params.getString("adhocAnalyzer.inputBillsFile")) - val npartitions = (200*(input.count()/100000)).toInt - - val bills = input.repartition(npartitions,col("primary_key"),col("content")) - bills.explain - - //cannot do cleaning because I tokenize sections on section - digit pattern - //def 
cleaner_udf = udf((s: String) => s.replaceAll("(\\d|,|:|;|\\?|!)", "")) - //val cleaned_df = bills.withColumn("cleaned",cleaner_udf(col("content"))).drop("content") - //cleaned_df.show(15) - - //tokenizer = Tokenizer(inputCol="text", outputCol="words") - var tokenizer1 = new RegexTokenizer().setInputCol("content").setOutputCol("sections").setPattern("(SECTION \\d|section \\d)") - val tokenized1_df = tokenizer1.transform(bills) //cleaned_df) - - //flatten the column - val flattened_df = tokenized1_df.withColumn("sections", explode($"sections")).drop("cleaned") - - var tokenizer2 = new RegexTokenizer().setInputCol("sections").setOutputCol("words").setPattern("\\W") - val tokenized2_df = tokenizer2.transform(flattened_df) - - //remove stopwords - var remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered") - val filtered_df = remover.transform(tokenized2_df).drop("words") - - //ngram = NGram(n=2, inputCol="filtered", outputCol="ngram") - //ngram_df = ngram.transform(tokenized1_df) - - //hashing - var hashingTF = new HashingTF().setInputCol("filtered").setOutputCol("rawFeatures").setNumFeatures(params.getInt("adhocAnalyzer.numTextFeatures")) - val featurized_df = hashingTF.transform(filtered_df).drop("filtered") - featurized_df.show(5) - - var idf = new IDF().setInputCol("rawFeatures").setOutputCol("pre_features") - //val Array(train, cv) = featurized_df.randomSplit(Array(0.7, 0.3)) - var idfModel = idf.fit(featurized_df) - val rescaled_df = idfModel.transform(featurized_df).drop("rawFeatures") - - val hashed_bills = featurized_df.select("primary_key","rawFeatures").rdd.map(row => converted(row.toSeq)) - - //First, run the hashing step here - val cartesian_pairs = spark.objectFile[CartesianPair](params.getString("adhocAnalyzer.inputPairsFile")).map(pp => (pp.pk1,pp.pk2)) - - var similarityMeasure: SimilarityMeasure = null - var threshold: Double = 0.0 - - params.getString("adhocAnalyzer.measureName") match { - case "cosine" => { - similarityMeasure = CosineSimilarity - //threshold = ??? - } - case "hamming" => { - similarityMeasure = HammingSimilarity - //threshold = ??? - } - case "manhattan" => { - similarityMeasure = ManhattanSimilarity - //threshold = ??? - } - case "jaccard" => { - similarityMeasure = JaccardSimilarity - //threshold = ??? - } - case other: Any => - throw new IllegalArgumentException( - s"Only hamming, cosine, euclidean, manhattan, and jaccard similarities are supported but got $other." 
- ) - } - - val firstjoin = cartesian_pairs.map({case (k1,k2) => (k1, (k1,k2))}) - .join(hashed_bills) - .map({case (_, ((k1, k2), v1)) => ((k1, k2), v1)}).filter({case ((k1, k2), v1) => (k1 != k2)}) - - val matches = firstjoin.map({case ((k1,k2),v1) => (k2, ((k1,k2),v1))}) - .join(hashed_bills) - .map({case(_, (((k1,k2), v1), v2))=>((k1, k2),(v1, v2))}).mapValues({case (v1,v2) => similarityMeasure.compute(v1.toSparse,v2.toSparse)}) - - //val matches_df = matches.map({case ((k1,k2), v1)=>(k1, k2, v1)}).filter({case (k1, k2, v1) => ((k1 contains "CO_2006_HB1175") || (k2 contains "CO_2006_HB1175"))}).toDF("pk1","pk2","similarity").groupBy("pk1","pk2").max("similarity") - val matches_df = matches.map({case ((k1,k2), v1)=>(k1, k2, v1)}).toDF("pk1","pk2","similarity").groupBy("pk1","pk2").max("similarity").select(col("pk1"),col("pk2"),col("max(similarity)").alias("max_similarity")) - //matches_df.show(1000,false) - matches_df.write.parquet(params.getString("adhocAnalyzer.outputMainFile")) - - //matches.saveAsObjectFile(params.getString("adhocAnalyzer.outputMainFile")) - - spark.stop() - } -} diff --git a/src/main/scala/SectionAnalyzer.scala b/src/main/scala/SectionAnalyzer.scala deleted file mode 100644 index 89770d6..0000000 --- a/src/main/scala/SectionAnalyzer.scala +++ /dev/null @@ -1,52 +0,0 @@ -import com.typesafe.config._ - -import org.apache.spark.{SparkConf, SparkContext, SparkFiles} -import org.apache.spark.SparkContext._ -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.functions._ - -object SectionAnalyzer { - - //get type of var utility - def manOf[T: Manifest](t: T): Manifest[T] = manifest[T] - - def main(args: Array[String]) { - - println(s"\nExample submit command: spark-submit --class SectionAnalyzer --master yarn-client --num-executors 40 --executor-cores 2 --executor-memory 15g target/scala-2.10/BillAnalysis-assembly-1.0.jar\n") - - val t0 = System.nanoTime() - val params = ConfigFactory.load("sectionAnalyzer") - run(params) - - val t1 = System.nanoTime() - println("Elapsed time: " + (t1 - t0)/1000000000 + "s") - } - - def run(params: Config) { - - val conf = new SparkConf().setAppName("SectionAnalyzer") - .set("spark.driver.maxResultSize", "10g") - .set("spark.dynamicAllocation.enabled","true") - .set("spark.shuffle.service.enabled","true") - - val spark = new SparkContext(conf) - val sqlContext = new org.apache.spark.sql.SQLContext(spark) - import sqlContext.implicits._ - - val bills = sqlContext.read.json(params.getString("sectionAnalyzer.inputBillsFile")).as[Document] - - val hashed_bills = bills.rdd.map(bill => (bill.primary_key,bill.content)).mapValues(content => SectionLevelUtils.preprocess(content)).cache() - val cartesian_pairs = spark.objectFile[Tuple2[String,String]](params.getString("sectionAnalyzer.inputFilteredFile")) - - val firstjoin = cartesian_pairs.map({case (k1,k2) => (k1, (k1,k2))}) - .join(hashed_bills) - .map({case (_, ((k1, k2), v1)) => ((k1, k2), v1)}) - val matches = firstjoin.map({case ((k1,k2),v1) => (k2, ((k1,k2),v1))}) - .join(hashed_bills) - .map({case(_, (((k1,k2), v1), v2))=>((k1, k2),(v1, v2))}).mapValues(pp => SectionLevelUtils.extractSimilarities(pp)) - - matches.saveAsObjectFile(params.getString("sectionAnalyzer.outputMainSectionFile")) - - spark.stop() - } -} diff --git a/src/main/scala/SectionLevelUtils.scala b/src/main/scala/SectionLevelUtils.scala deleted file mode 100644 index 81873d7..0000000 --- a/src/main/scala/SectionLevelUtils.scala +++ /dev/null @@ -1,37 +0,0 @@ -import org.apache.spark.{SparkConf, 
SparkContext, SparkFiles} -import org.apache.spark.SparkContext._ -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.functions._ - -import scala.collection.mutable.ArrayBuffer - -object SectionLevelUtils { - - def preprocess (line: String) : ArrayBuffer[ArrayBuffer[Long]] = { - - val sectionPattern = "(SECTION \\d|section \\d)" - val distinct_sections: Array[String] = line.split(sectionPattern) - var combined_wGrps = ArrayBuffer.empty[ArrayBuffer[Long]] - - //for each section - for (sec <- distinct_sections) { - val wGrps: ArrayBuffer[Long] = DocumentLevelUtils.preprocess(sec) - combined_wGrps += wGrps - } - combined_wGrps - } - - //calculate similarities for each sections (double for loop kind of thing) - def extractSimilarities = (section_grps: Tuple2[ArrayBuffer[ArrayBuffer[Long]], ArrayBuffer[ArrayBuffer[Long]]]) => { - - var matchCnt = ArrayBuffer.empty[Double] - val igrp = section_grps._1 - val jgrp = section_grps._2 - for (isec <- 0 to igrp.length-1) { - for (jsec <- isec to jgrp.length-1) { - matchCnt += DocumentLevelUtils.extractSimilarities((igrp(isec),jgrp(jsec))) - } - } - matchCnt.reduceLeft(_ max _) - } -}
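
Note on the section-level data format: patches 2 and 7 build the sectioned inputs that the remaining patches consume, and the final patch deletes the old section-level Scala classes in favor of reusing MakeCartesian/AdhocAnalyzer on those inputs. Below is a minimal standalone sketch (not part of the patch series) of that preprocessing step, assuming the same section-splitting regex and 170-character minimum-section cut used in dataformat/secformat_for_df.py; the toy bill text and the helper name split_sections are made up for illustration only.

#!/usr/bin/env python
# Toy sketch of the preprocessing done by dataformat/secformat_for_df.py:
# split a bill body on "SECTION <digit>" markers, drop short ("unphysical")
# fragments, and derive a composite primary key "<section index>_<bill key>".
# The regex and the 170-character cut come from the patches; everything else
# here (sample text, function name) is illustrative.
import re

SECTION_PATTERN = "SECTION \\d|section \\d"   # same pattern as in the patch
MIN_SECTION_LEN = 170                         # minimum section size cut (patch 7)

def split_sections(content, primary_key):
    sections = re.split(SECTION_PATTERN, content)
    kept = [s for s in sections if len(s) > MIN_SECTION_LEN]
    return [{"primary_key": "%d_%s" % (j, primary_key), "content": s}
            for j, s in enumerate(kept)]

if __name__ == "__main__":
    toy_bill = ("AN ACT concerning bill similarity. "
                "SECTION 1 " + "x" * 200 +            # long enough, kept
                " SECTION 2 short fragment")          # below the cut, dropped
    for rec in split_sections(toy_bill, "CO_2006_HB1175"):
        print("%s (%d characters)" % (rec["primary_key"], len(rec["content"])))

The sectioned JSON written this way (one record per kept section, plus the matching metadata file) is what the updated makeCartesian.conf and adhocAnalyzer.conf point MakeCartesian and AdhocAnalyzer at for section-level analysis.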