These are notes pulled together from many web sites while getting the demo running on my PC. It was not trivial, as nothing works out of the box. Along the way I downloaded Spark, got it into the IDE to debug it, installed the Hadoop binaries, recompiled everything with Scala 2.11 rather than 2.10, and so on. When it’s all working with Cassandra, which is the next milestone, I’ll post again. Think of this post as a memory jogger on the journey; the next post should give the quick summary of how to do it.

Notes from the wikis

Some key points: in the receiverless mode introduced in Spark 1.3, the number of Spark stream consumers matches the number of Kafka topic partitions, and you should also make sure you understand how Cassandra partitions relate to Spark processes. The aim is that Kafka is your lossless, scalable stream, and Cassandra holds the values you are aggregating or mapping from the stream. Other client systems can then request data as and when they want it. Obviously, if your clients want to know when ANYTHING changes then this is a bad pattern, but I’d suggest you don’t actually want your clients coupled to the stream in this way, because they simply won’t keep up. Throttling the output from streams is usually the reason for Kafka->Spark->Cassandra … ->Clients that poll.

How will it work?

Your publisher of events writes to Kafka topics. This ensures no events are lost. You write a Spark DStream processor and deploy it to your Spark cluster. This is a 24/7 process which sits there handling the flow and updating Cassandra, which is your golden source.

You do NOT need to hand code persistence for the input data, or for recovery; all that is done for you. However, idempotence of the aggregators is critical (so no double counting after a failure, for instance). Spark helps here by checkpointing the Kafka offsets within the topic, though as the Kafka notes below say, your own output updates still have to cope with a replayed batch.

Do read up on upgrading and recovery, as upgrading a 24/7 service is a nightmare if you don’t understand it (I don’t yet with these systems, but I will).

Notes: we are using the receiverless mode introduced in Spark 1.3. This in turn uses the Kafka simple consumer API, which no longer uses ZooKeeper to store each client’s offsets.
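For reference, here is a minimal sketch of what a receiverless (direct) stream looks like against the Spark 1.6 API. It is not the code from my project: the object name, the checkpoint directory, the batch interval and the broker address are all assumptions for illustration, and it needs spark-streaming and spark-streaming-kafka on the classpath plus a running broker.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  // Assumed location; any directory the driver can write to will do.
  val checkpointDir = "C:/tmp/spark-checkpoint"

  // Builds a fresh context. Only called when no checkpoint exists yet.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("sparkexample")
    val ssc = new StreamingContext(conf, Seconds(5)) // assumed batch interval
    ssc.checkpoint(checkpointDir)

    // The direct API talks to the brokers, not to ZooKeeper.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val topics = Set("test")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // One RDD partition per Kafka partition in each batch.
    stream.foreachRDD { rdd =>
      println(s"batch has ${rdd.partitions.length} partition(s), ${rdd.count()} message(s)")
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if there is one, otherwise start fresh.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The getOrCreate call is what ties recovery to the checkpoint: on a restart the context, including the Kafka offsets, is rebuilt from the checkpoint directory rather than from createContext.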

Kafka

Topic, with sub partitions, each partition assigned to a single consumer. Assures ordering for all clients (for the subset of messages that they get). Messages are assigned to partition by key.
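The "assigned to partition by key" rule can be pictured with a tiny sketch. This is not Kafka's own partitioner, which uses its own hashing; partitionFor and the use of hashCode are stand-ins I have made up for illustration. The only point is that the same key always lands in the same partition, which is what gives you ordering per key.

```scala
object PartitionSketch {
  // Hypothetical helper: maps a message key to one of numPartitions partitions.
  // Kafka's real partitioners use a different hash function.
  def partitionFor(key: String, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    // Mask the sign bit rather than using math.abs, which fails for Int.MinValue.
    (key.hashCode & Int.MaxValue) % numPartitions
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("EURUSD", "GBPUSD", "EURUSD", "USDJPY")
    keys.foreach { k =>
      println(s"key=$k -> partition ${partitionFor(k, 3)}")
    }
  }
}
```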

Consumers all have offsets for their partition. This indicates which messages they have read. Note that the Spark connector for Kafka stores these offsets using the checkpoint mechanism within Spark Streaming. This avoids a small window for error that exists in certain conditions if Kafka is left to store the offsets in ZooKeeper.

Your streaming app HAS to be able to identify that it has already processed part of a stream, and not double count or dual process it after a failure. The guide indicates how you can insert a row to show it has already been processed, but make sure this is in the same transactional boundary as any other updates. Ideally you think hard about this up front: optimistic strategies, whether you actually care about double processing, and so on.
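To make the "already processed" idea concrete, here is a small sketch under heavy assumptions. IdempotentCounter is a made-up in-memory stand-in for the real store, and the synchronized block stands in for the transactional boundary; a real store such as Cassandra needs its own way of committing the aggregate and the marker together. It also relies on offsets only ever increasing within a partition.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a store that commits the aggregate and
// the processed-offset marker together.
final class IdempotentCounter {
  private val totals = mutable.Map.empty[String, Long]   // key -> running total
  private val lastOffset = mutable.Map.empty[Int, Long]  // partition -> last applied offset

  // Applies the event once. Returns false if the offset was already applied.
  def applyEvent(partition: Int, offset: Long, key: String, amount: Long): Boolean =
    synchronized {
      if (lastOffset.get(partition).exists(_ >= offset)) {
        false // replay after a failure: skip it, so nothing is double counted
      } else {
        totals(key) = totals.getOrElse(key, 0L) + amount
        lastOffset(partition) = offset
        true
      }
    }

  def total(key: String): Long = synchronized { totals.getOrElse(key, 0L) }
}
```

Replaying an event with an offset that has already been applied leaves the total unchanged, which is the behaviour you want after a restart from a checkpoint.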

cd \java\kafka_2.11-0.9.0.1
bin\windows\zookeeper-server-start.bat config/zookeeper.properties
bin\windows\kafka-server-start.bat config/server.properties

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1  --partitions 1 --topic test
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
This is a message
This is another message

bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning

// Test for a standalone connector. This starts a source and a sink, reads
// from a test input file and writes out to a file.
// Actually, if you trust that Kafka works this is pretty dull....
bin\windows\connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Notes on building the Assembly for scala 2.11

Scala is not binary compatible across major releases, so I rebuilt Spark for Scala 2.11 from source, which also means rebuilding the spark-streaming lib. For your Spark Kafka 2.11 build you need to ensure that your 2.11 streaming lib is on the path. The simplest way to do this is to slap it into the project’s lib directory (but this will cause pain with the assembly plugin later). Better to install the package into a local repo (sbt publishLocal for ivy, or publishM2 for Maven).

The other thing you will need is the assembly plugin for SBT to produce a single jar for your spark deployment.

Add file project/assembly.sbt containing:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Ensure the Spark jars are marked as provided, and they will not be bundled.

  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",

and so on - read how here

Note on sbt-assembly for scala 2.11

On one PC I had to rebuild this locally, but I think that was because I was in a hurry, as on another PC it just worked. If you want to rebuild it then:

git clone https://github.com/sbt/sbt-assembly.git
cd [to the base dir]
sbt clean compile package publishLocal

Being in a mad hurry I don’t remember if building it locally was really necessary. The more I use Scala 2.11 (or 2.12) the more I think everyone should be ready to rebuild from source in Scala projects, treat it as the norm.

The final assembly

Back to my own project, to get sbt to assemble me the jar including the Kafka dependency but excluding the ‘provided’ spark-core. Nothing is easy. For a start, the previous hack of dumping things in lib was a mistake, because the assembly plugin decided to include them all, so remove that and publishLocal those projects instead. And then there is some more horribleness, which means even a very small Kafka Spark fat jar build.sbt needs to end up like below (using the new lazy val multi-project format, which is a lot better):

import sbt.Keys._

// Multi project build file.  For val xxx = project, xxx is the name of the project and base dir
lazy val commonSettings = Seq(
  organization := "com.jgibbons",
  version := "0.1.0",
  scalaVersion := "2.11.7"
)

val apacheSparkVersion = "1.6.1"

lazy val sparkexample = (project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "sparkexample",
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M2",
    libraryDependencies += "org.scalatest"     %% "scalatest" % "latest.integration" % "test",
    libraryDependencies += "org.apache.spark" %% "spark-core" % apacheSparkVersion % "provided",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % apacheSparkVersion % "provided",
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % apacheSparkVersion % "provided",
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % apacheSparkVersion
  )

// Assembly plug in says do this to exclude scala, ie copy the default, but change the value.
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// To avoid warnings on Meta-Inf merges, where the default is to take the first, and..
// spark-streaming-kafka has a dependency on org.spark-project.spark unused 1.0.0 but also embeds
// the jar in its artifact, which causes a problem using assembly to create the Spark fatjar
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case PathList("org","apache","spark","unused","UnusedStubClass.class") => MergeStrategy.first
  case x => (assemblyMergeStrategy in assembly).value(x)
}

Then build the fat jar and submit it:

set JAVA_OPTS=-Xmx1024m
sbt clean compile package assembly
spark-submit target\scala-2.11\sparkexample-assembly-0.1.0.jar

Gotcha from Scala land

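// This works
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
// This does not work. After the wildcard import, "kafka" resolves to the
// org.apache.spark.streaming.kafka package, so the compiler looks for
// serializer.StringDecoder in there. Importing _root_.kafka.serializer.StringDecoder
// would sidestep it. Made me laugh - not suffered like this since 1996 when I moved away from C++
import org.apache.spark.streaming._
import kafka.serializer.StringDecoder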

Does it work yet?

In local mode, everything is fine. Running a master and a slave is not fine: it is very broken, with null pointer exceptions in the worker logs, and even finding the logs took more googling.

Research digs up some good stuff on configuration. For instance, the configuration options used by spark-submit are found in conf/spark-defaults.conf (whitespace separator), and you can view them while the job is running at http://127.0.0.1:4040 in the Environment tab. The port number goes up by one for each further job running on that host.
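To show what "whitespace separator" means in practice, here is a small sketch that reads lines in that shape. It is not Spark's own loader, just an illustration; the object name is made up and the sample values are examples only, although the property names themselves are real Spark settings.

```scala
object SparkDefaultsSketch {
  // Parses "key<whitespace>value" lines; blank lines and # comments are skipped.
  def parse(lines: Seq[String]): Map[String, String] =
    lines.map(_.trim)
      .filter(l => l.nonEmpty && !l.startsWith("#"))
      .map(_.split("\\s+", 2))
      .collect { case Array(k, v) => k -> v.trim } // a key with no value is ignored here
      .toMap

  def main(args: Array[String]): Unit = {
    // Illustrative content only, in the style of conf/spark-defaults.conf.
    val sample = Seq(
      "# illustrative spark-defaults.conf content",
      "spark.master            spark://127.0.0.1:7077",
      "spark.eventLog.enabled  true",
      "spark.eventLog.dir      file:/C:/tmp/spark-events"
    )
    parse(sample).foreach { case (k, v) => println(s"$k = $v") }
  }
}
```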

Go into the conf directory and rename the templates for log4j and spark-defaults, then edit them so that they have the values you want. If you run spark-submit again you get lots of lovely logs to say what’s going wrong.

java.io.FileNotFoundException: File file:/C:/tmp/spark-events does not exist

So create that, and then you get the totally meaningless null pointer exception on start. Apparently on Windows, Spark uses the Hadoop winutils to access the native filesystem. So even if you have no plan to use Hadoop you need the winutils binaries installed, and you need to set the hadoop.home.dir system property or %HADOOP_HOME% (which is what I did). Steve Loughran kindly put them on GitHub at: https://github.com/steveloughran/winutils
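A quick sanity check saves some head scratching before launching Spark. The sketch below is my own helper, not part of Spark or Hadoop; it assumes the usual layout of winutils.exe sitting in a bin directory under the Hadoop home, and looks at the system property first and the environment variable second.

```scala
import java.io.File

object WinutilsCheck {
  // Returns the expected winutils.exe location under a Hadoop home directory.
  def winutilsPath(hadoopHome: String): File =
    new File(new File(hadoopHome, "bin"), "winutils.exe")

  // Resolves the Hadoop home from the system property first, then the environment.
  def hadoopHome(props: Map[String, String], env: Map[String, String]): Option[String] =
    props.get("hadoop.home.dir").orElse(env.get("HADOOP_HOME"))

  def main(args: Array[String]): Unit = {
    hadoopHome(sys.props.toMap, sys.env) match {
      case Some(home) =>
        val exe = winutilsPath(home)
        println(s"winutils expected at $exe, exists=${exe.exists}")
      case None =>
        println("Neither hadoop.home.dir nor HADOOP_HOME is set")
    }
  }
}
```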

Finally, I can again submit my long lived compute grid job running against a data fabric. Oh, no, I mean my Spark reactive stream processor against my, err, Cassandra.

C:\java\spark-1.6.1-bin-hadoop2.6\bin\spark-submit target\scala-2.11\sparkexample-assembly-0.1.0.jar

Useful admin commands, once it’s all up and running

// confirm topics exist
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
// View what's happening on a multi node
bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic

Running in local Spark mode

It works. Code will be mentioned later, but, basically, it works. So, let’s run it on the slave/worker.

Gotchas: Running it on the worker

Kafka did warn me about bundling the jars….

java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition

That was in the worker log for the job. I spent 12 years working with Data Synapse and Platform Symphony in big risk departments. Spark streaming jobs are basically yet another way of deploying a service and making a service call.

So, from the Kafka Spark streaming guide: make sure spark-core_2.10 and spark-streaming_2.10 are marked as provided dependencies and bundle spark-streaming-kafka_2.10 with the build (the _2.11 artifacts in my case). Use spark-submit to deploy the application.

Interesting places on my PC

To have end to end flow I am running ZooKeeper, Kafka, Cassandra, Spark Master, Spark Slave. Then I start up my event source to write to Kafka, and my slow consumer which polls the aggregate data in Cassandra.

Without configuring anything, where can I find the logs and so on? Note I install most frameworks under c:\java\ which you can see in the paths below:

URLs

Spark worker UI - see jobs and logs
http://127.0.0.1:8081/
My jobs UI
http://127.0.0.1:4040/

Log files

Cassandra
C:\var\lib\cassandra
C:\var\log\cassandra
C:\Users\Jonathan\.cassandra
ZooKeeper
C:\tmp\zookeeper\version-2
Kafka
C:\java\kafka_2.11-0.9.0.1\config
C:\tmp\kafka-logs
INFO Client environment:java.io.tmpdir=C:\Users\Jonathan\AppData\Local\Temp\ (org.apache.zookeeper.ZooKeeper)
Spark
C:\java\spark-1.6.1-bin-hadoop2.6\work\app-20160418192926-0000\0
sbt
Yes, even SBT gets into the act and sticks some build logs into your local temp area on Windows:
C:\Users\Jonathan\AppData\Local\Temp\sbtxxxxxxxxxxxxxxxxxx.log

The github code example
http://kafka.apache.org/documentation.html#introduction
https://zookeeper.apache.org/doc/r3.3.2/zookeeperAdmin.html
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
Writing output guidance