
Spark

Adding the SPARK_HOME environment variable from the shell:

export SPARK_HOME=~/JAVA_ENV/spark

This should be placed in your ~/.bashrc
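
Spark's bin directory can also be put on the PATH so that pyspark is found from any shell. A minimal ~/.bashrc sketch, assuming the ~/JAVA_ENV/spark install path used above:

# ~/.bashrc (sketch; paths assume the ~/JAVA_ENV/spark install above)
export SPARK_HOME=~/JAVA_ENV/spark
export PATH="$SPARK_HOME/bin:$PATH"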

Starting in IPython

    IPYTHON_OPTS="notebook" ~/JAVA_ENV/spark/bin/pyspark
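
On newer Spark releases (2.0 onwards) IPYTHON_OPTS was removed; a rough equivalent, assuming Jupyter/IPython is installed, is:

    PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" ~/JAVA_ENV/spark/bin/pyspark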

Then do

import pyspark

File Reading

import pyspark
# Load a text file and count the lines.
lines = sc.textFile("/Users/tim/spark/people/people.csv")
linecount = lines.count()

Let's look at the transformations of the data, because it is important to understand what is going on. (The parts and people RDDs shown below are built by the transformations in the next section.)

lines.take(2)

[u'tim,uk,49', u'moh,om,29']

parts.collect()

[[u'tim', u'uk', u'49'],
 [u'moh', u'om', u'29'],
 [u'omm', u'om', u'30'],
 [u'sma', u'om', u'27']]

people.collect()

[{'age': 49, 'country': u'uk', 'name': u'tim'},
 {'age': 29, 'country': u'om', 'name': u'moh'},
 {'age': 30, 'country': u'om', 'name': u'omm'},
 {'age': 27, 'country': u'om', 'name': u'sma'}]

File to Spark SQL

import pyspark

# Load a text file and convert each line to a dictionary.
lines = sc.textFile("/Users/tim/spark/people/people.csv")
# Split on ","
parts = lines.map(lambda l: l.split(","))
# Make a record (dict) per line
people = parts.map(lambda p: {"name": p[0], "country": p[1], "age": int(p[2])})


#
# Spark SQL time
#
# Infer the schema and register the DataFrame as a temporary table
peopleTable = sqlCtx.createDataFrame(people)
peopleTable.registerTempTable("people")

#
# Now query
#
twentysomething = sqlCtx.sql("SELECT name FROM people WHERE age <= 30")
#
# The result is still RDD-like, so we can map over the rows
#
names = twentysomething.map(lambda p: "Name: " + p.name)
for n in names.collect():
    print n
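
For reference, roughly the same query can be written with the DataFrame API instead of SQL. A minimal sketch, assuming Spark 1.3+ where createDataFrame returns a DataFrame:

# Equivalent query via the DataFrame API (sketch)
twentysomething_df = peopleTable.filter(peopleTable.age <= 30).select("name")
for row in twentysomething_df.collect():
    print("Name: " + row.name)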

More

Cache a table in memory

sqlCtx.cacheTable("people")
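
Once cached, the table can be queried as before and released with uncacheTable. A quick sketch, using a hypothetical GROUP BY query over the people table registered above:

# Query the cached table, then release the cache when done
counts = sqlCtx.sql("SELECT country, COUNT(*) AS n FROM people GROUP BY country")
print(counts.collect())
sqlCtx.uncacheTable("people")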

Dev

Counting Lines in Files

data = sc.textFile('/user/root/*.txt')
data.count()

Counting Words in Files

# Read in the text files and split each line into words
tokenized = sc.textFile('/user/root/*.txt').flatMap(lambda line: line.split(" "))
words = tokenized.collect()
print("There are {} words in the files".format(len(words)))

Counting Occurrences of a Specific Word

tofind="at"
#read in text file and split each document into words
tokenized = sc.textFile('/user/root/*.txt').flatMap(lambda line: line.split(" "))
# filter out words with fewer than threshold occurrences
matched = tokenized.filter(lambda word: word if word==tofind else "")
words=matched.collect()
print("There are {} words in the files".format(len(words)))

Match Text with Key Value

tofind="at" tokens=sc.textFile('/user/root/*.txt').flatMap(lambda line: line.split(" "))