Solr as a SparkSQL DataSource, Part II
Co-authored with Kiran Chitturi, Lucidworks Data Engineer
Last August, we introduced you to Lucidworks’ open source spark-solr project for integrating Apache Spark and Apache Solr (see Part I). To recap, we introduced Solr as a SparkSQL Data Source and focused mainly on read / query operations. In this posting, we show how to write data to Solr using the SparkSQL DataFrame API and also dig a little deeper into advanced features of the library, such as data locality and streaming expression support.
Writing Data to Solr
For this posting, we’re going to use the Movielens 100K dataset found at: http://grouplens.org/datasets/movielens/. After downloading the zip file, extract it locally and take note of the directory, such as /tmp/ml-100k.
Setup Solr and Spark
Download Solr 6.x (6.1 is the latest at this time) and extract the archive to a directory, referred to as $SOLR_INSTALL hereafter. Start it in cloud mode by doing:
cd $SOLR_INSTALL
bin/solr start -cloud
Create some collections to host our movielens data:
bin/solr create -c movielens_ratings
bin/solr create -c movielens_movies
bin/solr create -c movielens_users
Also, make sure you’ve installed Apache Spark 1.6.2; see the getting started instructions in the Spark documentation for more details.
Load Data using spark-shell
Start the spark-shell with the spark-solr JAR added to the classpath:
cd $SPARK_HOME
./bin/spark-shell --packages "com.lucidworks.spark:spark-solr:2.1.0"
Let’s load the movielens data into Solr using SparkSQL’s built-in support for reading CSV files. We provide the bulk of the loading code you need below, but you’ll need to set a few environment-specific variables first. Specifically, declare the path to the directory where you extracted the movielens data, such as:
val dataDir = "/tmp/ml-100k"
Also, verify the zkhost val is set to the correct value for your Solr server.
val zkhost = "localhost:9983"
Next, type :paste into the spark shell so that you can paste in the following block of Scala:
sqlContext.udf.register("toInt", (str: String) => str.toInt)

var userDF = sqlContext.read.format("com.databricks.spark.csv")
  .option("delimiter","|").option("header", "false").load(s"${dataDir}/u.user")
userDF.registerTempTable("user")
userDF = sqlContext.sql("select C0 as user_id,toInt(C1) as age,C2 as gender,C3 as occupation,C4 as zip_code from user")

var writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_users")
userDF.write.format("solr").options(writeToSolrOpts).save

var itemDF = sqlContext.read.format("com.databricks.spark.csv")
  .option("delimiter","|").option("header", "false").load(s"${dataDir}/u.item")
itemDF.registerTempTable("item")

val selectMoviesSQL = """
  | SELECT C0 as movie_id, C1 as title, C1 as title_txt_en,
  |        C2 as release_date, C3 as video_release_date, C4 as imdb_url,
  |        C5 as genre_unknown, C6 as genre_action, C7 as genre_adventure,
  |        C8 as genre_animation, C9 as genre_children, C10 as genre_comedy,
  |        C11 as genre_crime, C12 as genre_documentary, C13 as genre_drama,
  |        C14 as genre_fantasy, C15 as genre_filmnoir, C16 as genre_horror,
  |        C17 as genre_musical, C18 as genre_mystery, C19 as genre_romance,
  |        C20 as genre_scifi, C21 as genre_thriller, C22 as genre_war,
  |        C23 as genre_western
  |   FROM item
""".stripMargin
itemDF = sqlContext.sql(selectMoviesSQL)
itemDF.registerTempTable("item")

val concatGenreListSQL = """
  | SELECT *,
  |        concat(genre_unknown,genre_action,genre_adventure,genre_animation,
  |               genre_children,genre_comedy,genre_crime,genre_documentary,
  |               genre_drama,genre_fantasy,genre_filmnoir,genre_horror,
  |               genre_musical,genre_mystery,genre_romance,genre_scifi,
  |               genre_thriller,genre_war,genre_western) as genre_list
  |   FROM item
""".stripMargin
itemDF = sqlContext.sql(concatGenreListSQL)

// build a multi-valued string field of genres for each movie
sqlContext.udf.register("genres", (genres: String) => {
  var list = scala.collection.mutable.ListBuffer.empty[String]
  var arr = genres.toCharArray
  val g = List("unknown","action","adventure","animation","children",
               "comedy","crime","documentary","drama","fantasy",
               "filmnoir","horror","musical","mystery","romance",
               "scifi","thriller","war","western")
  for (i <- arr.indices) {
    if (arr(i) == '1') list += g(i)
  }
  list
})

itemDF.registerTempTable("item")
itemDF = sqlContext.sql("select *, genres(genre_list) as genre from item")
itemDF = itemDF.drop("genre_list")

writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_movies")
itemDF.write.format("solr").options(writeToSolrOpts).save

sqlContext.udf.register("secs2ts", (secs: Long) => new java.sql.Timestamp(secs*1000))

var ratingDF = sqlContext.read.format("com.databricks.spark.csv")
  .option("delimiter","\t").option("header", "false").load(s"${dataDir}/u.data")
ratingDF.registerTempTable("rating")
ratingDF = sqlContext.sql("select C0 as user_id, C1 as movie_id, toInt(C2) as rating, secs2ts(C3) as rating_timestamp from rating")
ratingDF.printSchema

writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_ratings")
ratingDF.write.format("solr").options(writeToSolrOpts).save
Hit ctrl-d to execute the Scala code in the paste block. There are a couple of interesting aspects of this code to notice. First, I’m using SQL to select and name the fields I want to insert into Solr from the DataFrames created from the CSV files. Moreover, I can use common SQL functions, such as CONCAT, to perform basic transformations on the data before inserting it into Solr. I also use user-defined functions (UDFs) to perform custom transformations, such as the “genres” UDF, which collapses the genre indicator columns into a multi-valued string field that is better suited for faceting. In a nutshell, you have the full power of Scala and SQL to prepare data for indexing.
Also notice that I’m saving the data into three separate collections and not de-normalizing all this data into a single collection on the Solr side, as is common practice when building a search index. With SparkSQL and streaming expressions in Solr, we can quickly join across multiple collections, so we don’t have to de-normalize to support analytical questions we want to answer with this data set. Of course, it may still make sense to de-normalize to support fast Top-N queries where you can’t afford to perform joins in real-time, but for this blog post, it’s not needed. The key take-away here is that you now have more flexibility in joining across collections in Solr, as well as joining with other data sources using SparkSQL.
Notice how we’re writing the resulting DataFrames to Solr using code such as:
var writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_users")
userDF.write.format("solr").options(writeToSolrOpts).save
Behind the scenes, the spark-solr project uses the schema of the source DataFrame to define fields in Solr using the Schema API. Of course, if you have special needs for specific fields (e.g., custom text analysis), then you’ll need to predefine them before using Spark to insert rows into Solr.
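For example, here’s a minimal SolrJ sketch of predefining the title_txt_en field with the stock text_en type from Solr’s default configset via the Schema API. It assumes the SolrJ classes pulled in by the spark-solr package are on the spark-shell classpath, and the field and type choices are just illustrative:

import scala.collection.JavaConverters._
import org.apache.solr.client.solrj.impl.CloudSolrClient
import org.apache.solr.client.solrj.request.schema.SchemaRequest

// connect to the same SolrCloud cluster used above
val solrClient = new CloudSolrClient(zkhost)
solrClient.setDefaultCollection("movielens_movies")

// define the field explicitly so it gets English text analysis instead of a guessed type
val fieldAttrs = Map[String, AnyRef](
  "name"   -> "title_txt_en",
  "type"   -> "text_en",
  "stored" -> java.lang.Boolean.TRUE
)
new SchemaRequest.AddField(fieldAttrs.asJava).process(solrClient)
solrClient.close()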
Writing to Solr this way also assumes that auto soft-commits are enabled for your Solr collections. If auto soft-commits are not enabled, you can enable them using the Solr Config API or simply include the soft_commit_secs option when writing to Solr, such as:
var writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_users", "soft_commit_secs" -> "10")
One caveat: if the schema of the DataFrame you’re indexing is not correct, the spark-solr code will create the field in Solr with an incorrect field type. For instance, I didn’t convert the rating field into a numeric type on my first iteration, which resulted in it getting indexed as a string. As a result, I was not able to perform any aggregations on the Solr side, such as computing the average rating of action movies for female reviewers in Boston. Even after correcting the issue on the Spark side, the field was still incorrectly defined in Solr, so I had to use the Solr Schema API to drop and re-create the field definition with the correct data type. The key take-away here is that seemingly minor data type issues in the source data can lead to confusing issues when working with the data in Solr.
In this example, we’re using Spark’s CSV DataSource, but you can write any DataFrame to Solr. This means that you can read data from any SparkSQL DataSource, such as Cassandra or MongoDB, and write to Solr using the same approach as what is shown here. You can even use SparkSQL as a more performant replacement of Solr’s Data Import Handler (DIH) for indexing data from an RDBMS; we show an example of this in the Performance section below.
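To give a feel for that pattern, here’s a rough sketch of reading from an RDBMS with SparkSQL’s JDBC data source and writing the result to Solr. The connection URL, driver, table, and collection names below are placeholders, and the database driver JAR must be on the spark-shell classpath:

// read rows from an RDBMS using SparkSQL's JDBC data source ...
val jdbcOpts = Map(
  "url"     -> "jdbc:postgresql://localhost:5432/mydb", // placeholder connection string
  "dbtable" -> "my_table",                              // placeholder table name
  "driver"  -> "org.postgresql.Driver"
)
val rdbmsDF = sqlContext.read.format("jdbc").options(jdbcOpts).load

// ... and write the rows to Solr just like any other DataFrame
rdbmsDF.write.format("solr")
  .options(Map("zkhost" -> zkhost, "collection" -> "my_collection"))
  .save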
Ok, so now you have some data loaded into Solr and everything set up correctly to query it from Spark. Now let’s dig into some of the additional features of the spark-solr library that we didn’t cover in the previous blog post.
Analyzing Solr Data with Spark
Before you can analyze data in Solr, you need to load it into Spark as a DataFrame, which was covered in the first blog post in this series. Run the following code in the spark-shell to read the movielens data from Solr:
var ratings = sqlContext.read.format("solr").options(Map("zkhost" -> zkhost, "collection" -> "movielens_ratings")).load
ratings.printSchema
ratings.registerTempTable("ratings")

var users = sqlContext.read.format("solr").options(Map("zkhost" -> zkhost, "collection" -> "movielens_users")).load
users.printSchema
users.registerTempTable("users")
sqlContext.cacheTable("users")

var movies = sqlContext.read.format("solr").options(Map("zkhost" -> zkhost, "collection" -> "movielens_movies")).load
movies.printSchema
movies.registerTempTable("movies")
sqlContext.cacheTable("movies")
Joining Solr Data with SQL
Here is an example query you can send to Solr from the spark-shell to explore the dataset:
sqlContext.sql("""
  | SELECT u.gender as gender, COUNT(*) as num_ratings, avg(r.rating) as avg_rating
  |   FROM ratings r, users u, movies m
  |  WHERE m.movie_id = r.movie_id
  |    AND r.user_id = u.user_id
  |    AND m.genre='romance' AND u.age > 30
  |  GROUP BY gender
  |  ORDER BY num_ratings desc
""".stripMargin).show
NOTE: You may notice a slight delay in executing this query as Spark needs to distribute the spark-solr library to the executor process(es).
In this query, we’re joining data from three different Solr collections and performing an aggregation on the result. To be clear, we’re loading the rows of all three Solr collections into Spark and then relying on Spark to perform the join and aggregation on the raw rows.
Solr 6.x also has the ability to execute basic SQL. But at the time of this writing, it doesn’t support a broad enough feature set to be generally useful as an analytics tool. However, you should think of SparkSQL and Solr’s parallel SQL engine as complementary technologies in that it is usually more efficient to push aggregation requests down into the engine where the data lives, especially when the aggregation can be computed using Solr’s faceting engine. For instance, consider the following SQL query that performs a join on the results of a sub-query that returns aggregated rows.
SELECT m.title as title, solr.aggCount as aggCount
  FROM movies m
 INNER JOIN (SELECT movie_id, COUNT(*) as aggCount
               FROM ratings
              WHERE rating >= 4
           GROUP BY movie_id
           ORDER BY aggCount desc LIMIT 10) as solr
    ON solr.movie_id = m.movie_id
 ORDER BY aggCount DESC
It turns out that the sub-query aliased here as “solr” can be evaluated on the Solr side using the facet engine, which as we all know is one of the most powerful and mature features in Solr. The sub-query:
SELECT movie_id, COUNT(*) as aggCount
  FROM ratings
 WHERE rating='[4 TO *]'
 GROUP BY movie_id
 ORDER BY aggCount desc
 LIMIT 10
is effectively the same as the following Solr facet request:
/select?q=*:*
  &fq=rating_i:[4 TO *]
  &facet=true
  &facet.limit=10
  &facet.mincount=1
  &facet.field=movie_id
Ideally, the spark-solr library would detect when aggregations can be pushed down into Solr and avoid loading the raw rows into Spark. Unfortunately, this functionality is not yet supported by Spark (see SPARK-12449). As that feature set evolves in Spark, we’ll add it to spark-solr. However, we’re also investigating using some of Spark’s experimental APIs to weave push-down optimizations into the query planning process, so stay tuned for updates on this soon. In the meantime, you can perform this optimization in your client application by detecting when sub-queries can be pushed down into Solr and re-writing your queries to use the results of the push-down operation, as sketched below.
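As an illustration, here’s a sketch of doing that rewrite by hand for the query above: compute the aggregated sub-query on the Solr side with a facet streaming expression (the same mechanism covered in the next section) and then join the small result back to the movies DataFrame in Spark. The rating field name below is an assumption based on how the ratings were indexed earlier; adjust it to match your schema (e.g., rating_i):

// push the "top 10 best-rated movies" aggregation down to Solr with a facet expression
val topRatedExpr = """
facet(movielens_ratings,
      q="rating:[4 TO *]",
      buckets="movie_id",
      bucketSorts="count(*) desc",
      bucketSizeLimit=10,
      count(*))
"""
val topRated = sqlContext.read.format("solr")
  .options(Map("zkhost" -> zkhost, "collection" -> "movielens_ratings", "expr" -> topRatedExpr))
  .load

// only 10 aggregated rows come back to Spark; join them to the movies DataFrame for titles
topRated.join(movies, "movie_id").select("movie_id", "title").show

For now, we’ll leave further refinement of this technique as an exercise for the reader and move on to using streaming expressions from Spark.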
Streaming Expressions
Streaming expressions are one of the more exciting features in Solr 6.x. We’ll refer you to the Solr Reference Guide for details about streaming expressions, but let’s take a look at an example showing how to use streaming expressions with Spark:
val streamingExpr = """
parallel(movielens_ratings,
  hashJoin(
    search(movielens_ratings,
           q="*:*",
           fl="movie_id,user_id,rating",
           sort="movie_id asc",
           qt="/export",
           partitionKeys="movie_id"),
    hashed=search(movielens_movies,
                  q="*:*",
                  fl="movie_id,title",
                  sort="movie_id asc",
                  qt="/export",
                  partitionKeys="movie_id"),
    on="movie_id"
  ),
  workers="1",
  sort="movie_id asc"
)
"""
var opts = Map(
  "zkhost" -> zkhost,
  "collection" -> "movielens_ratings",
  "expr" -> streamingExpr
)
var ratings = sqlContext.read.format("solr").options(opts).load
ratings.printSchema
ratings.show
Notice that instead of just reading all rows from the movielens_ratings collection, we’re asking the spark-solr framework to execute a streaming expression and then expose the results as a DataFrame. Specifically, in this case we’re asking Solr to perform a hashJoin of the movies collection with the ratings collection to generate a new relation that includes movie_id, title, user_id, and rating. Recall that a DataFrame is an RDD[Row] plus a schema; the spark-solr framework handles turning a streaming expression into a SparkSQL schema automatically. Here’s another example that uses Solr’s facet engine to count the number of movies per genre:
val facetExpr = """
facet(movielens_movies,
      q="*:*",
      buckets="genre",
      bucketSorts="count(*) desc",
      bucketSizeLimit=100,
      count(*))
"""
val opts = Map(
  "zkhost" -> zkhost,
  "collection" -> "movielens_movies",
  "expr" -> facetExpr
)
var genres = sqlContext.read.format("solr").options(opts).load
genres.printSchema
genres.show
Unlike the previous SQL example, the aggregation is pushed down into Solr’s aggregation engine and only a small set of aggregated rows are returned to Spark. Smaller RDDs can be cached and broadcast around the Spark cluster to perform in-memory computations, such as joining to a larger dataset.
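For example, here’s a quick sketch of that broadcast-join pattern using Spark’s broadcast hint. It uses the small users DataFrame loaded earlier, but the same idea applies to small aggregated results like the genres DataFrame above:

import org.apache.spark.sql.functions.broadcast

// hint Spark to ship the small users DataFrame to every executor so the join
// against the much larger ratings DataFrame can be done without a shuffle
val ratingsWithUsers = ratings.join(broadcast(users), "user_id")
ratingsWithUsers.show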
There are a few caveats to be aware of when using streaming expressions and spark-solr. First, until Solr 6.2 is released, you cannot use the export handler to retrieve timestamp or boolean fields, see SOLR-9187. In addition, we don’t currently support the gatherNodes stream source as it’s unclear how to map the graph-oriented results into a DataFrame, but we’re always interested in use cases where gatherNodes might be useful.
So now you have the full power of Solr’s query, facet, and streaming expression engines available to Spark. Next, let’s look at one more cool feature that opens up analytics on your Solr data to any JDBC compliant BI / dashboard tool.
Accessing Solr from Spark’s distributed SQL Engine and JDBC
Spark provides a thrift-based distributed SQL engine (built on HiveServer2) to allow client applications to execute SQL against Spark using JDBC. Since the spark-solr framework exposes Solr as a SparkSQL data source, you can easily execute queries using JDBC against Solr. Of course we’re aware that Solr provides its own JDBC driver now, but it’s based on the Solr SQL implementation, which as we’ve discussed is still maturing and does not provide the data type and analytic support needed by most applications.
First, you’ll need to start the thrift server with the --jars option to add the spark-solr shaded JAR to the classpath. In addition, we recommend running the thrift server with the following configuration option to allow multiple JDBC connections (such as those served from a connection pool) to share cached data and temporary tables:
--conf spark.sql.hive.thriftServer.singleSession=true
For example, here’s how I started the thrift server on my Mac.
sbin/start-thriftserver.sh --master local[4] \
  --jars spark-solr/target/spark-solr-2.1.0-shaded.jar \
  --executor-memory 2g \
  --conf spark.sql.hive.thriftServer.singleSession=true \
  --conf spark.driver.extraJavaOptions="-Dsolr.zkhost=localhost:2181/solr610"
Notice I’m also using the spark.driver.extraJavaOptions config property to set the zkhost as a Java system property for the thrift server. This alleviates the need for client applications to pass in the zkhost as part of the options when loading the Solr data source.
Use the following SQL command to initialize the Solr data source to query the movielens_ratings collection:
CREATE TEMPORARY TABLE ratings
USING solr
OPTIONS (
  collection "movielens_ratings"
)
Note that the required zkhost property will be resolved from the Java System property I set when starting the thrift server above. We feel this is a better design in that your client application only needs to know the JDBC URL and not the Solr ZooKeeper connection string. Now you have a temporary table backed by the movielens_ratings collection in Solr that you can execute SQL statements against using Spark’s JDBC driver. Here’s some Java code that uses the JDBC API to connect to Spark’s distributed SQL engine and execute the same query we ran above from the spark-shell:
import java.sql.*;

public class SparkJdbc {
  public static void main(String[] args) throws Exception {
    String driverName = "org.apache.hive.jdbc.HiveDriver";
    String jdbcUrl = "jdbc:hive2://localhost:10000/default";
    String jdbcUser = "???";
    String jdbcPass = "???";
    Class.forName(driverName);
    Connection conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPass);
    Statement stmt = null;
    ResultSet rs = null;
    try {
      stmt = conn.createStatement();
      stmt.execute("CREATE TEMPORARY TABLE movies USING solr OPTIONS (collection \"movielens_movies\")");
      stmt.execute("CREATE TEMPORARY TABLE users USING solr OPTIONS (collection \"movielens_users\")");
      stmt.execute("CREATE TEMPORARY TABLE ratings USING solr OPTIONS (collection \"movielens_ratings\")");
      rs = stmt.executeQuery("SELECT u.gender as gender, COUNT(*) as num_ratings, avg(r.rating) as avg_rating "+
        "FROM ratings r, users u, movies m WHERE m.movie_id = r.movie_id AND r.user_id = u.user_id AND m.genre='romance' "+
        "AND u.age > 30 GROUP BY gender ORDER BY num_ratings desc");
      int rows = 0;
      while (rs.next()) {
        ++rows;
        // TODO: do something with each row
      }
    } finally {
      if (rs != null) rs.close();
      if (stmt != null) stmt.close();
      if (conn != null) conn.close();
    }
  }
}
Data Locality
If a Spark executor and a Solr replica live on the same physical host, SolrRDD takes advantage of data locality to speed up query execution. When creating partitions, SolrRDD sets a placement preference so that each partition’s task runs on the node hosting the replica it reads from, which avoids the overhead of sending data across the network between nodes.
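To make the mechanism concrete, here is a purely illustrative sketch (not the actual spark-solr implementation) of how an RDD expresses placement preferences to Spark’s scheduler:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// one partition per shard replica, remembering which host serves it
case class ShardPartition(index: Int, replicaHost: String) extends Partition

class LocalityAwareRDD(sc: SparkContext, replicaHosts: Seq[String]) extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    replicaHosts.zipWithIndex.map { case (host, i) => ShardPartition(i, host) }.toArray[Partition]

  // Spark's scheduler uses this hint to try to run each task on the node hosting the replica
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[ShardPartition].replicaHost)

  // a real implementation would query the co-located replica here
  override def compute(split: Partition, context: TaskContext): Iterator[String] = Iterator.empty
}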
Performance
Before we wrap up this blog post, we wanted to share our results from running a performance experiment to see how well this solution scales. Specifically, we wanted to measure the time taken to index data from Spark to Solr, as well as the time taken to query Solr from Spark, using the NYC green taxi trip dataset covering 2013–2015. The data is loaded into a Postgres RDS instance in AWS. We used the Solr scale toolkit (solr-scale-tk) to deploy a 3-node Lucidworks Fusion cluster, which includes Apache Spark and Solr. More details are available at https://gist.github.com/kiranchitturi/0be62fc13e4ec7f9ae5def53180ed181
Setup
- 3 EC2 nodes of r3.2xlarge instances running Amazon Linux and deployed in the same placement group
- Solr nodes and Spark worker processes are co-located together on the same host
- Solr collection ‘nyc-taxi’ created with 6 shards (no replication)
- Total number of rows in the database: 91,748,362
Writing to Solr
The documents are loaded from the RDS instance and indexed into Solr using a spark-shell script. The 91.49M rows are indexed into Solr in 49 minutes.
- Docs per second: 31.1K
- JDBC batch size: 5000
- Solr indexing batch size: 50000
- Partitions: 200
Reading from Solr
The full collection dump from Solr to Spark is performed in two ways. To be able to test the streaming expressions, we chose a simple query that only uses fields with docValues. The result set includes all the documents present in the ‘nyc-taxi’ collection (91.49M).
Deep paging with split queries using Cursor Marks
- Docs per second (per task): 6350
- Total time taken: 20 minutes
- Partitions: 120
Streaming using the export handler
- Docs per second (per task): 108.9k
- Total time taken: 2.3 minutes
- Partitions: 6
Indexing a full data dump into Solr from Spark using the JDBC data source is faster than the traditional DIH approach. Reading via the export handler is ~10 times faster than traditional deep paging; docValues are what make this performance benefit possible.
We hope this post gave you some insights into using Apache Spark and Apache Solr for analytics. There are a number of other interesting features of the library that we could not cover in this post, so we encourage you to explore the code base in more detail: github.com/lucidworks/spark-solr. Also, if you’re interested in learning more about how to use Spark and Solr to address your big data needs, please attend Timothy Potter’s talk at this year’s Lucene Revolution: Your Big Data Stack is Too Big.