In order to efficiently process large files, all the datasets for these labs are already stored in a distributed filesystem ran at the same machines we will use for computation (15 nodes). The
From the command line, you can browse and download the contents of that distributed filesystem using the command hadoop fs. The following webpage has some examples on how to check the contents of the distributed filesystem.
Check home folder, check data folder.
hadoop fs
For this lab we will interact with Spark using exclusively the interactive Spark Shell, and writing commands in Scala. This shell works as a CLI (Command Line Interpreter) of Scala instructions.
You can start the spark shell by invoking from the ITL the command spark-shell. After several lines of debugging information, you will see a message prompt such as
scala>
The shell creates a Scala Context object, sc, that can be invoked in order to create the initial RDDs for your computation.
There is an equivalent version that uses the Python, known as pyspark. For details on the python API for Spark, you can check the official documentation of Spark.
We will create our first RDD by reading the dataset from our lab 2 in Scala (the project Gutenberg files). In order to do so, we need to invoke the textFile method from the sc, and save the resulting RDD in a Scala variable (val):
val reads= sc.textFile("hdfs://moonshot-ha-nameservice/camp/twitch.aug.txt")
As you can see, in order to load HDFS files we need to specify the url of the HDFS namenode (in our case moonshot-ha-nameservice resolves to it within QMUL network).
Once the command finishes you will see a reference to the RDD. However, the RDD object does not exist yet. Spark RDDs are only materialised whenever we request a result from them. In order to do so, we will use one action, that returns some information from the RDD to our driver program. This involves network transfer, and loads the memory of the single driver machine. Let's invoke the count action on the RDD, which will return to the driver the number of elements in the RDD.
reads.count()
You will see several lines in the command line showing the operations triggered by your command, including the actual creation of the RDD from HDFS. You will finally see the result of your count action in the console. Additionally, by looking at the previous lines, you should be able to get some insight on what is happening in the cluster because of your command.
- How many entries appear in the Twitch 2015 dataset?
- Based on what you can find from the logs, how many splits does the 'reads' RDD have? Remember this structure will be distributed among multiple nodes of your cluster.val
You can check the number of partitions of an RDD by :
reads.partitions.size
In order to have a look at a fraction of the RDD we can use a different action. takeSample collects a random sample of elements from the requested RDD. The following line will provide 10 entries back to the driver program.
val sample = reads.takeSample(false, 10)
When running the takeSample command you will see that again a set of transformations have been triggered.
- Compare the output of this operation with the one of the count before. Based on what you see, do you think the lines RDD is shared between two operations, or is it created again each time?
Note that sample is a local variable inside the driver program. We can write any Scala code to manipulate it, and obtain any result from it. For example, we can use sample.foreach(println), to print in the screen each element as a line of text. This is another example of Scala's functional programming approach (executing the provided function for each element from the array). As this is a local call, the code will execute immediately, no job needs to be scheduled in the cluster.
The data comes initially in a TSV (Tab Separated Values) format, whereas we will be interested in a limited set of fields. A natural way is to map each line of the original dataset so that we only use the fields we are interested in. Scala offers us tuples of elements as a convenient way to store the number of values we are interested in. The following example selects from each measurement only the audience and game figures
For reference, the fields for each entry of the dataset represent the following information.:
timestamp(integer) number_of_current_viewers total_views_of_channel game_name channel_creation_time streamer_id user_create_time language partner?(boolean) delay number_of_followers
for example, here's an example of such a line :
1420291196 14835 6337669 "League of Legends" "2015-01-03T09:06:12Z" "sneakycastroo" "2011-09-02T23:16:11Z" "ru" true 0 152659
First, and in almost every data collection process, the data is not clean due to different reasons; failures in the collection process, bugs, etc.
Let's filter the data first, that means only getting the data that is compliant to the format above,
val freads = reads.filter(x=>x.split("\t").length==11)
We can also apply more complicated filters, for example, look for channels with old streamers subscribed before 2012
freads.filter(x => x.split("\t")(6).substring(1,5).toInt<2012)
Note we use toInt to cast the second argument to an integer. This provides a big reduction in memory, as well as enabling numeric specific actions.
Another transformation that's very important, and is puported to be a buliding block of the map reduce paradigm is...map !
The map takes one line or element at a time, and transforms it to another. Let's take the following example : let's determine the maximum number of viewer that every channel has reached during the study.
First, we map each line in order to have couples of the form (The channel name, number of viewers). Note that a channel will have several of these lines with different number of viewers depending on the recording time.
val audience = freads.map(x => (x.split("\t")(3), x.split("\t")(1).toInt))
As the second part of the list is an integer we can use the stats() action for Spark to compute some information about the split in channel popularity across the dataset. In order to select the second element of a tuple, we map the tuple to its ._2 part (._1 would select the first one, so it is not zero indexed).
audience.map(x=>x._2).stats()
Try taking a relatively large sample of audience, and notice that many of the channels have the name null or empty string, these lines are misleading as they represent corrupted data. Try filtering
val audienceFiltered = audience.filter(x => (x._1!="" && x._1!="null"))
The main value of Spark parallel computations (and its original Map/Reduce implementation) is to go beyond embarrasingly parallel operations such as map or filter, and be able to automatically parallelise many to many communications patterns. The groupBy operation of Spark provides the baseline method, it takes a list of pairs, and puts together all the values belonging to each key, so that they can be consequently reduced. A common pattern is to reduce all the values from each list into a single one, in order to compute some aggregate results.
We are going to use this pattern for computing some data related to each channel. We are going to obtain the peak audience.
val maxs = audienceFiltered.reduceByKey((x,y) => math.max(x,y))
Again, we can use an action such as maxs.stats() to force program execution and retrieve these values.
Advanced Excercise :
By using these functions, try to write the following programs, and answer the following questions.
- Get the viewing figures per game. What is the most popular game on average? You can reduce averages by using RDD.aggregateByKey(initialization),function_one, function_two). We will use an accumulator which is two values, (sum, number_of_elements). The sum is the sum of the viewers over all periods so far, and the number of elements is how many elements have with the same key have we summed so far. An aggregationByKey needs an initialization of the accumulator, and two function :
- The first is to aggregate between (aggregated_value, intermediate_aggr) + (value)
- The second to aggregate couples of (aggregated_value1, intermediate_aggr1) + (aggregated_value2, intermediate_aggr2). Try looking for the aggregator on the official Spark Documentation to get used to this reflexe.
- Finally map each entry to divide the number of viewers by the number of timestamps. You can use map as we did before, or use mapValues, The difference is that the new values you obtain from the aggragation are couples (ie. (key,(sum,n)). And we need to divide sum by n, so it would be easier to use mapValues which is easier.
- RDDs are not sorted, so in order to retrieve the highest value you will need to sort
the resulting RDD according to their value (remember, channel is the key, and the average is the value).
If you find it difficult to find out how to do it, The code for that is the following :
audience.aggregateByKey((0,0))(
(acc,value) => (acc._1 + value, acc._2 + 1), (acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
.mapValues(sumCount => 1.0 * sumCount._1 / sumCount._2)
.sortBy(_._2, false)
.take(10)
- What is the day with the highest number of registered views from the dataset? Now you will need to select the day (can be extracted from the date by doing...), and group the data by day rather than the specific game.
You might have noticed how Spark recreates the whole chain of transformations whenever we invoke a new action. In order to enable faster results, and provide more interactive interactions through the command shell, we can take advantage of Spark's caching functionality to make an RDD persistent in memory. We do that by invoking the persist transformation.
val inmem = maxs.persist()
Persisted RDDs are kept in memory after executing the first time. This means that multiple transformations and actions over them will be executed substantially faster, as there is no need to recreate them from HDFS, as well as previous transformation steps. We will take advantage of that to interact with it, checking what is the count for several words.
We will use the filter transformation (see Spark documentation) to select only one element from the results. As we want to get the count for a specific word, we need to select the item from the RDD whose key is the word we are looking for. We can select the first element of a Scala tuple with the expression tuple._1 as shown in the code below.
Additionally, we need to invoke an action after our transformation, as otherwise the single value will be still distributed in the cluster. By invoking collect on the filtered RDD we will get back the result we are looking for to our driver program. The following Spark instruction will therefore return the value we are looking for:
inmem.filter(pair=>pair._1.equals("\"LeagueOfLegends\"")).collect()Note that the quotes are part of the string so they need to be included and escaped.
- What is the peak audience for the game LeagueOfLegends in a single channel? Modify now this last query (you can push the up arrow in the keyboard to reload the last command for quickly editing), and check the popularity of other games streamed through a single channel (e.g. "Hearthstone"). You should see substantially improved performance for all these subsequent invocations. This is thanks to the persist() transformation we invoked early. You can
Finally, you can save any RDD back to HDFS by invoking the saveAsTextFile action (if the desired output format is TextOutputFormat). You have to specify the absolute path inside hdfs for that, for which you need to know what is your user account in HDFS.
inmem.saveAsTextFile("hdfs://moonshot-ha-nameservice/user/<<enterYourUserAccount>>/CISGraphL1")
In a real dataset it is common to want to combine multiple datasets in order to merge different information sources. This is commonly implemented by join functions in a database. Spark also provides a set of join transformations, whihch allow to combine the values from two different RDDs, by matching their keys. The syntax is rdd1.join(rdd2). The only requirement is that both datasets have as the first tuple element (aka key), the same elements.
We will load a second dataset from HDFS which contains the genres for each game that appears in the popularity traces.
The file of game genres is in the same folder, first load the file into an RDD. Then let's make sure to make a tuple (primary key, value) where primary key is the name of the game, and value is the genre.
val genres= sc.textFile("hdfs://moonshot-ha-nameservice/camp/game.genre.txt").
filter(x => x.split("\t").length >1).
map(x => (x.split("\t")(0),x.split("\t")(1)))
Next, we do the same thing with the channels dataset, we retrieve the game name as a primary key in an RDD, then we plug the rest of the line data into the value.
val channels = audienceFiltered.map(x => (x._1
.substring(1,x._1.length-1),x._2
))
We load the dataset as an RDD Tuple, with the first element being the game name, and the second being the genre name.
We can now use join to compute information aggregated by genre, rather than title, by joining both datasets as follows.
First, we need to transform the popularity dataset into a tuple where the first parameter is the game, followed by the rest.
val joinedRDD = channels.join(genres)
joinedRDD.take(10).foreach(println)
Spark incorporates a project with several statistics and machine learning methods implemented that simplifies large-scale data mining, called ML. You can learn more about its functionality in the official Spark documentation. Do keep in mind that the same performance limitaitons that appear when implementing base Spark workflows will appear when using these tools, but they might be hidden by the opacity of the library implementation.
You will need to import beforehand the package in the command line.