These are notes for following along on the talk I am giving.
This builds on the gist: https://gist.github.com/epugh/5729071c3b8aab81636d422c391aa716, but is meant to be stand-alone!
- This gist uses the latest version of Zeppelin. Replace the IP address `192.168.99.100` with your Docker machine IP. Get it by running `docker-machine ip`.
- Fire up Zeppelin + a Spark Master and a Spark Worker via:
docker run -d --name zeppelin -p 8080:8080 dylanmei/zeppelin
- If it doesn't work, fall back to the specific "stable" version of Zeppelin. Watch out: there is a 1 GB layer in there!
docker run -d --name zeppelin -p 8080:8080 dylanmei/zeppelin:0.6.0-stable
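As an aside, if you'd rather not hand-edit the hardcoded address in every command, you can build the URLs from one shell variable. A minimal sketch (the variable names are mine, and the address shown is just this talk's example; substitute your own `docker-machine ip` output):

```shell
# Build service URLs from one variable so the hardcoded 192.168.99.100
# is easy to swap for your own `docker-machine ip` output.
DOCKER_IP=192.168.99.100
ZEPPELIN_URL="http://$DOCKER_IP:8080"
SOLR_URL="http://$DOCKER_IP:8983/solr"
echo "$ZEPPELIN_URL"
echo "$SOLR_URL"
```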
docker exec -it zeppelin wget http://central.maven.org/maven2/com/lucidworks/spark/spark-solr/2.0.1/spark-solr-2.0.1.jar
docker exec -it zeppelin wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.3.2/elasticsearch-hadoop-2.3.2.jar
%dep
z.reset()
z.addRepo("Spark Packages Repo").url("http://dl.bintray.com/spark-packages/maven")
z.load("com.databricks:spark-csv_2.10:1.4.0")
z.load("/zeppelin/spark-solr-2.0.1.jar")
z.load("/zeppelin/elasticsearch-hadoop-2.3.2.jar")
-
Exercise 1: Let's treat Solr as a relational database!
Walk through connecting Zeppelin to Solr via JDBC using a streaming connection.
-
Environment Configuration
Background info on setting up a SolrCloud cluster (required for Streaming) at https://github.com/docker-solr/docker-solr/blob/master/Docker-FAQ.md#can-i-run-zookeeper-and-solr-clusters-under-docker
docker run --name zookeeper -d -p 2181:2181 -p 2888:2888 -p 3888:3888 jplock/zookeeper

echo stat | nc 192.168.99.100 2181

docker run --name solr1 --link zookeeper:ZK -d -p 8983:8983 \
  solr:6 \
  bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'

docker run --name solr2 --link zookeeper:ZK -d -p 8984:8983 \
  solr:6 \
  bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'

docker exec -i -t solr1 /opt/solr/bin/solr create_collection \
  -c techproducts -shards 2 -p 8983
docker exec -it --user=solr solr1 bash -c "/opt/solr/bin/post -c techproducts /opt/solr/example/exampledocs/*.xml"
curl --data-urlencode 'stmt=SELECT manu_id_s, count(1) FROM techproducts GROUP BY manu_id_s' http://192.168.99.100:8983/solr/techproducts/sql?aggregationMode=facet
-
Set up the Solr JDBC connection in Zeppelin
org.apache.solr:solr-solrj:6.0.0

techproducts.driver=org.apache.solr.client.solrj.io.sql.DriverImpl
techproducts.url=jdbc:solr://192.168.99.100:2181?collection=techproducts
techproducts.user=
techproducts.password=
-
Make a Query
%jdbc(techproducts) SELECT manu_id_s, count(*) FROM techproducts GROUP BY manu_id_s
-
Play with the settings
- Change the sort orders
- Flip the count and the `manu_id_s` in the keys
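For example, a sorted variant might look like the following (a sketch only — I haven't verified exactly which ORDER BY forms Solr's SQL handler accepts in your version):

```
%jdbc(techproducts)
SELECT manu_id_s, count(*) FROM techproducts GROUP BY manu_id_s ORDER BY count(*) DESC
```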
-
-
Exercise 2: So Zeppelin is just a query UI??
Walk through why Zeppelin is more than just Tableau, but only if you use Spark!
-
Environment Configuration
We're using the LucidWorks Spark-Solr integration https://github.com/lucidworks/spark-solr and walking through their NYC Yellow Cab example: https://github.com/lucidworks/spark-solr/blob/master/docs/examples/csv.adoc
-
Set up the Solr Spark connection in the Zeppelin Spark interpreter
You can provide the Spark Solr jar via the same interpreter mechanism as you did the Solr JDBC library:
```
com.lucidworks.spark:spark-solr:2.0.1
```
If you get an error, restart the Spark interpreter and then load the dependency again; adding dependencies has to happen before any Spark jobs are run!
-
Create Solr Core to put data into
curl -X GET "http://192.168.99.100:8983/solr/admin/collections?action=create&name=test-spark-solr&collection.configName=techproducts&numShards=2&maxShardsPerNode=2"
-
Grab the data file onto Zeppelin. Yes, this should be a web request. Don't touch the local file system!
docker exec -it zeppelin wget https://raw.githubusercontent.com/lucidworks/spark-solr/master/src/test/resources/test-data/nyc_yellow_taxi_sample_1k.csv
-
Load data into Solr
These next steps are copied directly from the Spark-Solr example (thanks, LucidWorks!). At some point this will be a notebook file that you reference at a URL and load directly into your Zeppelin!
%spark
val csvFileLocation = "/usr/zeppelin/nyc_yellow_taxi_sample_1k.csv"
var csvDF = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(csvFileLocation)

// Filter out invalid lat/lon cols
csvDF = csvDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
csvDF = csvDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")

// Concat the lat/lon cols into a single value expected by Solr location fields
csvDF = csvDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"), col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
csvDF = csvDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"), col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")

csvDF.registerTempTable("taxi")
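A side note on the concat step above: Solr location fields take a single "lat,lon" string per document, which is all the concat_ws(",", ...) calls are producing. A plain-Scala sketch of that formatting (no Spark needed; the helper name is hypothetical, not part of spark-solr):

```scala
// Hypothetical helper mirroring the concat_ws(",", lat, lon) step:
// Solr location fields expect one "lat,lon" string per document.
def toSolrLocation(lat: Double, lon: Double): String = s"$lat,$lon"

println(toSolrLocation(40.7128, -74.006))
```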
Let's make a query:
%sql select passenger_count,count(*) from taxi group by passenger_count
Now save the data to Solr
%spark
val options = Map(
  "zkhost" -> "192.168.99.100:2181",
  "collection" -> "test-spark-solr",
  "gen_uniq_key" -> "true" // Generate unique key if the 'id' field does not exist
)

// Write to Solr
csvDF.write.format("solr").options(options).mode(org.apache.spark.sql.SaveMode.Overwrite).save
Now make sure the data is committed (yes, this is awkward)!
%spark
val client = new org.apache.http.impl.client.DefaultHttpClient()
val response = client.execute(new org.apache.http.client.methods.HttpGet("http://192.168.99.100:8983/solr/test-spark-solr/update?commit=true"))
-
Query the data in Solr
Okay, now you can query the data back:
%spark
val options = Map(
  "zkHost" -> "192.168.99.100:2181",
  "collection" -> "test-spark-solr"
)
val taxiDF = sqlContext.read.format("solr").options(options).load
taxiDF.printSchema()
This should print out the schema of the data loaded from Solr.
Now load and cache the data (don't forget the cache or bad things happen!)
%spark
taxiDF.registerTempTable("trips")
taxiDF.cache()
And now query the dataframe using sql:
%sql SELECT avg(tip_amount), avg(fare_amount) FROM trips

%sql SELECT max(tip_amount), max(fare_amount) FROM trips WHERE trip_distance > 10
-
-
Exercise 3: So what if I want to transform my data?
-
We also need to tell the %spark interpreter where our ES nodes are by adding `es.nodes=192.168.99.101` to the interpreter settings. Make sure to add `org.elasticsearch:elasticsearch-hadoop:2.3.2` as a dependency as well.
-
Now let's write it out to Elasticsearch. We can put it under any index and type we want!
%spark
import org.elasticsearch.spark.sql.EsSparkSQL
EsSparkSQL.saveToEs(taxiDF, "taxi/rides")
-
Great, and let's read it back in. We do need to update the %elasticsearch interpreter to point at our ES node by updating the
elasticsearch.host=192.168.99.101
property. Let's query it via:
```
%elasticsearch
search /taxi
{
  "query": {
    "match_all": {}
  }
}
```