pyspark
Or Spark and Python !!
PLEASE NOTE: It seems you can only run Python 2.7 unless you have CDH4.
So I have switched to my Python venv.
Starting the Shell
As I have my Spark environment set up in a private directory, I use:

```
source ~/Java.env
pyspark
```
IPYTHON
To start pyspark with the friendlier IPython shell, use the following:

```
export IPYTHON=1
pyspark
```
IPYTHON-Notebook
To start the IPython Notebook - a little similar to Zeppelin, but easier to install :)

```
export IPYTHON=1
export IPYTHON_OPTS="notebook"
pyspark
```
Spark RDDs
We want to be able to get data sets - or RDDs - Resilient Distributed Datasets....
These are some of the common types
Seeing "Into" a RDD
There is a very useful method toDebugString() It can be used to explain how the RDD is being constructed.
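For example, a minimal sketch (assuming a README.md is available in the working directory) - the output is the chain of RDDs Spark will use to build the result:

```python
lines = sc.textFile("README.md")
words = lines.flatMap(lambda x: x.split())
# Print the lineage of 'words' - textFile -> flatMap
print(words.toDebugString())
```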
Program data
If we generate some data locally (say using NumPy or Pandas) and want to process it in Spark, this is one way to do it. The source is just a Python list, but parallelize() turns it into an RDD.
```python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
```
FILE
A simple and humble file can be an RDD. It is not truly distributed unless it is stored inside HDFS - but for the moment we will ignore this oversight.
```python
lines = sc.textFile("README.md")
lines.take(10)
```

If we see output like

```
u'Basic', u'#README'
```
Why the 'u'? In Python 2.7 it marks the data as Unicode - in Python 3 you can happily forget about this.
Whilst we will do more RDD work with files later, we also need to try and access other RDD data sources.
Hive
Get the data from Hive and then process the RDD. This however needs Spark built with Hive support, and another export var: export SPARK_HIVE=true.
```python
from pyspark.sql import HiveContext

rdd = HiveContext(sc).sql('select * from Table')
```
HBase
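No worked example here yet. A rough sketch, based on the hbase_inputformat.py example that ships with Spark - it needs the spark-examples jar (for the converters) and the HBase client jars on the classpath, and the quorum/table settings below are assumptions for your cluster:

```python
# Sketch only - adapted from Spark's examples/src/main/python/hbase_inputformat.py
conf = {"hbase.zookeeper.quorum": "localhost",     # assumption: local ZooKeeper
        "hbase.mapreduce.inputtable": "my_table"}  # assumption: table name

hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    conf=conf)

hbase_rdd.take(1)
```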
Saving Data
We can save data as (see the sketch after this list):
- Pickle
- Text
Or being more Hadoop orientated
- As a Sequence File
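A quick sketch of the Pickle and Text cases (the SequenceFile case is covered in more detail below) - the output paths are just examples:

```python
rdd = sc.parallelize(range(10))

# Plain text - each element becomes one line (written as a directory of part files)
rdd.saveAsTextFile("./myTextOutput")

# Pickle - Python-serialised objects, read back with sc.pickleFile()
rdd.saveAsPickleFile("./myPickleOutput")

# Read them back
sc.textFile("./myTextOutput").collect()      # elements come back as strings
sc.pickleFile("./myPickleOutput").collect()  # elements come back as ints
```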
Sequence File
SequenceFile is a flat file consisting of binary key/value pairs. It is extensively used in MapReduce as input/output formats. It is also worth noting that, internally, the temporary outputs of maps are stored using SequenceFile.
SequenceFile provides Writer, Reader and Sorter classes for writing, reading and sorting respectively.
There are 3 different SequenceFile formats:
- Uncompressed key/value records.
- Record compressed key/value records - only 'values' are compressed here.
- Block compressed key/value records - both keys and values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.
The recommended way is to use the SequenceFile.createWriter methods to construct the 'preferred' writer implementation.
```python
rdd = sc.parallelize(range(1, 5)).map(lambda x: (x, "z" * x))
rdd.saveAsSequenceFile("./myOutput.txt")

# Now read it back
sorted(sc.sequenceFile("./myOutput.txt").collect())
```

```
[(1, u'z'), (2, u'zz'), (3, u'zzz'), (4, u'zzzz')]
```
Transformations
A transformation is an operation that modifies (transforms) an RDD - transformations are the core of getting to the data you want.
A word of caution here: in order to maximize performance you need to take care when writing functions. As Spark is implemented in Scala, you should try and use a functional programming style.
Transformation | Description |
---|---|
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T. |
mapPartitionsWithSplit(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the split, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. |
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
distinct([numTasks]) | Return a new dataset that contains the distinct elements of the source dataset. |
groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. Note: By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
sortByKey([ascending], [numTasks]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples. This operation is also called groupWith. |
cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
keyBy(f) | Generates a (key, record) pair from each record using the supplied function. |
MAP
A map transformation does a similar operation to a Python map() call.
I am deliberately going to do this in a not-so-good way to start with, ignoring my functional warning.
```python
def MyUpcase(s):
    d = s
    d = d.upper()
    return d

code = sc.textFile('README.md')
# Now transform using a map
CODE = code.map(MyUpcase)
```
Now whilst that works, this mechanism will not be as fast as possible. We should use a functional style:
```python
code = sc.textFile('README.md')
# Now transform using a map
CODE = code.map(lambda x: x.upper())
```
You should strive to write your transformations like this.
There is an argument that "if you want performance, write it in Scala" - but Python gives you access to NumPy and Pandas....
MapValues
Here, using mapValues(list), you get back a LIST of the values. You can also do mapValues(set) - a set of values - or mapValues(func) with any function.
```python
import pyspark
import random

def rnd():
    return random.randrange(0, 100)

# 20 keys with data
rdd = sc.parallelize([(rnd() % 20, rnd()) for a in range(100)])
```

**GroupByKey** is a **BAD** function as it needs lots of shuffling and sorting; **aggregateByKey** is MUCH MUCH better.

```python
# Group the data
# I want the key and the list of items
sorted(rdd.groupByKey().mapValues(list).collect())
```
To Get number of items
```python
sorted(rdd.groupByKey().mapValues(len).collect())
```
Reading CSV
Map should be used for converting an input line into individual fields.
```python
# Python version of Lab 1
import pyspark

# Read trips, remove the header
input1 = sc.textFile("data/trips/*")
header1 = input1.first()
trips = input1.filter(lambda line: line != header1).map(lambda x: x.split(','))
```
Filter
Defn: filter(func)
Return a new dataset formed by selecting those elements of the source on which func returns true.
```python
def MyFilterFunction(s, ss):
    if s.find(ss) != -1:
        return True
    else:
        return False

code = sc.textFile('README.md')
# Now transform using a map
CODE = code.map(lambda x: x.upper())
# Look for the string PIP
MATCHES = CODE.filter(lambda x: MyFilterFunction(x, "PIP"))
MATCHES.collect()
```
Here are some more Filter examples
```python
# Only want lines with ERROR in them
logData = sc.textFile(logFile).cache()
errors = logData.filter(lambda line: "ERROR" in line)
```
Using a Function this would be
```python
def is_error(line):
    return "ERROR" in line

logData = sc.textFile(logFile).cache()
errors = logData.filter(is_error)
```
Functions can access objects in enclosing scopes, although modifications to those objects within RDD methods will not be propagated back:
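A minimal sketch of that behaviour (the variable names are just illustrative):

```python
# Read-only access to an enclosing-scope variable works fine:
threshold = 5
nums = sc.parallelize(range(10))
print(nums.filter(lambda x: x > threshold).collect())   # [6, 7, 8, 9]

# But mutating a driver-side object from inside an RDD function does not
# propagate back - each worker mutates its own copy of 'seen':
seen = []
nums.foreach(lambda x: seen.append(x))
print(seen)   # still [] on the driver
```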
Multiple matches
```python
error_keywords = ["Exception", "Error"]

def is_error(line):
    return any(keyword in line for keyword in error_keywords)

errors = logData.filter(is_error)
```
Filter on Number Data looks like
```
[(0, 'Junk'), (1, 'Junk'), (2, 'Junk'), (3, 'Junk'), (4, 'Junk'), (5, 'Junk'), (6, 'Junk'), (7, 'Junk'), (8, 'Junk'), (9, 'Junk')]
```

To only take values < 5....

```python
sc.parallelize(zip(range(10), ["Junk"] * 10)).filter(lambda x: x[0] < 5).collect()
```
FlatMap
Defn: flatMap(func)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
```python
code = sc.textFile('README.md')
# Now transform using flatMap - one word per output element
codelist = code.flatMap(lambda x: x.split())
```
mapPartitions
Defn: mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.
This function seems to work best when using yield, which turns the function into a Python generator (an iterator).
```python
data = [[1, 2, 3], [3, 2, 4], [5, 2, 7]]

def MaxList(partition):
    # 'partition' is an iterator over the elements in this partition
    for e in partition:
        yield max(e)

distData = sc.parallelize(data)
distData.mapPartitions(MaxList).collect()
```

You should see

```
[3, 4, 7]
```
mapPartitionsWithSplit
Defn: mapPartitionsWithSplit(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the split, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
This function appears to be deprecated - replaced with mapPartitionsWithIndex.
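A small sketch of the replacement (partition contents depend on how the data is split, so the exact output may differ):

```python
rdd = sc.parallelize(range(10), 3)

def tag_with_partition(index, iterator):
    # 'index' is the partition number, 'iterator' its elements
    for x in iterator:
        yield (index, x)

rdd.mapPartitionsWithIndex(tag_with_partition).collect()
# e.g. [(0, 0), (0, 1), (0, 2), (1, 3), (1, 4), (1, 5), (2, 6), (2, 7), (2, 8), (2, 9)]
```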
Sample
Defn: sample(withReplacement, fraction, seed)
Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
```python
rdd_data = sc.parallelize(range(1000))
rdd_data.sample(False, 0.1, 100).collect()
```
Union
Defn: union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.
```python
ds1 = [1, 2, 3]
ds2 = [9, 8, 7]
rrd1 = sc.parallelize(ds1)
rrd2 = sc.parallelize(ds2)
rrd3 = rrd1.union(rrd2)
rrd3.collect()
```
Distinct
Defn: distinct([numTasks])
Return a new dataset that contains the distinct elements of the source dataset.
```python
ds1 = [1, 2, 3]
ds2 = [1, 2, 3, 9, 8, 7]
rrd1 = sc.parallelize(ds1)
rrd2 = sc.parallelize(ds2)
rrd3 = rrd1.union(rrd2)
rrd3.collect()
rrd3.distinct().collect()
```

The union (before distinct) looks like:

```
[1, 2, 3, 1, 2, 3, 9, 8, 7]
```
GroupByKey
Defn: When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. Note: By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
```python
ds1 = [(1, 1), (1, 2), (1, 3), (2, 22)]
ds2 = [(1, 4), (2, 22), (2, 23), (3, 33)]
rrd1 = sc.parallelize(ds1)
rrd2 = sc.parallelize(ds2)
rrd3 = rrd1.union(rrd2)
rrd3.groupByKey().collect()
```

This shows each key paired with a lazy iterable (ResultIterable) of its values:

```
[(1, <pyspark.resultiterable.ResultIterable at 0x10973ed10>),
 (2, <pyspark.resultiterable.ResultIterable at 0x10973e690>),
 (3, <pyspark.resultiterable.ResultIterable at 0x10973e510>)]
```
NASTY DO NOT USE if you can help it
aggregateByKey
This reduces the values for each key within each partition first, resulting in less SHUFFLE.
```python
def seq(x, y):
    # fold a value into the running total within a partition
    return x + y

def comb(x, y):
    # merge the per-partition totals together
    return x + y

# 0 is the zero value (initial accumulator) for each key - an int, as we want integer totals
sorted(rdd.aggregateByKey(0, seq, comb).collect())
```
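The two functions matter when the accumulator has a different shape to the values. A small sketch of my own, reusing the rdd from the MapValues section, that computes a per-key average:

```python
# Accumulator is a (sum, count) pair, the values are plain ints
sums_counts = rdd.aggregateByKey(
    (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # fold a value into (sum, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # merge two (sum, count) pairs

averages = sums_counts.mapValues(lambda p: float(p[0]) / p[1])
sorted(averages.collect())
```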
ReduceByKey
Defn: reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
```python
from operator import add

ds1 = [(1, 1), (1, 2), (1, 3), (2, 22)]
ds2 = [(1, 4), (2, 22), (2, 23), (3, 33)]
rrd1 = sc.parallelize(ds1)
rrd2 = sc.parallelize(ds2)
rrd3 = rrd1.union(rrd2)
rrd3.reduceByKey(add).collect()
```

This yields

```
[(1, 10), (2, 67), (3, 33)]
```
SortByKey
Defn: sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
```python
ds1 = [(1, 1), (1, 2), (1, 3), (2, 22)]
ds2 = [(1, 4), (2, 22), (2, 23), (3, 33)]
rrd1 = sc.parallelize(ds1)
rrd2 = sc.parallelize(ds2)
rrd3 = rrd1.union(rrd2)
rrd3.sortByKey().collect()
```

This yields

```
[(1, 1), (1, 2), (1, 3), (1, 4), (2, 22), (2, 22), (2, 23), (3, 33)]
```
Join
Defn: join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
```python
ds1 = [(1, 1), (1, 2), (1, 3), (2, 22)]
ds2 = [(1, 3), (2, 21), (9, 4), (10, 22), (11, 23), (12, 33)]
rrd1 = sc.parallelize(ds1)
rrd2 = sc.parallelize(ds2)
rrd3 = rrd1.join(rrd2)
rrd3.collect()
```

We only have matching keys 1 and 2.

This yields

```
[(1, (1, 3)), (1, (2, 3)), (1, (3, 3)), (2, (22, 21))]
```
CoGroup
Defn: cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples. This operation is also called groupWith.
```python
ds1 = [(1, 1), (1, 2), (1, 3), (2, 22)]
ds2 = [(1, 3), (2, 21), (9, 4), (10, 22), (11, 23), (12, 33)]
rrd1 = sc.parallelize(ds1)
rrd2 = sc.parallelize(ds2)
rrd3 = rrd1.cogroup(rrd2)
rrd3.collect()
```

This yields

```
[(1, (<pyspark.resultiterable.ResultIterable at 0x109742410>, <pyspark.resultiterable.ResultIterable at 0x109776f50>)),
 (2, (<pyspark.resultiterable.ResultIterable at 0x109776fd0>, <pyspark.resultiterable.ResultIterable at 0x109776a50>)),
 (9, (<pyspark.resultiterable.ResultIterable at 0x109776d50>, <pyspark.resultiterable.ResultIterable at 0x109776dd0>)),
 (10, (<pyspark.resultiterable.ResultIterable at 0x109776e10>, <pyspark.resultiterable.ResultIterable at 0x109776d90>)),
 (11, (<pyspark.resultiterable.ResultIterable at 0x109776e90>, <pyspark.resultiterable.ResultIterable at 0x109776ed0>)),
 (12, (<pyspark.resultiterable.ResultIterable at 0x109776a90>, <pyspark.resultiterable.ResultIterable at 0x109776b10>))]
```
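To see the actual values rather than the ResultIterable objects, the iterables can be turned into lists (my own follow-up, reusing rrd3 from above; ordering may vary):

```python
# Convert each pair of ResultIterables into a pair of plain lists
rrd3.mapValues(lambda v: (list(v[0]), list(v[1]))).collect()
# e.g. [(1, ([1, 2, 3], [3])), (2, ([22], [21])), (9, ([], [4])), ...]
```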
Cartesian
cartesian(otherDataset)
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
```python
ds1 = [(1, 1), (1, 2)]
ds2 = [(3, 3), (3, 4)]
rrd1 = sc.parallelize(ds1)
rrd2 = sc.parallelize(ds2)
rrd3 = rrd1.cartesian(rrd2)
rrd3.collect()
```

This yields

```
[((1, 1), (3, 3)), ((1, 1), (3, 4)), ((1, 2), (3, 3)), ((1, 2), (3, 4))]
```
keyBy(f)
Used to generate a (key, record) pair from each record.
```python
ds1 = [(1, 1), (2, 2), (3, 3)]
left = sc.parallelize(ds1)
left.keyBy(lambda r: r[0])
```
Useful in joins
```python
# Create re-keyed data using fields 4 and 7 from the trips RDD
startstations = stations.join(trips.keyBy(lambda r: r[4]))
endstations = stations.join(trips.keyBy(lambda r: r[7]))
```
Spark SQL
Create a DataFrame
```python
rdd = sc.parallelize([(0, 1), (0, 2), (1, 3), (1, 5)])
sqlc = sqlContext.createDataFrame(rdd, ["id", "score"])
```

# Spark2 Course

## Lab2

Implemented in iPython-Notebook

```python
# Python version of Lab 1
import pyspark

# Read trips, remove the header
input1 = sc.textFile("data/trips/*")
header1 = input1.first()
trips = input1.filter(lambda line: line != header1).map(lambda x: x.split(','))

# Read stations, remove the header, set the key position
# Station has a key field of 0
# Trips has StartStation at position 4
# Trips has EndStation at position 7
input2 = sc.textFile("data/stations/*")
header2 = input2.first()
stations = input2.filter(lambda line: line != header2).map(lambda x: x.split(',')).keyBy(lambda r: r[0])

# Create re-keyed data using keys 4 and 7 from the trips RDD
startstations = stations.join(trips.keyBy(lambda r: r[4]))
endstations = stations.join(trips.keyBy(lambda r: r[7]))

# Examine the lineage of this
startstations.toDebugString().split('|')

startstations.count()
endstations.count()
```

The lineage looks like:

```
['(4) PythonRDD[359] at collect at <ipython-input-151-e6868402bfd5>:5 []\n ',
 ' MapPartitionsRDD[358] at mapPartitions at PythonRDD.scala:346 []\n ',
 ' ShuffledRDD[357] at partitionBy at NativeMethodAccessorImpl.java:-2 []\n +-(4) PairwiseRDD[356] at join at <ipython-input-151-e6868402bfd5>:4 []\n ',
 ' PythonRDD[355] at join at <ipython-input-151-e6868402bfd5>:4 []\n ',
 ' UnionRDD[354] at union at NativeMethodAccessorImpl.java:-2 []\n ',
 ' PythonRDD[352] at RDD at PythonRDD.scala:43 []\n ',
 ' MapPartitionsRDD[255] at textFile at null:-1 []\n ',
 ' data/stations/* HadoopRDD[254] at textFile at null:-1 []\n ',
 ' PythonRDD[353] at RDD at PythonRDD.scala:43 []\n ',
 ' MapPartitionsRDD[252] at textFile at null:-1 []\n ',
 ' data/trips/* HadoopRDD[251] at textFile at null:-1 []']
```

Now implement partitioning:

```python
# Implement partitioning
input1 = sc.textFile("data/trips/*")
header1 = input1.first()
trips0 = input1.filter(lambda line: line != header1).map(lambda x: x.split(','))

# Partition the data
# (This caused me a "too many values" error....)
trips = trips0.partitionBy(trips0.getNumPartitions())

# Read stations, remove the header, set the key position
# Station has a key field of 0
# Trips has StartStation at position 4
# Trips has EndStation at position 7
input2 = sc.textFile("data/stations/*")
header2 = input2.first()
stations = input2.filter(lambda line: line != header2).map(lambda x: x.split(',')).keyBy(lambda r: r[0])

# Create re-keyed data using keys 4 and 7 from the trips RDD
startstations = stations.join(trips.keyBy(lambda r: r[4]))
endstations = stations.join(trips.keyBy(lambda r: r[7]))

# Examine the lineage of this
startstations.toDebugString().split('|')

startstations.count()
endstations.count()
```

```
310359
```

## Reading into a Class

The class looks like this

```python
class army2:
    def __init__(self, list_of_data):
        fields = list_of_data.split(",")
        self.id = fields[0]
        self.rank = fields[1]
        self.num = fields[2]

    def __repr__(self):
        return "We're in the Army2 Now"


def CheckF(x):
    # return str(x[0]) + " " + type(x)
    return x[0] + " " + str(type(x))


if __name__ == "__main__":
    a = army2('1,"tim",345')
    print a
```
However, due to separate worker processes being invoked in the pyspark environment, we need to pip install this module so the workers can import it. This is quite easy to do.... there is already some documentation written for PIP here.
```python
from army2 import army2

People = ["1,Maj,123", "2,Pvt,333", "3,Col,999"]
rrd1 = sc.parallelize(People)
rrd2 = rrd1.map(lambda y: army2(y))
```
Targets/Customer Matching
We have two lists - Targets (Customers) and Population (People) - and, as ever, a simple question:
Is *T* in *P* ?
Please note the nice way to zip the data up with a constant label.
```python
import pyspark

targets = [1, 10, 20, 30]
targets = zip(targets, ["Bad People"] * len(targets))

people = range(1000)
people = zip(people, ["People"] * len(people))

rdd_targets = sc.parallelize(targets)
rdd_people = sc.parallelize(people)

rdd_matched = rdd_people.join(rdd_targets)
rdd_matched.toDebugString()
```
print_table

```python
# Take a 2D array of data and create a DataFrame to display
# the data in tabular form
import pandas
from pandas import DataFrame
from IPython.display import display, HTML

def print_table(column_labels, row_labels, contents):
    tmp = [[t[0]] + t[1] for t in zip(row_labels, contents)]
    df = DataFrame(tmp, columns=column_labels)
    pandas.set_option('display.max_colwidth', 100)
    display(HTML(df.to_html()))
```
SQL
Let's overlay some structure on our raw data.
```python
from pyspark.sql import SQLContext, Row

# After the Pig job, we get a ctrl-A separated file
raw_data = sc.textFile("/user/hrt_qa/open_payments/general/post/part-m-*")

# Create a SQLContext so that we may use spark-sql.
# This allows us to use a very simple subset of SQL; for a more complete
# set of SQL, you can use Hive as the underlying engine
# by using HiveContext instead of SQLContext.
sqlContext = SQLContext(sc)

# Split up the line into tokens separated on ctrl-A
parts = raw_data.map(lambda l: l.split('\x01'))

# We're only really concerned about a few fields, so we'll project out only
# the fields we're interested in.
def tokens_to_columns(tokens):
    return Row(physician_id=tokens[7],
               physician_name="{} {}".format(tokens[8], tokens[10]),
               physician_specialty=tokens[21],
               payer=tokens[43],
               reason=tokens[52],
               amount_str=tokens[48],
               amount=float(tokens[48]))

# Consider rows with either empty or null physician IDs to be bad and
# ignore those.
payments = parts.map(tokens_to_columns)\
                .filter(lambda row: len(row.physician_id) > 0)

# Now we can register this as a table called 'payments'.
# This allows us to refer to the table in our SQL statements.
schemaPayments = sqlContext.inferSchema(payments)
schemaPayments.registerAsTable('payments')
```
Now let's get some data out of this table.
```python
import locale

# Broken down by reason
count_by_reasons = sqlContext.sql("""select reason, count(*) as num_payments
                                     from payments
                                     group by reason
                                     order by num_payments desc""").collect()

print_table(['Payment Reason', '# of Payments'],
            [x[0] for x in count_by_reasons],
            [[locale.format("%d", x[1], grouping=True)] for x in count_by_reasons])
```