Commit

Merge pull request #9 from ASvyatkovskiy/from_cartesian

Section level support, n-gram features

ASvyatkovskiy authored Jul 19, 2016
2 parents c563adc + 9ef27ac commit ba5a686
Showing 8 changed files with 105 additions and 275 deletions.
57 changes: 57 additions & 0 deletions dataformat/secformat_for_df.py
@@ -0,0 +1,57 @@
#!/usr/bin/env python

import simplejson
import re

#switch between primary key as a long integer and primary key as a string
use_cryptic_pk = True

finput = open("/scratch/network/alexeys/bills/lexs/bills_50000.json","r")

#data first
foutput = open("/scratch/network/alexeys/bills/lexs/sectioned_bills_50000.json","wa")

#step through JSON. Tokenize "content" string
sec_output_dicts = list()
for line in finput.readlines():
    output_dict = simplejson.loads(line)
    sections = re.split("SECTION \d|section \d",output_dict['content'])
    filtered = filter(lambda x: len(x) > 170,sections)
    for j,section in enumerate(filtered):
        sec_output_dict = {'primary_key':'', 'content':None}

        sec_output_dict['content'] = section
        sec_output_dict['primary_key'] = str(j)+"_"+output_dict['primary_key']
        sec_output_dicts.append(sec_output_dict)

for i, sec_output_dict in enumerate(sec_output_dicts):
    if not use_cryptic_pk: sec_output_dict['primary_key'] = i
    simplejson.dump(sec_output_dict, foutput)
    foutput.write('\n')

#metadata
#FIXME yes...
finput = open("/scratch/network/alexeys/bills/lexs/bills_50000.json","r")
finput_meta = open("/scratch/network/alexeys/bills/lexs/bills_metadata_50000.json","r")
foutput_meta = open("/scratch/network/alexeys/bills/lexs/sectioned_bills_metadata_50000.json","wa")

#step through JSON. Tokenize "content" string
sec_output_dicts_meta = list()
for line_meta, line in zip(finput_meta.readlines(),finput.readlines()):
    output_dict_meta = simplejson.loads(line_meta)
    output_dict = simplejson.loads(line)
    sections = re.split("SECTION \d|section \d",output_dict['content'])
    filtered = filter(lambda x: len(x) > 170,sections)
    for j in range(len(filtered)):
        sec_output_dict_meta = {'year':None, 'state':'','docid':'', 'docversion':'', 'primary_key':''}
        sec_output_dict_meta['primary_key'] = str(j)+"_"+output_dict_meta['primary_key']
        sec_output_dict_meta['year'] = output_dict_meta['year']
        sec_output_dict_meta['state'] = output_dict_meta['state']
        sec_output_dict_meta['docid'] = output_dict_meta['docid']
        sec_output_dict_meta['docversion'] = output_dict_meta['docversion']
        sec_output_dicts_meta.append(sec_output_dict_meta)

for i, sec_output_dict_meta in enumerate(sec_output_dicts_meta):
    if not use_cryptic_pk: sec_output_dict_meta['primary_key'] = i
    simplejson.dump(sec_output_dict_meta, foutput_meta)
    foutput_meta.write('\n')
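
For context, the sectioned output written above is what adhocAnalyzer.conf now points to: one JSON object per line, keyed as "<sectionIndex>_<billPrimaryKey>". A minimal Scala sketch of loading and inspecting it (the Spark setup below is illustrative and not part of this commit; the real pipeline builds its own context inside AdhocAnalyzer):

// Minimal sketch: inspect the sectioned bills produced by secformat_for_df.py.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// local[*] is only for a quick local sanity check.
val sc = new SparkContext(new SparkConf().setAppName("InspectSectionedBills").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// One JSON object per line, one row per bill section.
val sections = sqlContext.read.json("file:///scratch/network/alexeys/bills/lexs/sectioned_bills_50000.json")
sections.printSchema()                  // expected: content: string, primary_key: string
sections.select("primary_key").show(5)  // keys look like "<sectionIndex>_<billPrimaryKey>"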
10 changes: 6 additions & 4 deletions src/main/resources/adhocAnalyzer.conf
@@ -1,7 +1,9 @@
 adhocAnalyzer {
-    numTextFeatures = 5000,
+    numTextFeatures = 16384,
+    addNGramFeatures = false,
+    nGramGranularity = 2,
     measureName = "cosine",
-    inputBillsFile = "file:///scratch/network/alexeys/bills/lexs/bills_3b.json",
-    inputPairsFile = "/user/alexeys/valid_pairs",
-    outputMainFile = "/user/alexeys/test_main_output"
+    inputBillsFile = "file:///scratch/network/alexeys/bills/lexs/sectioned_bills_50000.json",
+    inputPairsFile = "/user/alexeys/valid_section_pairs_50000test",
+    outputMainFile = "/user/alexeys/sectioned_output_cosine_50000test"
 }
9 changes: 5 additions & 4 deletions src/main/resources/makeCartesian.conf
@@ -1,9 +1,10 @@
 makeCartesian {
     nPartitions = 30,
-    docVersion = "Enacted",
-    inputFile = "file:///scratch/network/alexeys/bills/lexs/bills_metadata_3b.json",
-    outputFile = "/user/alexeys/valid_pairs",
-    use_strict = true,
+    docVersion = "Introduced",
+    onlyInOut = false,
+    inputFile = "file:///scratch/network/alexeys/bills/lexs/sectioned_bills_metadata_50000.json",
+    outputFile = "/user/alexeys/valid_section_pairs_50000test",
+    use_strict = false,
     strict_state = 5,
     strict_docid = "HB1001",
     strict_year = 1991
30 changes: 20 additions & 10 deletions src/main/scala/AdhocAnalyzer.scala
@@ -2,6 +2,8 @@
 Following parameters need to be filled in the resources/adhocAnalyzer.conf file:
 numTextFeatures: Number of text features to keep in hashingTF
+addNGramFeatures: Boolean flag to indicate whether to add n-gram features
+nGramGranularity: granularity of a rolling n-gram
 measureName: Similarity measure used
 inputBillsFile: Bill input file, one JSON per line
 inputPairsFile: CartesianPairs object input file
@@ -33,9 +35,6 @@ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors

 object AdhocAnalyzer {

-  /*
-    Experimental
-  */
   def converted(row: scala.collection.Seq[Any]) : Tuple2[String,SparseVector] = {
     val ret = row.asInstanceOf[WrappedArray[Any]]
     val first = ret(0).asInstanceOf[String]
@@ -59,6 +58,10 @@ object AdhocAnalyzer {
println("Elapsed time: " + (t1 - t0)/1000000000 + "s")
}

def appendFeature(a: WrappedArray[String], b: WrappedArray[String]) : WrappedArray[String] = {
a ++ b
}

def run(params: Config) {

val conf = new SparkConf().setAppName("AdhocAnalyzer")
@@ -85,24 +88,31 @@

     //remove stopwords
     var remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered")
-    val filtered_df = remover.transform(tokenized_df).drop("words")
+    var prefeaturized_df = remover.transform(tokenized_df).drop("words")

+    if (params.getBoolean("adhocAnalyzer.addNGramFeatures")) {
+
-    //ngram = NGram(n=2, inputCol="filtered", outputCol="ngram")
-    //ngram_df = ngram.transform(tokenized_df)
+      val ngram = new NGram().setN(params.getInt("adhocAnalyzer.nGramGranularity")).setInputCol("filtered").setOutputCol("ngram")
+      val ngram_df = ngram.transform(prefeaturized_df)
+
+      def appendFeature_udf = udf(appendFeature _)
+      prefeaturized_df = ngram_df.withColumn("combined", appendFeature_udf(col("filtered"),col("ngram"))).drop("filtered").drop("ngram").drop("cleaned")
+    } else {
+      prefeaturized_df = prefeaturized_df.select(col("primary_key"),col("filtered").alias("combined"))
+      prefeaturized_df.printSchema()
+    }

     //hashing
-    var hashingTF = new HashingTF().setInputCol("filtered").setOutputCol("rawFeatures").setNumFeatures(params.getInt("adhocAnalyzer.numTextFeatures"))
-    val featurized_df = hashingTF.transform(filtered_df).drop("filtered")
+    var hashingTF = new HashingTF().setInputCol("combined").setOutputCol("rawFeatures").setNumFeatures(params.getInt("adhocAnalyzer.numTextFeatures"))
+    val featurized_df = hashingTF.transform(prefeaturized_df)

     var idf = new IDF().setInputCol("rawFeatures").setOutputCol("pre_features")
     //val Array(train, cv) = featurized_df.randomSplit(Array(0.7, 0.3))
     var idfModel = idf.fit(featurized_df)
     val rescaled_df = idfModel.transform(featurized_df).drop("rawFeatures")
     rescaled_df.show(5)

     val hashed_bills = featurized_df.select("primary_key","rawFeatures").rdd.map(row => converted(row.toSeq))

-    //Experimental
-    //First, run the hashing step here
     val cartesian_pairs = spark.objectFile[CartesianPair](params.getString("adhocAnalyzer.inputPairsFile")).map(pp => (pp.pk1,pp.pk2))

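For reference, a self-contained sketch of the n-gram branch introduced above, applied to a toy DataFrame (the sqlContext from the earlier sketch and the toy rows are illustrative; the column names, the appendFeature UDF, and the hashing step follow this diff):

import org.apache.spark.ml.feature.{NGram, HashingTF}
import org.apache.spark.sql.functions.{col, udf}
import scala.collection.mutable.WrappedArray

// Toy input standing in for the tokenized, stop-word-filtered bills ("filtered" column).
val toy_df = sqlContext.createDataFrame(Seq(
  ("0_BILL1", Seq("section", "level", "support", "added")),
  ("1_BILL1", Seq("n", "gram", "features", "added"))
)).toDF("primary_key", "filtered")

// Rolling n-grams of the configured granularity (nGramGranularity = 2 in the conf).
val ngram = new NGram().setN(2).setInputCol("filtered").setOutputCol("ngram")
val ngram_df = ngram.transform(toy_df)

// Append the n-grams to the unigrams, as appendFeature does above.
def appendFeature(a: WrappedArray[String], b: WrappedArray[String]): WrappedArray[String] = a ++ b
val appendFeature_udf = udf(appendFeature _)
val combined_df = ngram_df
  .withColumn("combined", appendFeature_udf(col("filtered"), col("ngram")))
  .drop("filtered").drop("ngram")

// Hash the combined unigram + n-gram tokens into numTextFeatures-dimensional term-frequency vectors.
val hashingTF = new HashingTF().setInputCol("combined").setOutputCol("rawFeatures").setNumFeatures(16384)
hashingTF.transform(combined_df).show(2, false)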
160 changes: 0 additions & 160 deletions src/main/scala/AdhocSectionAnalyzer.scala

This file was deleted.

25 changes: 17 additions & 8 deletions src/main/scala/MakeCartesian.scala
@@ -4,6 +4,7 @@ Application: MakeCartesian, produce all the pairs of primary keys of the documen
 Following parameters need to be filled in the resources/makeCartesian.conf file:
 docVersion: document version: consider document pairs having a specific version. E.g. Introduced, Enacted...
 nPartitions: number of partitions in bills_meta RDD
+onlyInOut: a switch between in-out of state and using both in-out and in-in state pairs
 use_strict: boolean, yes or no to consider strict parameters
 strict_state: specify state (a long integer from 1 to 50) for one-against-all user selection (strict)
 strict_docid: specify document ID for one-against-all user selection (strict)
@@ -33,7 +34,7 @@ import scala.collection.mutable.WrappedArray

 object MakeCartesian {

-  def pairup (document: MetaDocument, thewholething: org.apache.spark.broadcast.Broadcast[Array[MetaDocument]], strict_params: Tuple4[Boolean, Int, java.lang.String, Int]) : (MetaDocument, Array[CartesianPair]) = {
+  def pairup (document: MetaDocument, thewholething: org.apache.spark.broadcast.Broadcast[Array[MetaDocument]], strict_params: Tuple4[Boolean, Int, java.lang.String, Int], onlyInOut: Boolean) : (MetaDocument, Array[CartesianPair]) = {

     val documents = thewholething.value

@@ -62,12 +63,20 @@
           }
         }
       } else {
-        //simple condition
-        if (pk1 < pk2 && istate != jstate) {
-          var output: CartesianPair = CartesianPair(pk1,pk2)
-          output_arr += output
-        }
-      }
+        //simple condition
+        if (onlyInOut) {
+          if (pk1 < pk2 && istate != jstate) {
+            var output: CartesianPair = CartesianPair(pk1,pk2)
+            output_arr += output
+          }
+        } else {
+          //in-out and in-in
+          if (pk1 < pk2) {
+            var output: CartesianPair = CartesianPair(pk1,pk2)
+            output_arr += output
+          }
+        }
+      }
     }
     (document,output_arr.toArray)
   }
@@ -106,7 +115,7 @@

     //will be array of tuples, but the keys are unique
     var cartesian_pairs = bills_meta.rdd.repartition(params.getInt("makeCartesian.nPartitions"))
-       .map(x => pairup(x,bills_meta_bcast, strict_params))
+       .map(x => pairup(x,bills_meta_bcast, strict_params, params.getBoolean("makeCartesian.onlyInOut")))
        .filter({case (dd,ll) => (ll.length > 0)})
        .map({case(k,v) => v}).flatMap(x => x) //.groupByKey()

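The effect of the new onlyInOut switch, condensed into a toy example on plain Scala collections (keys and states are illustrative; the actual pairup works on a broadcast array of MetaDocument and also honors the strict parameters):

// Toy (primary_key, state) metadata; state is the long integer field on MetaDocument.
val docs = Seq(("a_HB1", 5L), ("b_HB2", 5L), ("c_S2", 30L))

def candidatePairs(onlyInOut: Boolean): Seq[(String, String)] =
  for {
    (pk1, istate) <- docs
    (pk2, jstate) <- docs
    if pk1 < pk2                        // same ordering condition as in pairup
    if !onlyInOut || istate != jstate   // onlyInOut = true keeps only cross-state pairs
  } yield (pk1, pk2)

println(candidatePairs(onlyInOut = true))
// List((a_HB1,c_S2), (b_HB2,c_S2))
println(candidatePairs(onlyInOut = false))
// List((a_HB1,b_HB2), (a_HB1,c_S2), (b_HB2,c_S2))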