Search This Blog

Apache Spark - Explode Example (using DataFrames)

I came across this PySpark explode function which at first looked kind of counter-intuitive to me.

So I decided to write this blog post here to document it for myself. Here is the code of a small PySpark program which demonstrates how this explode function works. 


from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import functions as funcs

# Obtain (or reuse) the Spark session for this demo.
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# Four sample rows: an id, a list column and a map column.
# Row 4 deliberately carries an empty list.
df = spark.createDataFrame(
    [
        Row(id=1, int_list_column=[1, 2, 3], map_column={"a": "b", "c": "d"}),
        Row(id=2, int_list_column=[1, 2, 3, 50, 70], map_column={"a": "s", "x": "y"}),
        Row(id=3, int_list_column=[40, 60, 2, 3], map_column={10: 100}),
        Row(id=4, int_list_column=[], map_column={50: 5, 10: 1}),
    ]
)

# The DataFrame as created.
df.show()

# explode() on the list column: one output row per list element.
df.select(
    funcs.explode(df.int_list_column).alias("int_column")
).show()

# explode() on the map column: one output row per key/value pair,
# so the alias takes two names (key column, value column).
df.select(
    funcs.explode(df.map_column).alias("Key1", "Value1")
).show()

The output of this small program is as follows.

+---+-----------------+------------------+
| id|  int_list_column|        map_column|
+---+-----------------+------------------+
|  1|        [1, 2, 3]|  {a -> b, c -> d}|
|  2|[1, 2, 3, 50, 70]|  {x -> y, a -> s}|
|  3|   [40, 60, 2, 3]|       {10 -> 100}|
|  4|               []|{50 -> 5, 10 -> 1}|
+---+-----------------+------------------+

+----------+
|int_column|
+----------+
|         1|
|         2|
|         3|
|         1|
|         2|
|         3|
|        50|
|        70|
|        40|
|        60|
|         2|
|         3|
+----------+

+----+------+
|Key1|Value1|
+----+------+
|   a|     b|
|   c|     d|
|   x|     y|
|   a|     s|
|  10|   100|
|  50|     5|
|  10|     1|
+----+------+ 

We see that what the explode function does is to explode/flatten the contents of the list column or the map column and to return (in the resulting DataFrame) one row for each value (in the list case) or for each key-value pair (in the map case). Note that the row with id=4, whose list is empty, contributes no rows to the exploded list output.

LRU Cache - An Interesting LeetCode Problem

I found this problem today on LeetCode. 

LRU Cache Problem

It's not too hard, but it's not too easy either. 

It seemed interesting so I decided to solve it. 

Below is my solution (in Java). 

For more details, see this page 

LRU Cache - Wikipedia


/**

* LeetCode

*

* Problem 146

*

* LRU Cache

*

* https://leetcode.com/problems/lru-cache/description/

*/


import java.util.HashMap;

import java.util.TreeMap;


/**
 * Fixed-capacity least-recently-used (LRU) cache from {@code int} keys to {@code int} values.
 *
 * <p>Every successful {@link #get(int)} and every {@link #put(int, int)} marks the key as the
 * most recently used one. When an insertion pushes the size above the capacity, the least
 * recently used key is evicted.
 *
 * <p>Recency is tracked with a logical clock that is incremented on every use, so each use gets
 * a unique, strictly increasing tick. (A wall-clock reading such as {@code System.nanoTime()} is
 * not suitable here: two operations may observe the same value, and the colliding entries would
 * overwrite each other in the tick-to-key map.)
 *
 * <p>This class performs no synchronization.
 */
public class LRUCache {

    /** Tick of last use -> key; the first (smallest) entry is the least recently used key. */
    private final TreeMap<Long, Integer> lastUsedToKey = new TreeMap<>();

    /** Key -> tick of its last use; the inverse of {@link #lastUsedToKey}. */
    private final HashMap<Integer, Long> keyToLastUsed = new HashMap<>();

    /** Key -> cached value. */
    private final HashMap<Integer, Integer> keyToValue = new HashMap<>();

    /** Maximum number of entries kept after a {@code put} completes. */
    private final int capacity;

    /** Logical clock; the next tick to hand out. */
    private long clock = 0;

    /**
     * Creates an empty cache.
     *
     * @param capacity maximum number of entries the cache retains
     */
    public LRUCache(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Looks up a key and, if present, marks it as most recently used.
     *
     * @param key the key to look up
     * @return the cached value, or {@code -1} if the key is not in the cache
     */
    public int get(int key) {
        Integer value = keyToValue.get(key);
        if (value == null) {
            return -1;
        }
        touch(key);
        return value;
    }

    /**
     * Inserts or updates a key, marks it as most recently used, and evicts the least recently
     * used key if the cache has grown beyond its capacity.
     *
     * @param key   the key to store
     * @param value the value to associate with {@code key}
     */
    public void put(int key, int value) {
        touch(key);
        keyToValue.put(key, value);

        if (keyToValue.size() > capacity) {
            // The smallest tick belongs to the least recently used key.
            Long oldestTick = lastUsedToKey.firstKey();
            Integer evictedKey = lastUsedToKey.remove(oldestTick);
            keyToLastUsed.remove(evictedKey);
            keyToValue.remove(evictedKey);
        }
    }

    /** Records a use of {@code key} at a fresh tick, dropping its previous recency entry. */
    private void touch(int key) {
        long tick = clock++;
        Long previousTick = keyToLastUsed.put(key, tick);
        if (previousTick != null) {
            lastUsedToKey.remove(previousTick);
        }
        lastUsedToKey.put(tick, key);
    }

    /**
     * Returns the string form of the key-to-value map.
     *
     * @return the cached entries as rendered by {@link HashMap#toString()}
     */
    @Override
    public String toString() {
        return keyToValue.toString();
    }

    /**
     * Small demo: with capacity 2, key 2 is evicted because key 1 was read after key 2 was
     * inserted.
     *
     * @param args unused
     * @throws Exception never thrown by the current body; kept so the signature is unchanged
     */
    public static void main(String[] args) throws Exception {
        LRUCache c = new LRUCache(2);

        c.put(1, 10);
        c.put(2, 20);
        c.get(1);
        c.put(3, 30);

        System.out.println(c);
        System.out.println("DONE!");
    }
}





Probability Problem - Competition with 5 players

This problem is from M. G. Bulmer's book "Principles of Statistics",

Problem 2.6, page 28.

Five players enter a competition in which each plays a game against each of the others. 

The two players in each game have an equal chance of winning it; 

a game must end in a win for one of the players who then scores a point. 

The competition is won by the player or players scoring most points. 

What is the probability that a particular player will

(a) win the competition outright;

(b) share in winning with other players?

I could not solve this with paper and pencil, so I wrote a Python program which solves it by enumerating every possible outcome of the competition.

Here's the program. 


import pprint

def main():
    """Enumerate every outcome of a 5-player round robin and tally player 0's results.

    Each of the 10 games has two possible results, giving 2^10 = 1024 equally
    weighted competitions. For every competition the per-game results and the
    resulting points table are printed, followed by whether player 0 has the
    top score and whether player 0 has it alone. Finally the number of
    competitions player 0 wins outright, and the number in which player 0
    shares the top score, are printed as fractions of 1024.
    """
    # Game index -> (p, q): every unordered pair of the 5 players with p < q.
    fixtures = [(a, b) for a in range(5) for b in range(a + 1, 5)]

    # Competition code -> list of 10 results; bit g of the code is the result
    # of game g (1 means the first player of the pair scores the point).
    outcomes = {
        code: [(code >> bit) & 1 for bit in range(10)]
        for code in range(1024)
    }

    pprint.pp(outcomes)

    top_score_count = 0   # competitions where nobody beats player 0
    outright_count = 0    # competitions where player 0 beats everybody

    for results in outcomes.values():
        # Three blank lines separate one competition's report from the next.
        print("\n\n")

        points = {}
        for (p, q), p_scored in zip(fixtures, results):
            points.setdefault(p, 0)
            points.setdefault(q, 0)
            points[p] += p_scored
            points[q] += 1 - p_scored
            print("Player", p, ":", "Player", q, " = ", p_scored, " : ", 1 - p_scored)

        print("points = ", points)

        own_score = points[0]
        rival_scores = [score for player, score in points.items() if player != 0]

        # Player 0 wins if no rival has strictly more points ...
        has_top_score = all(score <= own_score for score in rival_scores)
        # ... and wins outright if every rival has strictly fewer points.
        wins_alone = all(score < own_score for score in rival_scores)

        print("Player 0 wins: ", has_top_score)
        print("Player 0 wins outright: ", wins_alone)

        top_score_count += int(has_top_score)
        outright_count += int(wins_alone)

    print("\n\n")

    print("Player 0 wins outright probability: ", outright_count, '/', len(outcomes))
    print("Player 0 shares the win probability: ", top_score_count - outright_count, '/', len(outcomes))

    print("\n\n")
    
    
    
# Run the enumeration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()