CIS 700-003: Big Data Analytics | Spring 2017 | Due February 21, 2017 by 10pm
Note: this is an archived version of a homework. Find a more recent version from www.cis.upenn.edu/~cis545.
For this assignment, we will focus on graph data. You saw an instance of this in Homework 1 -- the airline flight network is actually a graph -- but we only did limited kinds of computation over it. However, many real-world datasets are, or can be modeled as, graphs (or trees, which are special cases of graphs). Examples include the flight network itself and the social network we will work with in this assignment.
For this assignment, we will be doing a few common operations on graphs. In the next assignment, when we have the power of matrices, we will do some further computation over the same graph data. (It’s very common to encode graph connectivity through an adjacency matrix that we’ll discuss in lecture.)
Let’s start by downloading the data files for this assignment. These will include flight data from HW1 as well as social network data (specifically, data about who answered questions from whom on Stack Overflow).
From Bitbucket, clone the HW2 repository:
git clone https://bitbucket.org/pennbigdataanalytics/hw2.git
Now in Jupyter, go into hw2 and open Retrieve.ipynb. Go to Cell|Run All. This will take a while, but it will download several data files and, as necessary, decompress them. The cells also give you a model for how to retrieve text or gzipped files from the Web.
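For a rough idea of what those retrieval cells do, here is a minimal sketch of fetching and decompressing a gzipped file; the URL below is a placeholder, not the actual location used in Retrieve.ipynb:

import gzip
import shutil
import urllib.request

# Download a (hypothetical) gzipped data file from the Web
url = 'https://example.com/data/sx-stackoverflow-a2q.txt.gz'   # placeholder URL
urllib.request.urlretrieve(url, 'sx-stackoverflow-a2q.txt.gz')

# Decompress it into a plain text file
with gzip.open('sx-stackoverflow-a2q.txt.gz', 'rb') as f_in, \
     open('sx-stackoverflow-a2q.txt', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)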
Apache Spark, which has become the de facto successor to Apache Hadoop, is a complex, cluster-based data processing system that was written in Scala. It leverages a wide variety of distributed tools and components used for big data processing. It interfaces “smoothly” to Python, but be forewarned that there are some rough edges. For those interested in why, there are a few reasons:
While Spark DataFrames try to emulate the same programming style as Pandas DataFrames, there are some differences in how you express things. Please refer to the Lecture Slides for our take on the differences. You may also find the following Web pages to be useful resources for understanding Spark vs Pandas DataFrames:
For this assignment, we are going to get familiar with Spark without worrying too much about sharding and distribution. We are going to run Spark on your Docker container. This isn’t really using it to its strengths -- and in fact you might find Spark to be unexpectedly slow -- but it will get you comfortable with programming in Spark without worrying about distributed nodes and clusters. Your code, if written properly, will “naturally scale” to clusters running on the Cloud. Later in the term we’ll connect your Jupyter instance to Spark running on Amazon -- to handle “truly big data.”
First, create a new Jupyter notebook called “Graphs”. Then create a Cell in the notebook and establish a connection to the Spark engine as follows.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
spark = SparkSession.builder.appName('Graphs-HW2').getOrCreate()
Note: Spark has multiple interfaces -- SparkSession is the “most modern” one and we’ll be using it for this course. From SparkSession, you can load data into DataFrames as well as RDDs.
For this assignment, we’ll be looking at graph data from the Stack Overflow site. Many of you have probably ended up at Stack Overflow when Googling for answers about how to solve a programming task -- possibly even a programming task related to this very course! Stack Overflow has a variety of users who post questions, answers to questions, and comments on both the questions and the answers. Network science researchers at Stanford have extracted the network structure of these answers into a dataset for study.
A very brief review of graph theory. Recall that a graph G is composed of a set of vertices V (also called nodes) and edges E (sometimes called links). Each vertex has an identity (often represented in the real world as a string or numeric “node ID”). Each edge is a tuple (s, t), where s represents the source or origin of the edge and t represents the target or destination. In the simplest case the edge is simply the pair (s, t), but in many cases we may have additional fields, such as a label or a distance. Recall also that graphs may be undirected or directed; in undirected graphs all edges are symmetric, whereas in directed graphs they are not. For instance, airline flights are directed, whereas Facebook friend relationships are undirected.
Let’s read the social graph data of Stack Overflow, which forms a directed graph. Here, the set of nodes is also not specified; the assumption is that the only nodes that matter are linked to other nodes, and thus their IDs will appear in the set of edges. To load the file into a Spark DataFrame, you can use the following lines.
# Read lines from the text file
answers_sdf = spark.read.load('sx-stackoverflow-a2q.txt', format="text")
We’ll use the suffix _sdf to represent “Spark DataFrame,” much as we used “_df” to denote a Pandas DataFrame in Homework 1. Repeat the load process to create Spark DataFrames for the other files: sx-stackoverflow-c2a.txt, which should go into comments_answers_sdf, and sx-stackoverflow-c2q.txt, which should go into comments_questions_sdf.
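For instance, repeating the pattern from above:

# Load the comment-to-answer and comment-to-question edges the same way
comments_answers_sdf = spark.read.load('sx-stackoverflow-c2a.txt', format="text")
comments_questions_sdf = spark.read.load('sx-stackoverflow-c2q.txt', format="text")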
Data Check for Step 2.2. Let’s now create some Cells to get a quick look:
Currently, the network data from the three source files is not very detailed; it consists of a string-valued attribute called value with three space-delimited columns (representing a social network edge and timestamp). We want to separate out each of these three columns into a Spark DataFrame column, and additionally we want to add a fourth field describing which type of edge is being represented.
Let’s quickly enumerate some of the building blocks. (Look at the lecture slides for this part of the homework for more help.)
To split, you can use a query such as:
SELECT split(col, ' ')[0] as column1, split(col, ' ')[1] as column2
And to cast, you can use a query like:
SELECT CAST(MyVarcharCol AS INT) FROM Table
Combining these, you can split a column and cast the result in a single command. For instance, to convert a column to a double, you can do (note that these commands are incomplete and you will need to adapt them):
my_sdf.select(F.split(my_sdf.column, pattern)[0].cast("double").alias("my_col"))
Or
my_sdf.createOrReplaceTempView('my_sdf_view')
spark.sql('SELECT CAST(split(column, pattern)[0] AS double) AS my_col FROM my_sdf_view')
createOrReplaceTempView creates a temporary view (removed when the SparkSession ends) that allows us to run SQL queries against the contents of my_sdf; my_sdf isn’t visible to Spark’s SQL engine by itself.
Finally, you may need to add columns with literal values. You can do this by composing functions:
my_sdf.withColumn('new_col', F.lit('value'))
This creates a new column containing a literal value. Or you can use SQL:
spark.sql('SELECT *, "value" AS new_col FROM my_sdf_view')
For this step, we want to regularize and combine our graph data from the three Spark DataFrames.
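As a rough sketch of what this step might look like (the column names node1, node2, and timestamp, the edge-type labels, and the helper function below are our own illustrative choices, not requirements):

# Hypothetical helper: split the single 'value' column into typed columns
# and tag each row with the type of edge it represents.
def regularize(sdf, edge_type):
    return sdf.select(
        F.split(sdf.value, ' ')[0].cast("int").alias("node1"),
        F.split(sdf.value, ' ')[1].cast("int").alias("node2"),
        F.split(sdf.value, ' ')[2].cast("double").alias("timestamp")
    ).withColumn("edge_type", F.lit(edge_type))

# Combine the three edge sets into one graph DataFrame
graph_sdf = regularize(answers_sdf, 'a2q') \
    .union(regularize(comments_answers_sdf, 'c2a')) \
    .union(regularize(comments_questions_sdf, 'c2q'))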
Data Check for Step 2.3. Let’s now create some Cells:
The study of networks has proposed a wide variety of measures of the importance of nodes. A popular metric that is easy to compute is degree centrality. The degree centrality of a node is simply the number of connections to the node. In a directed graph such as ours, you will want to compute both the indegree centrality (the number of nodes with edges coming into this node) and the outdegree centrality (the number of nodes with edges going out of this node).
Using the Spark DataFrame groupBy(), orderBy(), and count() functions, or their equivalents in SparkSQL using the spark.sql() function, compute a DataFrame with the five vertices with highest indegree. Similarly, you can get the five vertices with highest outdegree. Think about what these measures could tell you. (You can consult the lecture slides for more details.)
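For example, finding the five highest-indegree vertices might look roughly like the sketch below, assuming the combined graph_sdf with node1/node2 columns from the earlier sketch (adapt it to your own schema):

# Count incoming edges per destination node and keep the top five
top_indegree_sdf = graph_sdf.groupBy("node2") \
    .count() \
    .orderBy(F.desc("count")) \
    .limit(5)
top_indegree_sdf.show()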
Data Check for Step 3. Let’s now create some Cells:
Degree centrality is the simplest of the potential measures of graph node importance. We’ll consider other measures in the Advanced part of this assignment, as well as in future assignments.
For our next tasks, we will be “walking” the graph and making connections.
A search algorithm typically starts at a node or set of nodes, and “explores” or “walks” for some number of steps to find a match or a set of matches.
Let’s implement a distributed version of a popular algorithm, breadth-first-search (BFS). This algorithm is given a graph G, a set of origin nodes N, and a depth d. In each iteration or round up to depth d, it explores the set of all new nodes directly connected to the nodes it already has seen, before going on to the nodes another “hop” away. If we do this correctly, we will explore the graph in a way that (1) avoids getting caught in cycles or loops, and (2) visits each node in the fewest number of “hops” from the origin. BFS is commonly used in tasks such as friend recommendation in social networks.
How does distributed BFS in Spark work? Let’s start with a brief sketch of standard BFS. During exploration “rounds”, we can divide the graph’s nodes into three categories: visited nodes, which we have already explored in an earlier round; frontier nodes, which we discovered in the most recent round and whose neighbors we explore next; and unexplored nodes, which we have not yet reached.
We can illustrate these with a figure and an example.
Let’s look at the figure. The green node A represents the origin.
Assume we create data structures (we can make them DataFrames) for the visited and frontier nodes. Consider (1) how to initialize the different sets at the start of computation [note: unexplored nodes are already in the graph], and (2) how to use the graph edges and the existing data structures to update state for the next iteration “round”.
You may have seen how to implement breadth-first search in a single-CPU programming language, using a queue to capture the frontier nodes. With Spark we don’t need a queue -- we just need the three sets above.
Create a function spark_bfs(G, origins, max_depth) that takes a Spark DataFrame with a graph G (following the schema for comments_questions_sdf described above), a Python list of maps origins of the form [{'node': nid1}, {'node': nid2}, …], and a nonnegative integer “exploration depth” max_depth (to only run BFS on a tractable portion of the graph). The function should return a DataFrame containing pairs of the form (n, x), where n is the node or vertex ID and x is the depth at which n was first encountered. Note that the origin nodes should also be returned in this Spark DataFrame (with depth 0)!
You can create a new DataFrame with an integer node column from the above list of maps origins, as follows. This will give you a DataFrame of the nodes at which to start the BFS:
schema = StructType([
StructField("node", IntegerType(), True)
])
my_sdf = spark.createDataFrame(my_list_of_maps, schema)
A few hints that should keep Spark from getting too bogged down:
An important note: PySpark’s subtract function may seem useful in this problem. However, one thing to know about it is that it treats its inputs as sets; that is, if we run the subtract function, it will automatically remove duplicate rows from the input DataFrames. For example, if we call sdf1.subtract(sdf2), any duplicate rows in sdf1 and sdf2 will be dropped when it performs the computation. This means that if you are relying on one of your DataFrames to store nodes seen multiple times at a depth level, there is a good chance those duplicate rows are being dropped if you are using the subtract function. One way around this is to use what Spark calls a Left Anti-Join. This is syntactic sugar for the following SQL query:
SELECT *
FROM table1 AS t1
LEFT JOIN table2 AS t2
ON t1.col = t2.col
WHERE t2.col IS NULL
To run a Left Anti-Join in Spark, you can use a join with the 'how' parameter set to 'leftanti', e.g. sdf1.join(sdf2, sdf1.col == sdf2.col, 'leftanti'). When running this, you may or may not get a Spark error that includes the message:
“AnalysisException: 'Both sides of this join are outside the broadcasting threshold and computing it could be prohibitively expensive. To explicitly enable it, please set spark.sql.crossJoin.enabled = true;'”.
If you do, try running the join on its own line instead of chaining it with other commands: e.g. instead of
my_awesome_sdf = my_awesome_sdf.computation1().computation2().join(...)
Try running:
my_awesome_sdf = my_awesome_sdf.computation1().computation2()
my_awesome_sdf = my_awesome_sdf.join(...)
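Putting these pieces together, here is a minimal sketch of one possible spark_bfs, assuming the graph DataFrame has node1 and node2 columns as in the earlier sketch and using the left anti-join idea above; treat it as a starting point, not a finished solution:

def spark_bfs(G, origins, max_depth):
    # Schema for the origin nodes
    schema = StructType([StructField("node", IntegerType(), True)])

    # Depth 0: the origin nodes themselves
    frontier_sdf = spark.createDataFrame(origins, schema)
    visited_sdf = frontier_sdf.withColumn("depth", F.lit(0))

    for depth in range(1, max_depth + 1):
        # Follow edges out of the current frontier to find candidate nodes
        candidates_sdf = G.join(frontier_sdf, G.node1 == frontier_sdf.node) \
                          .select(G.node2.alias("node")).distinct()

        # Keep only nodes we have not seen before (left anti-join, not subtract)
        new_frontier_sdf = candidates_sdf.join(
            visited_sdf, candidates_sdf.node == visited_sdf.node, 'leftanti')

        # Record the depth at which these nodes were first encountered
        visited_sdf = visited_sdf.union(
            new_frontier_sdf.withColumn("depth", F.lit(depth)))

        frontier_sdf = new_frontier_sdf

    return visited_sdf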
Data Check for Step 4.1. Let’s now create some Cells:
Note that in a data-parallel setting like this, the BFS algorithm you’ve implemented above actually is a form of (partial) “transitive closure.”
Now create a function friend_rec that takes in two arguments: filtered_bfs_sdf and graph_sdf (note: not necessarily the graph_sdf you created earlier). filtered_bfs_sdf is a subset of the rows of bfs_sdf. graph_sdf, in this case, will be comments_questions_sdf. friend_rec should return the set of recommendations for nodes that are not adjacent to one another. For each pair (n1, n2), where both n1 and n2 are non-adjacent nodes in filtered_bfs_sdf, the output DataFrame should have an edge (in both directions) between n1 and n2 to recommend the two nodes become friends. This is the notion of triadic closure (completion of triangles) that is commonly used in social networks like Facebook.
Hint: Since the graph is directed, in order to ensure that the nodes aren’t already connected to one another, you have to check that there is no edge in either direction in the graph.
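Here is a rough sketch of one way friend_rec might be structured, again assuming node1 and node2 columns in graph_sdf and using left anti-joins to drop pairs that are already connected in either direction (on some Spark versions you may need crossJoin, as below, or to enable spark.sql.crossJoin.enabled):

def friend_rec(filtered_bfs_sdf, graph_sdf):
    # All ordered pairs of distinct nodes from the filtered BFS result
    n1_sdf = filtered_bfs_sdf.select(F.col("node").alias("n1"))
    n2_sdf = filtered_bfs_sdf.select(F.col("node").alias("n2"))
    pairs_sdf = n1_sdf.crossJoin(n2_sdf).filter(F.col("n1") != F.col("n2"))

    # Drop pairs already connected by an edge n1 -> n2 ...
    pairs_sdf = pairs_sdf.join(
        graph_sdf,
        (pairs_sdf.n1 == graph_sdf.node1) & (pairs_sdf.n2 == graph_sdf.node2),
        'leftanti')

    # ... and pairs already connected by an edge n2 -> n1
    pairs_sdf = pairs_sdf.join(
        graph_sdf,
        (pairs_sdf.n1 == graph_sdf.node2) & (pairs_sdf.n2 == graph_sdf.node1),
        'leftanti')

    # Each remaining (n1, n2) row is a recommended edge; both directions
    # appear because pairs_sdf contains both (a, b) and (b, a).
    return pairs_sdf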
Data Check for Step 4.2. Let’s now create some Cells:
Now, save your Jupyter Notebook (again, as Graphs.ipynb) for submission. If you like, you may go on to the Advanced steps, or you may submit the basic assignment as per Step 6.0.
Warning: This part of the Homework may still receive a few tweaks and refinements over the next few days.
Many of you may be familiar with the PageRank computation, which is used to measure the importance of a Web page. (Contrary to popular belief, PageRank is named after Larry Page, not Web pages…) PageRank is actually a tweaked version of a centrality measure called eigenvector centrality. One way to implement PageRank is as an iterative computation. We take each graph node x and in iteration 0 assign it a corresponding PageRank p_x:
p_x(0) = 1 / N
where N is the total number of nodes.
Now in each iteration i we recompute:
p_x(i) = β/N + α · Σ_{j ∈ B(x)} p_j(i-1) / N_j
where B(x) is the set of nodes linking to node x, and N_j is the outdegree of each such node j. Typically, repeating the PageRank computation for a number of iterations (15 or so) results in convergence within an acceptable tolerance. For this assignment we’ll assume β = 0.15 and α = 0.85 (we’ll discuss these in more detail in the future).
Example. In the figure to the right, nodes j1 and j2 represent the back-link set B(x) for node x. N_j1 is 3 and N_j2 is 2. Thus in each iteration i, we recompute the PageRank score for x by adding half of the PageRank score for j2 and a third of the PageRank score for j1 (both from the previous iteration i-1).
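As a concrete, made-up instance of this update (the numbers and the node count N below are invented purely for illustration, using the β/N + α·Σ form given above):

# Hypothetical values, purely to illustrate one PageRank update for node x
beta, alpha, N = 0.15, 0.85, 100          # assume a 100-node graph
p_j1_prev, p_j2_prev = 0.30, 0.20         # scores of j1, j2 at iteration i-1
N_j1, N_j2 = 3, 2                         # outdegrees of j1 and j2

# Contribution of the back-link set B(x): a third of j1's score plus half of j2's
backlink_sum = p_j1_prev / N_j1 + p_j2_prev / N_j2    # 0.10 + 0.10 = 0.20
p_x_new = beta / N + alpha * backlink_sum             # 0.0015 + 0.17 = 0.1715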
For your solution to this step:
Hint. Build some “helper” DataFrames. We suggest at least 2 DataFrames, where the first is used to build the second, and the second is used in your solution:
Initialize your PageRank values for each node in the “base case”. Then, in each iteration, use the helper DataFrames to compute PageRank scores for each node in the next iteration.
You will likely find it easier to express some of the computations in SparkSQL. If you want to use the DataFrame select method, you may find it useful to use Spark’s F.udf function to create functions that can be called over each row in the DataFrame. You can create a function that returns a double as follows:
my_fn = F.udf(lambda x: f(x), DoubleType())
Then you can call it like:
my_sdf.select(my_fn(my_arg).alias('col_name'))
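To make the iteration concrete, here is a rough sketch of one possible structure for the PageRank loop, assuming a helper DataFrame outdegree_sdf with columns (node, outdeg), a pagerank_sdf with columns (node, pagerank), and the combined graph_sdf with node1/node2 columns as before; all of these names and the bookkeeping are our own illustrative choices:

beta, alpha = 0.15, 0.85
N = pagerank_sdf.count()

for i in range(15):
    # Each node sends pagerank/outdeg along each of its outgoing edges...
    contribs_sdf = graph_sdf \
        .join(pagerank_sdf, graph_sdf.node1 == pagerank_sdf.node) \
        .join(outdegree_sdf, graph_sdf.node1 == outdegree_sdf.node) \
        .select(graph_sdf.node2.alias("node"),
                (pagerank_sdf.pagerank / outdegree_sdf.outdeg).alias("contrib"))

    # ...and each receiving node combines the contributions sent to it.
    # Note: nodes with no incoming edges drop out here; a full solution
    # needs to handle them (e.g., with an outer join against the node list).
    pagerank_sdf = contribs_sdf.groupBy("node") \
        .agg(F.sum("contrib").alias("inbound")) \
        .select(F.col("node"),
                (F.lit(beta / N) + F.lit(alpha) * F.col("inbound")).alias("pagerank"))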
Data Check for Step 5. Let’s now create some Cells:
Save your Jupyter notebook as Advanced.ipynb.
Please sanity-check that your Jupyter notebooks contain both code and corresponding data. Add the notebook files to hw2.zip using the zip command at the Terminal, much as you did for HW0 and HW1. The notebooks should be Graphs.ipynb and, if you completed the Advanced steps, Advanced.ipynb.
Next, go to the submission site, and if necessary click on the Google icon and log in using your Google@SEAS or GMail account. At this point the system should know you are in the appropriate course. Select CIS 700-003 Homework 2 and upload hw2.zip from your Jupyter folder, typically found under /Users/{myid}.
If you check on the submission site after a few minutes, you should see whether your submission passed validation. You may resubmit as necessary.