Apache Spark - Word Count Example (using RDDs)

This is a simple PySpark script which counts how many times each word occurs in a given text file. The script is not perfectly polished or production-ready: for example, a word like "don't" is treated as the two words "don" and "t". For a more professional version of this code we might want to use a specialized Python NLP library such as NLTK, which can extract the words from the text file more accurately. Still, this script is quite useful for illustration purposes.
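To make the tokenization issue concrete, here is a minimal sketch (separate from the script below; the sample sentence is made up):

import re

line = "Don't panic, don't!"

# The \W+ split used by the script breaks the contraction apart (and can
# leave an empty string at the edge of the line):
print(re.compile(r'\W+').split(line.lower()))
# ['don', 't', 'panic', 'don', 't', '']

# A pattern which keeps apostrophes inside words handles it better:
print(re.findall(r"[a-z']+", line.lower()))
# ["don't", 'panic', "don't"]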


import re

from pyspark import SparkConf, SparkContext


def get_all_words_in_lowercase(text_line):
    r"""Take a line of text from the book and return a list of all
    the words from that line in lowercase.

    Note: splitting on \W+ breaks contractions like "don't" apart and
    can produce empty strings when the line starts or ends with a
    non-word character.
    """
    return re.compile(r'\W+').split(text_line.lower())



conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf=conf)

# The RDD is called lines to avoid shadowing Python's built-in input().
lines = sc.textFile("file:///D:/PlayGround/book.txt")
# Now lines holds all the lines from the text file.
# Each element of the RDD is one line of text.



rdd1 = lines.flatMap(get_all_words_in_lowercase)
# We map each line to the list of all words which occur in that line,
# and flatMap flattens those lists into individual elements.
# So now rdd1 contains all the words from the text file.



rdd2 = rdd1.map(lambda word: (word, 1))
# We map each word to the pair (word, 1).
# This way we produce rdd2.



rdd3 = rdd2.reduceByKey(lambda x, y: x + y)
# We now count how many times each word occurs in rdd2:
# reduceByKey sums the 1s for each distinct word.
# This way we produce rdd3, which contains
# pairs of the form (word, word_count).



rdd4 = rdd3.map(lambda pair: (pair[1], pair[0]))
# We now swap the roles of key and value.
# This way from rdd3 we get rdd4, which contains
# pairs of the form (word_count, word).



rdd5 = rdd4.sortByKey(ascending=True)
# We sort the pairs by word count in ascending order.

lst5 = rdd5.collect()
# collect is an action: it triggers the computation and brings
# the results back to the driver program as a Python list.

print("Number of unique words ---> " + str(len(lst5)))

for pair in lst5:
    print(str(pair[1]) + " ---> " + str(pair[0]) + " occurrences")
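To run the script (assuming Spark is installed and the script is saved as, say, word_count.py; the file name is just a placeholder), it can be submitted to the local cluster with spark-submit:

spark-submit word_count.py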




Apache Spark RDDs - RDD transformations and RDD actions

Definitions:

RDD - a resilient distributed dataset

DAG of operations - a directed acyclic graph containing the requested RDD operations; it is built by Spark under the hood, and it essentially tells Spark which RDD operations the code wants to perform on the input RDD and in what order.


Example (PySpark code):

>>> sc.parallelize([1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8]).filter(lambda x: x % 2 == 0).filter(lambda x: x < 6).map(lambda x: x * x).distinct().collect()

The result of this PySpark code (Python code for Spark) is the list [16, 4]: the first filter keeps the even numbers [2, 2, 4, 4, 6, 6, 8, 8], the second filter keeps those smaller than 6, i.e. [2, 2, 4, 4], map squares them into [4, 4, 16, 16], and distinct removes the duplicates.

The input RDD in this case is the result of this call:

sc.parallelize([1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8])

This call just turns a simple list of elements into an RDD.
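To peek at the DAG Spark has recorded before any action runs, one can call toDebugString on the final RDD; a minimal sketch (in PySpark toDebugString returns a bytes object, hence the decode):

rdd = (sc.parallelize([1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8])
         .filter(lambda x: x % 2 == 0)
         .filter(lambda x: x < 6)
         .map(lambda x: x * x)
         .distinct())

# No action has been called yet, so nothing has been computed;
# toDebugString just shows the lineage of operations in the DAG.
print(rdd.toDebugString().decode())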


Explanations:

An RDD is an immutable distributed collection of data elements, partitioned across a set of nodes/hosts of the cluster; a lost partition can be recomputed from the recorded operations, which provides fault tolerance.

For programming purposes it can be thought of as one large collection; its elements are just not stored on a single node/host, but are partitioned and stored across multiple nodes/hosts.
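On a local setup the partitioning can be made visible with getNumPartitions and glom (the element values and the partition count below are made up):

rdd = sc.parallelize(range(8), 4)  # explicitly request 4 partitions
print(rdd.getNumPartitions())      # 4
print(rdd.glom().collect())        # elements grouped per partition,
                                   # e.g. [[0, 1], [2, 3], [4, 5], [6, 7]]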

The RDD API provides two fundamentally different types of operations that can be performed on an RDD. 


RDD Transformations

When called, they construct new RDDs from existing ones; their results are these new RDDs. The new/resulting RDDs are not immediately evaluated; this is called lazy evaluation. So when an RDD transformation is called, it just adds an operation (a transformation) to the DAG of operations. This DAG describes what operations we want to perform on our input RDD; a small sketch of this lazy behavior follows below. Here is a list of the most frequently used RDD transformations: map, flatMap, filter, distinct, sample, union, intersection, subtract, cartesian, zip
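A minimal sketch of the lazy behavior (the values are made up):

rdd = sc.parallelize([1, 2, 3])

mapped = rdd.map(lambda x: x * 10)
# map is a transformation: nothing has been computed yet,
# Spark has only added the operation to the DAG.

print(mapped.collect())
# collect is an action: only now is the DAG executed -> [10, 20, 30]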


RDD Actions

When called, they actually trigger the computations/evaluations described by the DAG of operations that was previously built, and they return the result or results of these computations. They can return a single value or a collection of values; a small sketch follows below. Here is a list of the most frequently used RDD actions: collect, count, countByValue, take, top, reduce
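A minimal sketch of a few of these actions (the values are made up):

rdd = sc.parallelize([5, 1, 5, 2, 2, 2])

print(rdd.count())                     # 6 - the total number of elements
print(rdd.countByValue())              # how many times each value occurs,
                                       # e.g. {5: 2, 1: 1, 2: 3} (a defaultdict)
print(rdd.take(2))                     # the first 2 elements: [5, 1]
print(rdd.top(2))                      # the 2 largest elements: [5, 5]
print(rdd.reduce(lambda x, y: x + y))  # all elements combined: 17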


Nothing is really computed in our Spark driver program until an RDD action is called.

In the example above the operations filter, filter, map, distinct are RDD transformations and the final operation collect is an RDD action.