
Apache Spark - Word Count Example (using RDDs)

This is a simple PySpark script which counts how many times each word occurs in a given text file. The script is not perfectly polished or production-ready. Why? Well, for example, a word like "don't" is treated as two words, "don" and "t". For a more professional version of this code we might want to use a specialized Python NLP library such as NLTK, which should be able to extract the words from the text file in a better way. Still, this script is quite useful for illustration purposes.


import re

from pyspark import SparkConf, SparkContext



def get_all_words_in_lowercase(textLine):

    # The input value is a line of text from the book.

    # The returned value is a list of all the words from that line, in lowercase.

    return re.compile(r'\W+').split(textLine.lower())



conf = SparkConf().setMaster("local").setAppName("WordCount")

sc = SparkContext(conf=conf)



input = sc.textFile("file:///D:/PlayGround/book.txt")

# Now in input we have all lines from the text file. Each element of the RDD is a line of text.



rdd1 = input.flatMap(get_all_words_in_lowercase)

# We map each line to a list of all words which occur in that line.

# OK, now in rdd1 we have all words from the text file.



rdd2 = rdd1.map(lambda word: (word, 1))

# We map each word to the couple (word, 1)

# This way we produce rdd2



rdd3 = rdd2.reduceByKey(lambda x, y: x + y)

# We now count how many times each word occurs in rdd2.

# This way we produce rdd3 which contains

# couples of the form (word, word_count)



rdd4 = rdd3.map(lambda couple: (couple[1], couple[0]))

# We now change the roles of key and value.

# This way from rdd3 we get rdd4 which contains

# couples of the form (word_count, word)



rdd5 = rdd4.sortByKey(ascending=True)


lst5 = rdd5.collect()


print("Number of unique words ---> " + str(len(lst5)))


for pair in lst5:

    print(str(pair[1]) + " ---> " + str(pair[0]) + " occurrences")
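

As mentioned above, a more careful tokenizer could be plugged in instead of the regex-based split. Below is a minimal sketch of such a replacement, assuming the NLTK library and its 'punkt' tokenizer data are installed (the function name is just illustrative); whether it actually handles a given text better would still need to be checked.

import nltk
# nltk.download('punkt')  # uncomment on the first run to fetch the tokenizer data

def get_all_words_in_lowercase_nltk(textLine):
    # Tokenize the line with NLTK and keep only the purely alphabetic tokens,
    # instead of splitting on every non-word character like the regex version does.
    return [token.lower() for token in nltk.word_tokenize(textLine) if token.isalpha()]

# This function could then be passed to flatMap in place of get_all_words_in_lowercase:
# rdd1 = input.flatMap(get_all_words_in_lowercase_nltk)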




Apache Spark RDDs - RDD transformations and RDD actions

Definitions:

RDD - a resilient distributed dataset

DAG of operations - a directed acyclic graph containing the requested RDD operations; it is built by Spark under the hood and essentially tells Spark which RDD operations the code wants to perform on the input RDD and in what order it wants to perform them.


Example (PySpark code):

>>> sc.parallelize([1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8]).filter(lambda x : x % 2 == 0).filter(lambda x : x < 6).map(lambda x : x*x).distinct().collect()

The result of this PySpark code (Python code for Spark) is the list [16, 4]: the first filter keeps the even elements, the second filter keeps those smaller than 6 (which leaves 2, 2, 4, 4), map squares them (4, 4, 16, 16), and distinct removes the duplicates.

The input RDD in this case is the result of this call 

sc.parallelize([1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8])

This call just turns a simple list of elements into an RDD.


Explanations:

An RDD is an immutable distributed collection of data elements, partitioned across a set of nodes/hosts of the cluster; its partitions can be recovered (recomputed) if one of them is lost, which provides fault tolerance.

For programming purposes it can be thought of as one large collection; its elements are just not stored on a single node/host, but are instead partitioned and stored across multiple nodes/hosts.
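
The partitioning can actually be observed from the driver program. Here is a tiny sketch, assuming an existing SparkContext named sc (as in the example above):

rdd = sc.parallelize(range(100), 4)   # explicitly request 4 partitions
print(rdd.getNumPartitions())         # prints 4
print(rdd.glom().collect())           # the contents of each partition, one Python list per partition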

The RDD API provides two fundamentally different types of operations that can be performed on an RDD. 


RDD Transformations

When called, they construct new RDDs from existing ones; their results are these new RDDs. The new/resulting RDDs are not immediately evaluated; this is called lazy evaluation. So when an RDD transformation is called, it just adds an operation (a transformation) to the DAG of operations. This DAG of operations describes what operations we want to perform on our input RDD. Here is a list of the most frequently used RDD transformations: map, flatMap, filter, distinct, sample, union, intersection, subtract, cartesian, zip
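
For instance, here is a small PySpark sketch that chains a few of these transformations (it assumes an existing SparkContext named sc, as in the example above; the variable names are just for illustration). After these four lines nothing has been computed yet:

lines = sc.parallelize(["spark builds rdds", "rdds are evaluated lazily"])
words = lines.flatMap(lambda line: line.split())       # one line -> many words
long_words = words.filter(lambda w: len(w) > 4)        # keep only the words longer than 4 characters
word_lengths = long_words.map(lambda w: (w, len(w)))   # word -> (word, length)
# At this point no data has been processed; only the DAG of operations has grown.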


RDD Actions

When called, they actually trigger the computations/evaluations described by the DAG of operations that was previously built, and they return the result(s) of these computations: either a single value or a collection of values. Here is a list of the most frequently used RDD actions: collect, count, countByValue, take, top, reduce
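
Continuing the small sketch from the transformations section above, calling actions is what actually triggers the computation and returns results to the driver program:

print(word_lengths.collect())   # brings all the (word, length) pairs back to the driver as a list
print(words.count())            # the total number of words
print(words.countByValue())     # a dictionary mapping each word to its number of occurrences
print(words.take(2))            # the first two words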


Nothing is really computed in our Spark driver program until an RDD action is called.

In the example above the operations filter, filter, map, distinct are RDD transformations and the final operation collect is an RDD action. 



Frenet-Serret Formulas

I found these formulas in the Vector Calculus book by Baxandall and Liebeck. They are in chapter 2.8, page 73. These formulas are pretty important in the area of differential geometry. See also this Wikipedia page on the same topic. In the text below the following standard notation is used: $T$ is the unit tangent vector, $N$ is the principal normal vector, $B$ is the binormal vector (these three are in fact vector-valued functions); $\kappa$ is the curvature, $\tau$ is the torsion (these two are in fact scalar-valued functions). The associated collection ($T$, $N$, $B$, $\kappa$, $\tau$) is called the Frenet-Serret apparatus. Intuitively, curvature measures the failure of a curve to be a straight line, while torsion measures the failure of a curve to be planar.


Case #1. Suppose that the function $f(s): E \subseteq \mathbb{R} \rightarrow \mathbb{R^3}$ is a three times differentiable path-length parametrization of a curve in $\mathbb{R^3}$ and also suppose that the curvature $\kappa_f(s) \neq 0$ for all $s \in E$. Then 

$T'_f(s) = \kappa_f(s) N_f(s)$ 

$N'_f(s) = -\kappa_f(s) T_f(s) + \tau_f(s) B_f(s)$ 

$B'_f(s) = -\tau_f(s) N_f(s)$ 

In short one can also write 

$T' = \kappa N$ 

$N' = -\kappa T + \tau B$ 

$B' = -\tau N$ 

The three vectors $T, N, B$ are unit vectors and they form (in this order) a right-hand orthonormal basis of $\mathbb{R^3}$. This means that $B = T \times N$ for all $s \in E$. 


Case #2. Suppose that the function $f(s): E \subseteq \mathbb{R} \rightarrow \mathbb{R^3}$ is a three times differentiable smooth parametrization of a curve in $\mathbb{R^3}$ and also suppose that the curvature $\kappa_f(s) \neq 0$ for all $s \in E$. Then 

$T'_f(s) = v(s) \kappa_f(s) N_f(s)$ 

$N'_f(s) = v(s) [-\kappa_f(s) T_f(s) + \tau_f(s) B_f(s)]$ 

$B'_f(s) = -v(s) \tau_f(s) N_f(s)$ 

where $v(s) = || f'(s) || $ for all $s \in E$

In short one can also write 

$T' = v \kappa N$ 

$N' = v (-\kappa T + \tau B)$ 

$B' = -v \tau N$  

In this case again the three vectors $T, N, B$ are unit vectors and they form (in this order) a right-hand orthonormal basis of $\mathbb{R^3}$. This means that $B = T \times N$ for all $s \in E$. 


Note: In the first case $s$ is a path-length parameter, i.e. $|| f'(s) || = 1$ for all $s \in E$. The second case is more general since there we do not require $s$ to be a path-length parameter, i.e. we may have $|| f'(s) || \ne 1$ for some or all $s \in E$. 
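

As a quick illustration of Case #1, consider the circular helix $f(s) = \left( a\cos\frac{s}{c}, a\sin\frac{s}{c}, \frac{bs}{c} \right)$, where $a > 0$ and $b$ are constants and $c = \sqrt{a^2+b^2}$. Here $|| f'(s) || = 1$, so $s$ is a path-length parameter, and a direct computation gives 

$T_f(s) = \left( -\frac{a}{c}\sin\frac{s}{c}, \frac{a}{c}\cos\frac{s}{c}, \frac{b}{c} \right)$, $N_f(s) = \left( -\cos\frac{s}{c}, -\sin\frac{s}{c}, 0 \right)$, $B_f(s) = \left( \frac{b}{c}\sin\frac{s}{c}, -\frac{b}{c}\cos\frac{s}{c}, \frac{a}{c} \right)$ 

with constant curvature $\kappa_f(s) = \frac{a}{a^2+b^2}$ and constant torsion $\tau_f(s) = \frac{b}{a^2+b^2}$. Differentiating $T_f$, $N_f$, $B_f$ and substituting these values verifies the three formulas of Case #1.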


Optimization with one or more equality constraints

This post is based on some theory I was reading and on some exercises I was solving related to optimization (maximization/minimization) problems.


Procedure for solving a two-variable maximization problem with an equality constraint

Let $f(x,y)$ and $g(x,y)$ be real-valued functions of two variables defined on a set $S \subseteq \mathbb{R}^2$ that are continuously differentiable on the interior of $S$, and let $c \in \mathbb{R}$ be a number. If the problem $\max_{(x,y) \in S} f(x, y)$ subject to $g(x, y) = c$ has a solution, it may be found as follows. 

1) Find all the values of $(x, y, \lambda)$ such that 

    (a) $(x, y)$ is an interior point of $S$ and 

    (b) $(x, y, \lambda)$ satisfies the first-order conditions and the constraint, 

      i.e. the points $(x, y, \lambda)$ for which $f'_{x}(x, y) - \lambda \cdot g'_{x}(x, y) = 0$, $f'_{y}(x, y) - \lambda \cdot g'_{y}(x, y) = 0$, and $g(x, y) = c$, 

      i.e. the points $(x, y, \lambda)$ for which $\nabla{f}(x,y) = \lambda \cdot \nabla{g}(x,y)$, and $g(x, y) = c$. 

2) Find all the points $(x, y)$ that satisfy $g'_x(x, y) = 0, g'_y(x, y) = 0$, and $g(x, y) = c$. Note that for most problems there are no such points; in particular, if $g$ is linear there are no such values of $(x, y)$. 

3) If the set $S$ has any boundary points, find all the points that solve the problem $\max_{(x,y) \in S} f(x, y)$ subject to the two conditions $g(x, y) = c$ and $(x, y)$ is a boundary point of $S$. 

The points $(x, y)$ that you have found in steps 1), 2), 3) for which $f(x, y)$ is largest are the solutions of the optimization problem. 

A variant of this procedure in which the last step involves choosing the points $(x, y)$ at which $f(x, y)$ is smallest may be used to solve the analogous minimization problem. 
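
For example, consider the problem $\max_{(x,y) \in \mathbb{R}^2} f(x, y) = xy$ subject to $x + y = 2$ (a small worked example, just to illustrate the steps; here $g(x, y) = x + y$ and $c = 2$). 

1) The first-order conditions are $y - \lambda = 0$ and $x - \lambda = 0$, so $x = y = \lambda$; together with the constraint $x + y = 2$ this gives the single point $(x, y, \lambda) = (1, 1, 1)$. 

2) $g$ is linear and $\nabla{g}(x,y) = (1, 1)$ is never the zero vector, so there are no such points. 

3) $S = \mathbb{R}^2$ has no boundary points. 

So the only candidate is $(1, 1)$ and the maximum value is $f(1, 1) = 1$; indeed, on the constraint line $f(x, 2 - x) = 2x - x^2$, which attains its maximum at $x = 1$. 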


Procedure for solving a three-variable maximization problem with an equality constraint

Let $f(x,y,z)$ and $g(x,y,z)$ be real-valued functions of three variables defined on a set $S \subseteq \mathbb{R}^3$ that are continuously differentiable on the interior of $S$, and let $c \in \mathbb{R}$ be a number. If the problem $\max_{(x,y,z) \in S} f(x, y, z)$ subject to $g(x, y, z) = c$ has a solution, it may be found as follows. 

1) Find all the values of $(x, y, z, \lambda)$ such that 

    (a) $(x, y, z)$ is an interior point of $S$ and 

    (b) $(x, y, z, \lambda)$ satisfies the first-order conditions and the constraint, 

      i.e. the points $(x, y, z, \lambda)$ for which $f'_{x}(x, y, z) - \lambda \cdot g'_{x}(x, y, z) = 0$, $f'_{y}(x, y, z) - \lambda \cdot g'_{y}(x, y, z) = 0$, $f'_{z}(x, y, z) - \lambda \cdot g'_{z}(x, y, z) = 0$, and $g(x, y, z) = c$, 

      i.e. the points $(x, y, z, \lambda)$ for which $\nabla{f}(x,y, z) = \lambda \cdot \nabla{g}(x,y,z)$, and $g(x, y, z) = c$. 

2) Find all the points $(x, y, z)$ that satisfy $g'_x(x, y, z) = 0, g'_y(x, y, z) = 0, g'_z(x, y, z) = 0$, and $g(x, y, z) = c$. Note that for most problems there are no such points; in particular, if $g$ is linear there are no such values of $(x, y, z)$. 

3) If the set $S$ has any boundary points, find all the points that solve the problem $\max_{(x,y,z) \in S} f(x, y, z)$ subject to the two conditions $g(x, y, z) = c$ and $(x, y, z)$ is a boundary point of $S$. 

The points $(x, y, z)$ that you have found in steps 1), 2), 3) for which $f(x, y, z)$ is largest are the solutions of the optimization problem. 

A variant of this procedure in which the last step involves choosing the points $(x, y, z)$ at which $f(x, y, z)$ is smallest may be used to solve the analogous minimization problem. 
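
For example (using the minimization variant), consider the problem $\min_{(x,y,z) \in \mathbb{R}^3} f(x, y, z) = x^2 + y^2 + z^2$ subject to $x + y + z = 3$, i.e. $g(x, y, z) = x + y + z$ and $c = 3$. 

1) $\nabla{f}(x,y,z) = (2x, 2y, 2z) = \lambda \cdot (1, 1, 1)$ gives $x = y = z = \lambda/2$; the constraint then gives the single point $(x, y, z, \lambda) = (1, 1, 1, 2)$. 

2) $g$ is linear and $\nabla{g}(x,y,z) = (1, 1, 1)$ is never the zero vector, so there are no such points. 

3) $S = \mathbb{R}^3$ has no boundary points. 

So the minimum is attained at $(1, 1, 1)$ and equals $f(1, 1, 1) = 3$, which is just the squared distance from the origin to the plane $x + y + z = 3$. 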


Procedure for solving a three-variable maximization problem with two equality constraints

Let $f(x,y,z)$, $g(x,y,z)$, $h(x,y,z)$ be real-valued functions of three variables defined on a set $S \subseteq \mathbb{R}^3$ that are continuously differentiable on the interior of $S$, and let $c_1, c_2 \in \mathbb{R}$ be two numbers. If the problem $\max_{(x,y,z) \in S} f(x, y, z)$ subject to $g(x, y, z) = c_1$ and $h(x, y, z) = c_2$ has a solution, it may be found as follows. 

1) Find all the values of $(x, y, z, \lambda, \mu)$ such that 

    (a) $(x, y, z)$ is an interior point of $S$, 

    (b) $\nabla{g}(x,y,z)$ and $\nabla{h}(x,y,z)$ are linearly independent, and 

    (c) $(x, y, z, \lambda, \mu)$ satisfies the first-order conditions and the constraints, 

      i.e. the points $(x, y, z, \lambda, \mu)$ for which $\nabla{f}(x,y, z) = \lambda \cdot \nabla{g}(x,y,z) + \mu \cdot \nabla{h}(x,y,z)$, and $g(x, y, z) = c_1$, and $h(x, y, z) = c_2$. 

2) Find all the points $(x, y, z)$ for which $\nabla{g}(x,y,z)$ and $\nabla{h}(x,y,z)$ are linearly dependent, and $g(x, y, z) = c_1$ and $h(x, y, z) = c_2$, 

       i.e. the points $(x, y, z)$ for which there exists $(\lambda, \mu) \ne (0,0)$ with $\lambda \cdot \nabla{g}(x,y,z) + \mu \cdot \nabla{h}(x,y,z) = (0,0,0)$, and $g(x, y, z) = c_1$, and $h(x, y, z) = c_2$. 

3) If the set $S$ has any boundary points, find all the points that solve the problem $\max_{(x,y,z) \in S} f(x, y, z)$ subject to the conditions $g(x, y, z) = c_1$, $h(x, y, z) = c_2$, and $(x, y, z)$ is a boundary point of $S$. 

The points $(x, y, z)$ that you have found in steps 1), 2), 3) for which $f(x, y, z)$ is largest are the solutions of the optimization problem. 

A variant of this procedure in which the last step involves choosing the points $(x, y, z)$ at which $f(x, y, z)$ is smallest may be used to solve the analogous minimization problem. 
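
For example, take $f(x, y, z) = z$, $g(x, y, z) = x^2 + y^2$ with $c_1 = 1$, and $h(x, y, z) = x + y + z$ with $c_2 = 1$ on $S = \mathbb{R}^3$, i.e. maximize the height $z$ over the intersection of a cylinder and a plane; this intersection is an ellipse, so the maximum exists. 

1) The condition $\nabla{f} = \lambda \cdot \nabla{g} + \mu \cdot \nabla{h}$ reads $(0, 0, 1) = \lambda \cdot (2x, 2y, 0) + \mu \cdot (1, 1, 1)$, so $\mu = 1$ and $2\lambda x = 2\lambda y = -1$, which forces $\lambda \ne 0$ and $x = y$; the constraint $x^2 + y^2 = 1$ then gives $x = y = \pm 1/\sqrt{2}$ and $z = 1 - x - y$, i.e. the two candidate points $(1/\sqrt{2}, 1/\sqrt{2}, 1 - \sqrt{2})$ and $(-1/\sqrt{2}, -1/\sqrt{2}, 1 + \sqrt{2})$, at which $\nabla{g}$ and $\nabla{h}$ are indeed linearly independent. 

2) $\nabla{g}(x,y,z) = (2x, 2y, 0)$ and $\nabla{h}(x,y,z) = (1, 1, 1)$ are linearly dependent only when $(2x, 2y, 0) = (0, 0, 0)$, i.e. $x = y = 0$, which contradicts $x^2 + y^2 = 1$; so there are no such points. 

3) $S = \mathbb{R}^3$ has no boundary points. 

So the maximum of $z$ is $1 + \sqrt{2}$, attained at $(-1/\sqrt{2}, -1/\sqrt{2}, 1 + \sqrt{2})$ (and, by the minimization variant, the minimum is $1 - \sqrt{2}$). 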



See also: 

This math page by Martin Osborne 

This math page again by Martin Osborne 

This Math SE post by smcc

