
Spark Application Planning


Widget Connector
width1000
urlhttps://www.youtube.com/watch?v=5hVWKahgb8M
height600

Hi. Welcome to the Spark Fundamentals course. This lesson will cover Spark application programming.
After completing this lesson, you should be able to understand the purpose and usage of
the SparkContext. This lesson will show you how you can get started by programming your
own Spark application. First you will see how to link to Spark. Next you will see how
to run some Spark examples. You should also be able to understand how to pass functions
to Spark and be able to create and run a Spark standalone application. Finally, you should
be able to submit applications to the Spark cluster.
The SparkContext is the main entry point to everything Spark. It can be used to create
RDDs and shared variables on the cluster. When you start up the Spark Shell, the SparkContext
is automatically initialized for you with the variable sc. For a Spark application,
you must first import some classes and implicit conversions and then create the SparkContext
object. The three import statements for Scala are shown on the slide here.
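The slide itself is not reproduced in this transcript, but the three Scala import statements it refers to are typically the ones from the Spark programming guide of that era:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // implicit conversions, e.g. for pair RDD functions
    import org.apache.spark.SparkConf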
Each Spark application you create requires certain dependencies. The next three slides
will show you how to link to those dependencies depending on which programming language you
decide to use.

To link with Spark using Scala, you must use a version of Scala that is compatible with the
Spark version you choose. For example, Spark 1.1.1 uses Scala 2.10, so make sure that you have
Scala 2.10 if you wish to write applications for Spark 1.1.1.
To write a Spark application, you must add a Maven dependency on Spark. The information
is shown on the slide here. If you wish to access a Hadoop cluster, you need to add a
dependency to that as well.
In the lab environment, this is already set up for you. The information on this page shows
how you would set up for your own Spark cluster.
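As a hedged illustration (the exact coordinates on the slide are not reproduced here), the Spark 1.1.1 dependency as it would appear in an sbt build definition, plus an optional hadoop-client entry for HDFS access; the Hadoop version shown is only an example and should match your cluster:

    // Spark core compiled for Scala 2.10 (Maven coordinates org.apache.spark : spark-core_2.10 : 1.1.1)
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"

    // Only needed if you access a Hadoop/HDFS cluster; match the version to your Hadoop install.
    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.4.0"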

Spark 1.1.1 works with Python 2.6 or higher, but not Python 3. It uses the standard CPython
interpreter, so C libraries like NumPy can be used.
To run Spark applications in Python, use the bin/spark-submit script located in the Spark
home directory. This script will load Spark's Java/Scala libraries and allow you
to submit applications to a cluster. If you wish to use HDFS, you will have to link to
it as well. In the lab environment, you will not need to do this, as Spark is bundled with
it. You also need to import some Spark classes, shown here.

Spark 1.1.1 works with Java 6 and higher. If you are using Java 8, Spark supports lambda
expressions for concisely writing functions. Otherwise, you can use the org.apache.spark.api.java.function
package with older Java versions.

As with Scala, you need to add a dependency on Spark, which is available through Maven Central.
If you wish to access an HDFS cluster, you must add the dependency for that as well. Last,
but not least, you need to import some Spark classes.
Once you have the dependencies established, the first thing to do in your Spark application,
before you can initialize Spark, is to build a SparkConf object. This object contains information
about your application.
For example, val conf = new SparkConf().setAppName(appName).setMaster(master).
You set the application name and tell it which node is the master. The master parameter can
be a Spark standalone, Mesos, or YARN cluster URL. You can also use the special local
string to run in local mode. In fact, you can specify local[16] to allocate 16 cores to
that particular job or Spark shell.

In production, you would not want to hardcode the master URL in your program.
Instead, supply it as an argument to the spark-submit command.
Once you have the SparkConf all set up, you pass it as a parameter to the SparkContext
constructor to create the SparkContext.
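A minimal Scala sketch of this setup; the application name is arbitrary here, and the master is deliberately left to be supplied by spark-submit rather than hardcoded:

    import org.apache.spark.{SparkConf, SparkContext}

    // Name the application; do not hardcode the master here --
    // supply it at launch time with spark-submit --master.
    val conf = new SparkConf().setAppName("MySparkApp")
    val sc = new SparkContext(conf)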
Here's the information for Python. It is pretty much the same as for Scala. The syntax
here is slightly different; otherwise, you are still required to set up a SparkConf object
to pass as a parameter to the SparkContext object. It is also recommended that you pass the
master parameter as an argument to the spark-submit operation.
Here's the same information for Java. Same idea: you need to create the SparkConf object
and pass that to the SparkContext, which in this case would be a JavaSparkContext. Remember,
in the program's import statements, you imported the JavaSparkContext libraries.

Widget Connector
width1000
urlhttps://www.youtube.com/watch?v=3WQnyfjMsSU
height600

Passing functions to Spark. I wanted to touch a little bit on this before we move further.
This is important to understand as you begin to think about the business logic of your
application.
The design of Spark's API relies heavily on passing functions in the driver program to
run on the cluster.

When a job is executed, the Spark driver needs to tell its workers
how to process the data.
There are three methods that you can use to pass functions.
The first method is to use anonymous function syntax. You saw briefly what an anonymous
function is in the first lesson. This is useful for short pieces of code. For example, here
we define an anonymous function that takes in a particular parameter x of type Int and
adds one to it. Essentially, anonymous functions are useful for one-time use of a function.
In other words, you don't need to explicitly define the function to use it. You define
it as you go. Again, the left side of the => holds the parameters or arguments, and the
right side of the => is the body of the function.
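A minimal sketch in the Spark shell, where the SparkContext sc is already provided; the sample data is an assumption:

    // sc is the SparkContext the Spark shell creates for you.
    val numbers = sc.parallelize(1 to 5)

    // Anonymous function: x is the parameter, x + 1 is the body.
    val incremented = numbers.map(x => x + 1)
    incremented.collect().foreach(println)   // prints 2, 3, 4, 5, 6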

Another method to pass functions around Spark is to use static methods in a global singleton
object. This means that you can create a global object, in the example, it is the object MyFunctions.
Inside that object, you basically define the function func1. When the driver requires that
function, it only needs to send out the object -- the worker will be able to access it. In
this case, when the driver sends out the instructions to the worker, it just has to send out the
singleton object.
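A sketch of this pattern using the MyFunctions and func1 names from the slide; the body of func1 and the sample data are assumptions:

    import org.apache.spark.{SparkConf, SparkContext}

    // Global singleton object holding the function. When the driver ships the closure,
    // only this object needs to be sent to the workers.
    object MyFunctions {
      def func1(s: String): String = s.toUpperCase   // body is illustrative only
    }

    object SingletonFunctionExample {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("SingletonFunctionExample"))
        val upper = sc.parallelize(Seq("spark", "scala")).map(MyFunctions.func1)
        upper.collect().foreach(println)
        sc.stop()
      }
    }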
It is possible to pass a reference to a method in a class instance, as opposed to a singleton
object. This would require sending the object that contains the class along with the method.
To avoid this, consider copying the field to a local variable within the function instead of
accessing it externally.

For example, say you have a field with the string Hello. You want to avoid referencing that
field directly inside a function, as shown on the slide as x => field + x.
Instead, assign it to a local variable, shown as val field_ = this.field, so that only that
value is passed along and not the entire object.
For an example such as this, it may seem trivial, but imagine if the field is not the simple
text Hello but something much larger, say a large log file. In that case, copying the field
to a local variable saves a lot by not having to ship the entire enclosing object to the
workers.
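A sketch of the two variants using the field and field_ names from the slide; the class name and the RDD contents are assumptions:

    import org.apache.spark.rdd.RDD

    class MyClass {
      val field = "Hello"

      // Risky: referencing the field directly means the whole MyClass instance
      // has to be serialized and shipped to the workers.
      def doStuffBad(rdd: RDD[String]): RDD[String] = rdd.map(x => field + x)

      // Better: copy the field to a local variable so only that value is shipped.
      def doStuffGood(rdd: RDD[String]): RDD[String] = {
        val field_ = this.field
        rdd.map(x => field_ + x)
      }
    }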

Back to our regularly scheduled program. At this point, you should know how to link
dependencies with Spark and also know how to initialize the SparkContext. I also touched
a little bit on passing functions with Spark to give you a better view of how you can program
your business logic. This course will not focus too much on how to program business
logic, but there are examples available for you to see how it is done. The purpose is
to show you how you can create an application using a simple, but effective example which
demonstrates Spark's capabilities.

Once you have the beginning of your application ready by creating the SparkContext object,
you can start to program in the business logic using Spark's API available in Scala, Java,
or Python. You create the RDD from an external dataset or from an existing RDD. You use transformations
and actions to compute the business logic. You can take advantage of RDD persistence,
broadcast variables and/or accumulators to improve the performance of your jobs.
Here's a sample Scala application. You have your import statement. After the beginning
of the object, you see that the SparkConf is created with the application name. Then
a SparkContext is created. The next several lines of code create the RDD from a text
file and then perform the HdfsTest on it to see how long an iteration through the
file takes. Finally, at the end, you stop the SparkContext by calling the stop() function.
Again, just a simple example to show how you would create a Spark application. You will
get to practice this in the lab exercise.
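A hedged sketch of the kind of application being described, modeled on the HdfsTest example that ships with Spark; the exact argument handling and timing loop on the slide may differ:

    import org.apache.spark.{SparkConf, SparkContext}

    object HdfsTest {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("HdfsTest")
        val sc = new SparkContext(conf)

        // Create an RDD from a text file whose path is passed as the first argument.
        val file = sc.textFile(args(0))
        val mapped = file.map(line => line.length).cache()

        // Time how long each full pass (iteration) over the file takes.
        for (iter <- 1 to 3) {
          val start = System.currentTimeMillis()
          mapped.map(x => x + 2).count()
          val end = System.currentTimeMillis()
          println("Iteration " + iter + " took " + (end - start) + " ms")
        }

        sc.stop()
      }
    }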
I mentioned that there are examples available which show the various uses of Spark. Depending
on your programming language preference, there are examples in all three languages that work
with Spark. You can view the source code of the examples on the Spark website or within
the Spark distribution itself. I provided some screenshots here to show you some of
the examples available.

On the slide, I also listed the steps to run these examples. To run the Scala or Java examples,
you would execute the run-example script under the Spark home's bin directory. So for example,
to run the SparkPi application, execute run-example SparkPi, where SparkPi would be the name of
the application. Substitute that with a different application name to run that other application.
To run the sample Python applications, use the spark-submit command and provide the path
to the application.

Widget Connector
width1000
urlhttps://www.youtube.com/watch?v=JgdQtbZNdj4
height600

In the next three slides, you will see another example of creating a Spark application. First you will see how to do this in Scala. The following slides will show Python
and Java.
The application shown here counts the number of lines with 'a' and the number of lines
with 'b'. You will need to replace YOUR_SPARK_HOME with the directory where Spark is installed
if you wish to code this application.
Unlike the Spark shell, you have to initialize the SparkContext in a program. First you must
create a SparkConf to set up your application's name. Then you create the SparkContext by
passing in the SparkConf object. Next, you create the RDD by loading in the textFile,
and then caching the RDD. Since we will be applying a couple of transformations on it,
caching will help speed up the process, especially if the logData RDD is large. Finally, you
get the values from the RDD by executing the count action on it. End the program by printing
the results out onto the console.
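A sketch of the application just described, along the lines of the standard Spark quick-start example; the file name and application name are assumptions:

    /* SimpleApp.scala */
    import org.apache.spark.{SparkConf, SparkContext}

    object SimpleApp {
      def main(args: Array[String]) {
        // Replace YOUR_SPARK_HOME with the directory where Spark is installed.
        val logFile = "YOUR_SPARK_HOME/README.md"
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)

        // Load the file as an RDD and cache it, since two actions will run over it.
        val logData = sc.textFile(logFile, 2).cache()

        // Count lines containing 'a' and lines containing 'b', then print the results.
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

        sc.stop()
      }
    }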
In Python, this application does the exact same thing, that is, count the number of lines
with 'a' in it and the number of lines with 'b' in it. You use a SparkContext object to
create the RDD and cache it.

Then you run the transformations and actions, followed by
a print to the console. Nothing entirely new here, just a difference in syntax.
Similar to Scala and Python, in Java you need to get a JavaSparkContext. RDDs are represented
by JavaRDD. Then you run the transformations and actions on them. The lambda expressions
of Java 8 allow you to write functions concisely. Otherwise, you can use the classes in the
org.apache.spark.api.java.function package for older versions of Java. The business logic
is the same as in the previous two examples: count the number of a's and b's in the README
file. It is just a matter of differences in syntax and library names.
Up until this point, you should know how to create a Spark application using any of the
supported programming languages. Now you get to see how to run the application. You will
need to first define the dependencies. Then you have to package the application together
using system build tools such as Ant, sbt, or Maven. The examples here show how you would
do it using the various tools. You can use any tool for any of the programming languages.
For Scala, the example is shown using sbt, so you would have a simple.sbt file. In Java,
the example shows using Maven, so you would have the pom.xml file. In Python, if you have
dependencies on third-party libraries, then you can use the --py-files
argument to handle that.
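As a hedged illustration, a minimal simple.sbt along the lines described above; the project name and Scala patch version are assumptions:

    // simple.sbt -- minimal sbt build definition for a Spark 1.1.1 application
    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"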

Again, shown here are examples of what a typical directory structure would look like for the
tool that you choose.
Finally, once you have the packaged JAR created, run spark-submit to execute the application.

In the lab exercise, you will get to practice this.
In short, you package up your application into a JAR for Scala or Java or a set of .py
or .zip files for Python.
To submit your application to the Spark cluster, you use the spark-submit command, which is located
under the $SPARK_HOME/bin directory.

The options shown on the slide are the most commonly used ones. To see other options, just invoke
spark-submit with the --help argument.
Let's briefly go over what each of these options mean.
The class option is the main entry point of your application. If it is under a package name,
you must provide the fully qualified name. The master URL is where your cluster is located.
Remember that the recommended approach is to provide the master URL here, instead of hardcoding
it in your application code.
The deploy-mode is whether you want to deploy your driver on the worker nodes (cluster)
or locally as an external client (client). The default deploy-mode is client.
The conf option is any configuration property you wish to set in key=value format.
The application jar is the file that you packaged up using one of the build tools.
Finally, if the application has any arguments, you would supply it after the jar file.
Here's an actual example of running a Spark application locally on 8 cores. The class
is org.apache.spark.examples.SparkPi. local[8] says to run it locally on 8
cores. The examples.jar is located at the given path, and the argument 100 is passed
into the SparkPi application.
Having completed this lesson, you should now know how to create a standalone Spark application
and run it by submitting it to the Spark cluster. You saw briefly the different methods for
passing functions in Spark. All three programming languages were shown in this lesson.
Next steps. Complete lab exercise #3, Creating a Spark application, and then proceed to the next lesson in this course.

Introduction to the Spark libraries


Widget Connector
width1000
urlhttps://www.youtube.com/watch?v=DReyHQAQxBw
height600

This lesson will cover the Introduction to the Spark libraries. After completing this lesson, you should be able to understand and use the various Spark libraries, including Spark SQL, Spark Streaming, MLlib, and GraphX.

Spark comes with libraries that you can utilize for specific use cases. These libraries are an extension of the Spark Core API. Any improvements made to the core will automatically take effect with these libraries. One of the big benefits of Spark is that there is little overhead to use these libraries with Spark, as they are tightly integrated. The rest of this lesson will cover, at a high level, each of these libraries and their capabilities. The main focus will be on Scala, with specific callouts to Java or Python where there are major differences. The four libraries are Spark SQL, Spark Streaming, MLlib, and GraphX. Spark SQL allows you to write relational queries that are expressed in either SQL, HiveQL, or Scala to be executed using Spark. Spark SQL has a new RDD called the SchemaRDD.

The SchemaRDD consists of Row objects and a schema that describes the type of data in each column of the row. You can think of this as a table in a traditional relational database. You create a SchemaRDD from existing RDDs, a Parquet file, a JSON dataset, or by using HiveQL to query against the data stored in Hive. You can write Spark SQL applications using Scala, Java or Python.

The SQLContext is created from the SparkContext. You can see here that in Scala, the SQL context is created from the SparkContext. In Java, you create the JavaSQLContext from the JavaSparkContext.
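A minimal Scala sketch, assuming an existing SparkContext sc such as the one the Spark shell provides:

    import org.apache.spark.sql.SQLContext

    // Create the SQLContext from the existing SparkContext.
    val sqlContext = new SQLContext(sc)

    // Scala only: this import brings in the implicit conversion that turns an
    // ordinary RDD of case-class objects into a SchemaRDD.
    import sqlContext.createSchemaRDD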

In Python, you also do the same. There is a new RDD, called the SchemaRDD, that you use with Spark SQL. In Scala only, you have to import a library to convert an existing RDD to a SchemaRDD. For the others, you do not need to import a library to work with the SchemaRDD. So how do these SchemaRDDs get created? There are two ways you can do this. The first method uses reflection to infer the schema of the RDD. This leads to more concise code and works well when you already know the schema while writing your Spark application. The second method uses a programmatic interface to construct a schema and then apply that to an existing RDD. This method gives you more control when you don't know the schema of the RDD until runtime. The next two slides will cover these two methods in more detail.

The first of the two methods used to determine the schema of the RDD is to use reflection. In this scenario, use a case class in Scala to define the schema of the table. The arguments of the case class are read using reflection and become the names of the columns.

Let's go over the code shown on the slide. The first thing is to create the RDD of Person objects. You load the text file in using the textFile method. Then you invoke the map transformation to split the elements on a comma to get the individual columns of name and age. The final transformation creates the Person object based on the elements. Next, you register as a table the people RDD that you just created by loading in the text file and performing the transformations.

Once the RDD is a table, you use the sql method provided by SQLContext to run SQL statements. The example here selects from the people table, the SchemaRDD. Finally, the results that come out of the select statement form another SchemaRDD. That RDD, teenagers on our slide, can run normal RDD operations. The programmatic interface is used when you cannot define the case classes ahead of time, for example, when the structure of records is encoded in a string, or when a text dataset will be parsed and the fields will be projected differently for different users.
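Pulling the reflection-based steps from the last two paragraphs together, a sketch along the lines of the Spark 1.1 SQL guide; the file path is an assumption, and sc is the Spark shell's SparkContext:

    // A case class defines the table's schema; its arguments become the column names.
    case class Person(name: String, age: Int)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD   // implicit RDD -> SchemaRDD conversion

    // Create an RDD of Person objects and register it as a table.
    val people = sc.textFile("people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
    people.registerTempTable("people")

    // Run SQL over the registered table; the result is itself a SchemaRDD.
    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)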

A SchemaRDD is created with three steps. The first is to create an RDD of Rows from the original RDD. In the example, we create a schemaString of name and age. The second step is to create the schema matching the RDD from step one. The schema is represented by a StructType that takes the schemaString from the first step and splits it up into StructFields. In the example, the schema is created using the StructType by splitting the name and age schemaString and mapping it to the StructFields, name and age. The third step converts the records of the RDD of people to Row objects.

Then you apply the schema to the row RDD. Once you have your SchemaRDD, you register that RDD as a table and then you run sql statements against it using the sql method. The results are returned as the SchemaRDD and you can run any normal RDD operation on it. In the example, we select the name from the people table, which is the SchemaRDD. Then we print it out on the console.
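A sketch of the programmatic approach just described, again following the Spark 1.1 SQL guide pattern; the file path and the use of string-typed columns are assumptions:

    import org.apache.spark.sql._

    val sqlContext = new SQLContext(sc)

    // Step 1: create an RDD of Row objects from the original text file.
    val people = sc.textFile("people.txt")
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

    // Step 2: build the schema from a schema string, mapping each field name to a StructField.
    val schemaString = "name age"
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Step 3: apply the schema to the RDD of Rows, register the table, and query it with sql().
    val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
    peopleSchemaRDD.registerTempTable("people")
    val results = sqlContext.sql("SELECT name FROM people")
    results.map(t => "Name: " + t(0)).collect().foreach(println)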

Widget Connector
width1000
urlhttps://www.youtube.com/watch?v=PJ9KXtQEHCQ
height600

Spark Streaming gives you the capability to process live streaming data in small batches. Utilizing Spark's core, Spark Streaming is scalable, high-throughput and fault-tolerant. You write streaming programs with DStreams, which are sequences of RDDs created from a stream of data. There are various data sources that Spark Streaming can receive from, including Kafka, Flume, HDFS, Kinesis, or Twitter. It pushes data out to HDFS, databases, or some sort of dashboard.

Spark Streaming supports Scala, Java and Python. Python was actually introduced with Spark 1.2. Python has all the transformations that Scala and Java have with DStreams, but it can only support text data types. Support for other sources such as Kafka and Flume will be available in future releases for Python. Here's a quick view of how Spark Streaming works. First the input stream comes in to Spark Streaming. Then that data stream is broken up into batches of data that are fed into the Spark engine for processing. Once the data has been processed, it is sent out in batches.

Spark Streaming supports sliding window operations. In a windowed computation, every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the resulting RDD. There are two parameters for a sliding window. The window length is the duration of the window, and the sliding interval is the interval at which the window operation is performed. Both of these must be multiples of the batch interval of the source DStream.

In the diagram, the window length is 3 and the sliding interval is 2. To put it in a different perspective, say you wanted to generate word counts over the last 30 seconds of data, every 10 seconds. To do this, you would apply the reduceByKeyAndWindow operation on the DStream of (word, 1) pairs over the last 30 seconds of data. Here we have a simple example. We want to count the number of words coming in from a TCP socket. First and foremost, you must import the appropriate libraries. Then, you would create the StreamingContext object. In it, you specify two threads and a batch interval of 1 second. Next, you create the DStream, which is a sequence of RDDs, that listens to the TCP socket on the given hostname and port. Each record of the DStream is a line of text. You split up the lines into individual words using the flatMap function.

The flatMap function is a one-to-many DStream operation that takes one line and creates a new DStream by generating multiple records (in our case, that would be the words on the line). Finally, with the words DStream, you can count the words using the map and reduce model. Then you print out the results to the console. One thing to note is that when these lines of the application are executed, the real processing doesn't actually happen yet. You have to explicitly tell it to start. Once the application begins, it will continue running until the computation terminates. The code snippets that you see here are from a full sample found in NetworkWordCount. To run the full example, you must first start up netcat, which is a small utility found on most Unix-like systems. This will act as a data source to give the application streams of data to work with. Then, in a different terminal window, run the application using the command shown here.
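A sketch of the word-count pattern just described, close to the NetworkWordCount example; the hostname and port are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._   // implicit pair-DStream operations

    // Two local threads, batch interval of 1 second.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // DStream of lines read from a TCP socket; each record is a line of text.
    val lines = ssc.socketTextStream("localhost", 9999)

    // flatMap: one line in, many words out.
    val words = lines.flatMap(_.split(" "))

    // Count the words in each batch with the map and reduce model, then print to the console.
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    // Windowed variant from the earlier slide: counts over the last 30 seconds, every 10 seconds.
    // val windowedCounts = words.map(word => (word, 1))
    //   .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

    // Nothing actually runs until the streaming context is explicitly started.
    ssc.start()
    ssc.awaitTermination()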

Widget Connector
width1000
urlhttps://www.youtube.com/watch?v=dmRn-mGvg4M
height600

Here is a really short overview of the machine learning library. The MLlib library contains algorithms and utilities for classification, regression, clustering, collaborative filtering and dimensionality reduction. Essentially, you would use this for specific machine learning use cases that require these algorithms. In the lab exercise, you will use the K-Means clustering algorithm on a set of taxi drop-off points to figure out where the best place to hail a cab would potentially be.
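A small hedged sketch of K-Means clustering with MLlib; the file name, the two-column longitude/latitude layout, and the parameter values are assumptions, not the lab's actual dataset:

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    // Parse drop-off points (e.g. "lon,lat" per line) into MLlib vectors.
    val points = sc.textFile("dropoffs.csv")
      .map(_.split(","))
      .map(cols => Vectors.dense(cols(0).toDouble, cols(1).toDouble))
      .cache()

    // Cluster the points; each cluster center is a candidate spot to hail a cab.
    val numClusters = 5
    val numIterations = 20
    val model = KMeans.train(points, numClusters, numIterations)
    model.clusterCenters.foreach(println)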

GraphX is another library that sits on top of the Spark Core. It is basically a graph processing library which can be used for social networks and language modeling. Graph data and the requirement for graph-parallel systems are becoming more common, which is why the GraphX library was developed. Some scenarios are not efficient when processed using the data-parallel model, and this need for a graph-parallel model has led to new graph-parallel systems, like Giraph and GraphLab, that execute graph algorithms much faster than general data-parallel systems.

There are new inherent challenges that come with graph computations, such as constructing the graph, modifying its structure, or expressing computations that span several graphs. As such, it is often necessary to move between table and graph views depending on the objective of the application and the business requirements.

The goal of GraphX is to optimize the process by making it easier to view data both as a graph and as collections, such as RDDs, without data movement or duplication. The lab exercise goes through an example of loading in a text file and creating a graph from it to find attributes of the top users, along the lines of the sketch below. Having completed this lesson, you should be able to understand and use the various Spark libraries.
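A small hedged sketch of that kind of graph analysis; the edge-list file name and the top-user metric are assumptions:

    import org.apache.spark.graphx.GraphLoader

    // Load a graph from an edge-list text file (one "srcId dstId" pair per line).
    val graph = GraphLoader.edgeListFile(sc, "followers.txt")

    // Rank users by in-degree (number of incoming edges) and show the top 5.
    val topUsers = graph.inDegrees
      .map { case (id, degree) => (degree, id) }
      .top(5)
    topUsers.foreach { case (degree, id) => println("user " + id + " has in-degree " + degree) }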

Complete lab exercise #4, creating applications using Spark SQL, MLib, Spark Streaming, and GraphX and then proceed to the next lesson in this course.