pyspark

Or Spark and Python !!

PLEASE NOTE: It seems you can only run Python 2.7 unless you have CDH4.

So I have flipped to my venv for Python

Starting the Shell

As I have my Spark environment all set up in a private directory, I use

source ~/Java.env
pyspark

IPYTHON

To start pyspark with the friendlier IPython REPL, use the following options

export IPYTHON=1
pyspark

IPYTHON-Notebook

To start the IPython notebook - a little similar to Zeppelin, but easier to install :)

export IPYTHON=1
export IPYTHON_OPTS="notebook"
pyspark

Spark RDDs

We want to be able to get data sets - or RDDs - Resilient Distributed Datasets....

These are some of the common types

Seeing "Into" a RDD

There is a very useful method, toDebugString(). It can be used to explain how an RDD is being constructed (its lineage).
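
A minimal sketch of using it (assuming a README.md in the current directory, as in the file example below):

lines = sc.textFile("README.md")
filtered = lines.filter(lambda l: "Spark" in l)
# Print the lineage - how Spark will construct this RDD
print(filtered.toDebugString())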

Program data

If we were able to generate some data (say using NumPy or Pandas) and we wanted to process it in Spark, this would be a way to do it.

So whilst the source data is not distributed, this technique may also assist in some areas.

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

FILE

A simple and humble file can be an RDD. It is not truly distributed unless it is stored inside HDFS - but for the moment we will ignore this oversight.

lines=sc.textFile("README.md")
lines.take(10)

If we see output like

u'Basic',
u'#README'

Why the 'u'? In Python 2.7 it marks the data as Unicode - in Python 3 you can happily forget about this.

Whilst we will do more RDD work with files later, we need to try and access other RDD data sources.

Hive

Get the data from Hive and then process the RDD. This however needs Spark built with Hive support !! and another export var: export SPARK_HIVE=true .

from pyspark.sql import HiveContext
rdd = HiveContext(sc).sql('select * from Table')

HBase
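
This section is still a stub. As a hedged sketch (not something used here), the Spark distribution ships Python converters in its examples jar that let newAPIHadoopRDD read from HBase - the class names below come from those examples, and the quorum/table values are placeholders. The spark-examples jar and the HBase client jars need to be on the classpath for this to work.

conf = {"hbase.zookeeper.quorum": "localhost",
        "hbase.mapreduce.inputtable": "my_table"}
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    conf=conf)
hbase_rdd.take(5)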

Saving Data

We can save data as (see the sketch after this list)

  • Pickle
  • Text

Or being more Hadoop orientated

  • As a Sequence File
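
A quick sketch of the first two, assuming ./myText and ./myPickle output paths that do not already exist:

rdd = sc.parallelize(range(10))
#Plain text - one element per line
rdd.saveAsTextFile("./myText")
#Python pickled objects - keeps the Python types
rdd.saveAsPickleFile("./myPickle")
#Read them back
sc.textFile("./myText").collect()
sc.pickleFile("./myPickle").collect()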

Sequence File

SequenceFile is a flat file consisting of binary key/value pairs. It is extensively used in MapReduce as input/output formats. It is also worth noting that, internally, the temporary outputs of maps are stored using SequenceFile.

The SequenceFile provides Writer, Reader and Sorter classes for writing, reading and sorting respectively.

There are 3 different SequenceFile formats:

  • Uncompressed key/value records.
  • Record compressed key/value records - only 'values' are compressed here.
  • Block compressed key/value records - both keys and values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.

The recommended way is to use the SequenceFile.createWriter methods to construct the 'preferred' writer implementation.

rdd = sc.parallelize(range(1, 5)).map(lambda x: (x, "z" * x ))
rdd.saveAsSequenceFile("./myOutput.txt")
#
#Now read back
#
sorted(sc.sequenceFile("./myOutput.txt").collect())
[(1, u'z'), (2, u'zz'), (3, u'zzz'), (4, u'zzzz')]

Transformations

A transformation is an operation that modifies (transforms) an RDD - they are the core of getting to the data you want.

A word of caution here - in order to maximize performance you need to take care when writing functions - as Spark is implemented in Scala, you should try and use a functional programming style.

| Transformation | Description |
| --- | --- |
| map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
| filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. |
| flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
| mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T. |
| mapPartitionsWithSplit(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the split, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. |
| sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
| union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
| distinct([numTasks]) | Return a new dataset that contains the distinct elements of the source dataset. |
| groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. Note: By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
| reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
| sortByKey([ascending], [numTasks]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
| join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. |
| cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples. This operation is also called groupWith. |
| cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
| keyBy(func) | Sets the key of a record, returning (key, record) pairs. |

MAP

A map transformation does a similar operation to Python's built-in map call.

I am deliberately going to do this in a not-so-good way to start with, ignoring my functional warning.

def MyUpcase(s):
    d=s
    d=d.upper()
    return d

code=sc.textFile('README.md')
#Now transform using a Map
CODE=code.map(MyUpcase)

Now whilst that works, this mechanism will not be as fast as possible. We should use a functional style:

code=sc.textFile('README.md')
#Now transform using a Map
CODE=code.map(lambda x: x.upper())

You should strive to write your transformations like this.

There is an argument that says

If you want performance, write it in Scala

But Python gives you access to NumPy and Pandas....

MapValues

Here, using mapValues(list), you return a LIST of the values per key. You can also do mapValues(set) for a set of values, or mapValues(func) for any function - see the examples below.

import pyspark
import random

def rnd():
    return random.randrange(0,100)
#20 Keys with data
rdd = sc.parallelize([(rnd()%20,rnd()) for a in range(100)])

**groupByKey** is a **BAD** function as it shuffles all the values across the cluster. **aggregateByKey** is MUCH MUCH better.

#Group the data
#I Want the Key and the list of items
sorted(rdd.groupByKey().mapValues(list).collect())

To get the number of items per key

sorted(rdd.groupByKey().mapValues(len).collect())
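
And the other variants mentioned above - a set of the values per key, or any other function (sum here) applied to the grouped values:

#A SET of the values per key (duplicates removed)
sorted(rdd.groupByKey().mapValues(set).collect())
#Any function works - e.g. sum the values per key
sorted(rdd.groupByKey().mapValues(sum).collect())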

Reading CSV

map should be used for converting an input line into individual fields.

#Python Version of Lab 1
import pyspark
#Read Trips
#Remove Header
input1=sc.textFile("data/trips/*")
header1=input1.first()
trips=input1.filter(lambda line: line != header1).map(lambda x: x.split(','))

Filter

Defn: filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.

def MyFilterFunction(s,ss):
    if s.find(ss) != -1:
        return True
    else:
        return False

code=sc.textFile('README.md')
#Now transform using a Map
CODE=code.map(lambda x: x.upper())

#Look for string PIP
MATCHES=CODE.filter(lambda x: MyFilterFunction(x,"PIP"))
MATCHES.collect()

Here are some more Filter examples

#Only want lines with ERROR in them
logData = sc.textFile(logFile).cache()
errors = logData.filter(lambda line: "ERROR" in line)

Using a Function this would be

def is_error(line):
    return "ERROR" in line
errors = logData.filter(is_error)

Functions can access objects in enclosing scopes, although modifications to those objects made inside RDD methods will not be propagated back to the driver (a small demonstration follows the next example):

Multiple matches

error_keywords = ["Exception", "Error"]
def is_error(line):
    return any(keyword in line for keyword in error_keywords)
errors = logData.filter(is_error)
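
And a minimal sketch of the "modifications are not propagated back" point, reusing logData from above (count_and_check and counter are hypothetical names):

counter = 0
def count_and_check(line):
    global counter
    counter += 1              #incremented on the executors only
    return "ERROR" in line

logData.filter(count_and_check).count()
print(counter)                #still 0 on the driver
#If you need a counter the driver can read, use an Accumulator
acc = sc.accumulator(0)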

A filter on numeric data looks like this. Given:

[(0, 'Junk'),
 (1, 'Junk'),
 (2, 'Junk'),
 (3, 'Junk'),
 (4, 'Junk'),
 (5, 'Junk'),
 (6, 'Junk'),
 (7, 'Junk'),
 (8, 'Junk'),
 (9, 'Junk')]

To Only take values < 5....

sc.parallelize(zip(range(10),["Junk"]*10)).filter(lambda x: x[0]< 5).collect()

FlatMap

Defn: flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

code=sc.textFile('README.md')
#Now transform using a flatMap
codelist=code.flatMap(lambda x: x.split())

mapPartitions

Defn: mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.

This function seems to work best when using yield - which turns the function into a Python generator (an iterator).

data = [ [1, 2, 3], [3,2,4], [5,2,7]]

def MaxList(partition):
    #partition is an iterator over the elements in this partition
    for e in partition:
        yield max(e)

distData = sc.parallelize(data)
distData.mapPartitions(MaxList).collect()

You should see

[3, 4, 7]

mapPartitionsWithSplit

Defn: mapPartitionsWithSplit(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the split, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.

This function appears to be deprecated - replaced with mapPartitionsWithIndex.
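
A minimal sketch of the replacement, reusing the data from the mapPartitions example above:

data = [[1, 2, 3], [3, 2, 4], [5, 2, 7]]
distData = sc.parallelize(data)

def MaxListWithIndex(index, iterator):
    #index is the partition number
    for e in iterator:
        yield (index, max(e))

distData.mapPartitionsWithIndex(MaxListWithIndex).collect()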

Sample

Defn: sample(withReplacement, fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

rdd_data=sc.parallelize(range(1000))
rdd_data.sample(False,0.1,100).collect()

Union

Defn: union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

ds1 = [1, 2, 3]
ds2 = [9,8,7]
rrd1 =sc.parallelize(ds1)
rrd2 =sc.parallelize(ds2)


rrd3=rrd1.union(rrd2)
rrd3.collect()

Distinct

Defn:

distinct([numTasks]) Return a new dataset that contains the distinct elements of the source dataset.

ds1 = [1, 2, 3]
ds2 = [1,2,3,9,8,7]
rrd1 =sc.parallelize(ds1)
rrd2 =sc.parallelize(ds2)


rrd3=rrd1.union(rrd2)
rrd3.collect()
#[1, 2, 3, 1, 2, 3, 9, 8, 7]
rrd3.distinct().collect()
#[1, 2, 3, 9, 8, 7]  (ordering may vary)

GroupByKey

Defn: When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. Note: By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.

ds1 = [(1,1),(1,2),(1,3),(2,22)]
ds2 = [(1,4),(2,22),(2,23),(3,33)]
rrd1 =sc.parallelize(ds1)
rrd2 =sc.parallelize(ds2)


rrd3=rrd1.union(rrd2)
rrd3.groupByKey().collect()

Shows the keys, with the grouped values as ResultIterable objects (use mapValues(list) to see them)

[(1, <pyspark.resultiterable.ResultIterable at 0x10973ed10>),
 (2, <pyspark.resultiterable.ResultIterable at 0x10973e690>),
 (3, <pyspark.resultiterable.ResultIterable at 0x10973e510>)]

NASTY DO NOT USE if you can help it

aggregateByKey

This reduces the values for each key within each partition first, resulting in less SHUFFLE.

def seq(x, y):
    return x+y

def comb(x,y):
    return x+y

#0 is the zero value - the initial accumulator (an int here)
#seq merges a value into the accumulator within a partition
#comb merges accumulators coming from different partitions
sorted(rdd.aggregateByKey(0,seq,comb).collect())
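
To make seq and comb more concrete, here is a small sketch computing a per-key average over the same rdd: the zero value is a (sum, count) pair, seq folds one value into it, and comb merges two partial pairs from different partitions.

sums_counts = rdd.aggregateByKey(
    (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),   #seq: fold one value into (sum, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   #comb: merge two (sum, count) pairs
averages = sums_counts.mapValues(lambda p: float(p[0]) / p[1])
sorted(averages.collect())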

ReduceByKey

Defn: reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

from operator import add

ds1 = [(1,1),(1,2),(1,3),(2,22)]
ds2 = [(1,4),(2,22),(2,23),(3,33)]
rrd1 =sc.parallelize(ds1)
rrd2 =sc.parallelize(ds2)


rrd3=rrd1.union(rrd2)
rrd3.reduceByKey(add).collect()

This yields

[(1, 10), (2, 67), (3, 33)]

SortByKey

Defn: sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

from operator import add

ds1 = [(1,1),(1,2),(1,3),(2,22)]
ds2 = [(1,4),(2,22),(2,23),(3,33)]
rrd1 =sc.parallelize(ds1)
rrd2 =sc.parallelize(ds2)


rrd3=rrd1.union(rrd2)
rrd3.sortByKey().collect()

This Yields

[(1, 1), (1, 2), (1, 3), (1, 4), (2, 22), (2, 22), (2, 23), (3, 33)]

Join

Defn: join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.

from operator import add

ds1 = [(1,1),(1,2),(1,3),(2,22)]
ds2 = [(1,3),(2,21),(9,4),(10,22),(11,23),(12,33)]
rrd1 =sc.parallelize(ds1)
rrd2 =sc.parallelize(ds2)

rrd3=rrd1.join(rrd2)
rrd3.collect()

We only have matching keys 1 & 2

This Yields

[(1, (1, 3)), (1, (2, 3)), (1, (3, 3)), (2, (22, 21))]

CoGroup

Defn: cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples. This operation is also called groupWith.

from operator import add

ds1 = [(1,1),(1,2),(1,3),(2,22)]
ds2 = [(1,3),(2,21),(9,4),(10,22),(11,23),(12,33)]
rrd1 =sc.parallelize(ds1)
rrd2 =sc.parallelize(ds2)


rrd3=rrd1.cogroup(rrd2)
rrd3.collect()

This yields

[(1,
  (<pyspark.resultiterable.ResultIterable at 0x109742410>,
   <pyspark.resultiterable.ResultIterable at 0x109776f50>)),
 (2,
  (<pyspark.resultiterable.ResultIterable at 0x109776fd0>,
   <pyspark.resultiterable.ResultIterable at 0x109776a50>)),
 (9,
  (<pyspark.resultiterable.ResultIterable at 0x109776d50>,
   <pyspark.resultiterable.ResultIterable at 0x109776dd0>)),
 (10,
  (<pyspark.resultiterable.ResultIterable at 0x109776e10>,
   <pyspark.resultiterable.ResultIterable at 0x109776d90>)),
 (11,
  (<pyspark.resultiterable.ResultIterable at 0x109776e90>,
   <pyspark.resultiterable.ResultIterable at 0x109776ed0>)),
 (12,
  (<pyspark.resultiterable.ResultIterable at 0x109776a90>,
   <pyspark.resultiterable.ResultIterable at 0x109776b10>))]

Cartesian

Defn: cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

ds1 = [(1,1),(1,2)]
ds2 = [(3,3),(3,4)]
rrd1 =sc.parallelize(ds1)
rrd2 =sc.parallelize(ds2)


rrd3=rrd1.cartesian(rrd2)
rrd3.collect()

This Yields

[((1, 1), (3, 3)), ((1, 1), (3, 4)), ((1, 2), (3, 3)), ((1, 2), (3, 4))]

keyBy(f)

Used to generate a (k, rec) pair from rec.

 ds1 = [(1,1),(2,2),(3,3)]
 left = sc.parallelize(ds1)
 left.keyBy(lambda r: r[0])

Useful in joins

#Create ReKeyed data using keys 4 and 7 from trip rdd
startstations = stations.join(trips.keyBy(lambda r: r[4]))
endstations  = stations.join(trips.keyBy(lambda r: r[7]))

Spark SQL

Create a DataFrame

    rdd = sc.parallelize([(0, 1), (0, 2), (1, 3), (1, 5)])
    df = sqlContext.createDataFrame(rdd, ["id", "score"])
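
A small follow-on sketch (assuming the Spark 1.x sqlContext used throughout): register the DataFrame as a temporary table and query it with SQL.

    df.show()
    df.registerTempTable("scores")
    sqlContext.sql("select id, sum(score) as total from scores group by id").collect()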



# Spark2 Course

## Lab2 Implemented in iPython-Notebook

    #Python Version of Lab 1
    import pyspark
    #Read Trips
    #Remove Header
    input1=sc.textFile("data/trips/*")
    header1=input1.first()
    trips=input1.filter(lambda line: line != header1).map(lambda x: x.split(','))

    #Read Stations
    #Remove Header
    input2=sc.textFile("data/stations/*")
    header2=input2.first()
    stations=input2.filter(lambda line: line != header2).map(lambda x: x.split(',')).keyBy(lambda r: r[0])
    #Set the Key Position

    #Station has key field of 0
    #Trips has StartStation at position4
    #Trips has EndStation as position7
    #Create ReKeyed data using keys 4 and 7 from trip rdd
    startstations = stations.join(trips.keyBy(lambda r: r[4]))
    endstations  = stations.join(trips.keyBy(lambda r: r[7]))


    #
    #Examine the lineage of this
    #

    startstations.toDebugString().split('|')
    startstations.count()
    endstations.count()




    ['(4) PythonRDD[359] at collect at <ipython-input-151-e6868402bfd5>:5 []\n ',
     '  MapPartitionsRDD[358] at mapPartitions at PythonRDD.scala:346 []\n ',
     '  ShuffledRDD[357] at partitionBy at NativeMethodAccessorImpl.java:-2 []\n +-(4) PairwiseRDD[356] at join at <ipython-input-151-e6868402bfd5>:4 []\n    ',
     '  PythonRDD[355] at join at <ipython-input-151-e6868402bfd5>:4 []\n    ',
     '  UnionRDD[354] at union at NativeMethodAccessorImpl.java:-2 []\n    ',
     '  PythonRDD[352] at RDD at PythonRDD.scala:43 []\n    ',
     '  MapPartitionsRDD[255] at textFile at null:-1 []\n    ',
     '  data/stations/* HadoopRDD[254] at textFile at null:-1 []\n    ',
     '  PythonRDD[353] at RDD at PythonRDD.scala:43 []\n    ',
     '  MapPartitionsRDD[252] at textFile at null:-1 []\n    ',
     '  data/trips/* HadoopRDD[251] at textFile at null:-1 []']




    #Implement Partitioning

    input1=sc.textFile("data/trips/*")
    header1=input1.first()
    trips0=input1.filter(lambda line: line != header1).map(lambda x: x.split(','))
    #Partition the data
    #
    #This caused me "too many values" error....
    #(partitionBy needs a (key, value) pair RDD - trips0 is just a list of fields per line)
    #
    trips=trips0.partitionBy(trips0.getNumPartitions())
    #Read Stations
    #Remove Header
    input2=sc.textFile("data/stations/*")
    header2=input2.first()
    stations=input2.filter(lambda line: line != header2).map(lambda x: x.split(',')).keyBy(lambda r: r[0])
    #Set the Key Position

    #Station has key field of 0
    #Trips has StartStation at position4
    #Trips has EndStation as position7
    #Create ReKeyed data using keys 4 and 7 from trip rdd
    startstations = stations.join(trips.keyBy(lambda r: r[4]))
    endstations  = stations.join(trips.keyBy(lambda r: r[7]))


    #
    #Examine the lineage of this
    #

    startstations.toDebugString().split('|')
    startstations.count()
    endstations.count()

    310359



## Reading into a Class

The class looks like this

```python
class army2:
    def __init__(self,list_of_data):
        fields = list_of_data.split(",")
        self.id =fields[0]
        self.rank=fields[1]
        self.num=fields[2]

    def __repr__(self):
        return "We're in the Army2 Now"

def CheckF(x):
    #return str(x[0])+" "+type(x)
    return x[0]+" "+str(type(x))


if __name__ == "__main__":
        a=army2('1,"tim",345')
        print a
```

However, because the pyspark workers run as separate processes, the class must be importable on them, so we need to pip install this module. This is quite easy to do.... there is already some documentation written for PIP here.
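
As an alternative to pip installing the module on every node (a hedged aside, not what was done here), sc.addPyFile can ship the source file to the executors:

#Ship army2.py to the executors so the workers can import it
sc.addPyFile("army2.py")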

from army2 import army2

People=["1,Maj,123","2,Pvt,333","3,Col,999"]
rrd1=sc.parallelize(People)
rrd2=rrd1.map( lambda y:  army2(y))

Targets/Customer Matching

We have 2 lists - Targets (Customers), and Population (People) - as ever - simple

Is *T* in *P* ?

Please note the nice way zip is used to build (key, value) pairs from the data.

import pyspark

targets=[1,10,20,30]
targets=zip(targets,["Bad People"]*len(targets))
people=range(1000)
people=zip(people,["People"]*len(people))


rdd_targets= sc.parallelize(targets)
rdd_people= sc.parallelize(people)


rdd_matched=rdd_people.join(rdd_targets)

rdd_matched.toDebugString()

print_table

import pandas
from pandas import DataFrame
from IPython.display import display, HTML

#Take a 2D array of data and create a dataframe to display
#the data in tabular form
def print_table(column_labels, row_labels, contents):
    tmp = [[t[0]] + t[1] for t in zip(row_labels, contents)]
    df = DataFrame(tmp, columns=column_labels)
    pandas.set_option('display.max_colwidth', 100)
    display(HTML(df.to_html()))

SQL

Let's overlay some structure onto our raw data

from pyspark.sql import SQLContext, Row

#after the pig job, we get a ctrl-A separated file
raw_data = sc.textFile("/user/hrt_qa/open_payments/general/post/part-m-*")

#create a SQL Context so that we may use spark-sql
#This allows us to use a very simple subset of SQL, for a more complete
#set of SQL available, you can use Hive as the underlying engine
#by using HiveContext instead of SQLContext
sqlContext = SQLContext(sc)

#split up the line into tokens separated on ctrl-a
parts = raw_data.map(lambda l : l.split('\x01'))

#We're only really concerned about a few fields, so we'll project out only
#the fields we're interested in.
def tokens_to_columns(tokens):
    return Row( physician_id=tokens[7]\
              , physician_name="{} {}".format(tokens[8], tokens[10])\
              , physician_specialty=tokens[21] \
              , payer=tokens[43] \
              , reason=tokens[52] \
              , amount_str=tokens[48] \
              , amount=float(tokens[48]) \
              )

#Consider rows with either empty or null physician ID's to be bad and we want
#to ignore those.
payments = parts.map(tokens_to_columns)\
                .filter(lambda row : len(row.physician_id) > 0)

#Now, we can register this as a table called payments.
#This allows us to refer to the table in our SQL statements
schemaPayments = sqlContext.inferSchema(payments)
schemaPayments.registerAsTable('payments')

Now let's get some data out of this RDD

#Broken down by reasons
count_by_reasons = sqlContext.sql("""select reason, count(*) as num_payments 
                                     from payments 
                                     group by reason 
                                     order by num_payments desc""").collect()

import locale
locale.setlocale(locale.LC_ALL, '')

print_table(['Payment Reason', '# of Payments']\
               , [x[0] for x in count_by_reasons]\
               , [ [locale.format("%d", x[1], grouping=True)] \
                   for x in count_by_reasons]\
               )