Solr als SparkSQL-Datenquelle, Teil II
Solr als SparkSQL-Datenquelle Teil II Gemeinsam verfasst mit Kiran Chitturi, Lucidworks Data Engineer Im vergangenen August haben wir Ihnen das…
Solr als SparkSQL-Datenquelle Teil II
Gemeinsam verfasst mit Kiran Chitturi, Lucidworks Data Engineer
Im vergangenen August haben wir Ihnen das Open-Source-Projekt spark-solr von Lucidworks zur Integration von Apache Spark und Apache Solr vorgestellt, siehe Teil I: Teil I. Zur Erinnerung: Wir haben Solr als SparkSQL-Datenquelle vorgestellt und uns dabei hauptsächlich auf Lese- und Abfrageoperationen konzentriert. In diesem Beitrag zeigen wir Ihnen, wie Sie mit der SparkSQL DataFrame API Daten in Solr schreiben können. Außerdem gehen wir etwas tiefer in die fortgeschrittenen Funktionen der Bibliothek ein, wie z.B. Datenlokalität und Unterstützung von Streaming-Ausdrücken.
Daten in Solr schreiben
Für diesen Beitrag werden wir den Movielens 100K-Datensatz verwenden, den Sie unter: http://grouplens.org/datasets/movielens/ finden. Nachdem Sie die Zip-Datei heruntergeladen haben, entpacken Sie sie lokal und notieren Sie sich das Verzeichnis, z. B. /tmp/ml-100k.
Solr und Spark einrichten
Laden Sie Solr 6.x herunter (6.1 ist derzeit die neueste Version) und entpacken Sie das Archiv in ein Verzeichnis, das im Folgenden als $SOLR_INSTALL bezeichnet wird. Starten Sie es im Cloud-Modus, indem Sie Folgendes tun:
cd $SOLR_INSTALL bin/solr -cloud
Erstellen Sie einige Sammlungen, die unsere Movielens-Daten enthalten:
bin/solr create -c movielens_ratings bin/solr create -c movielens_movies bin/solr create -c movielens_users
Vergewissern Sie sich außerdem, dass Sie Apache Spark 1.6.2 installiert haben. Weitere Informationen finden Sie in der Anleitung für die ersten Schritte von Spark. Spark-Dokumentation
Daten mit spark-shell laden
Starten Sie die spark-shell mit dem spark-solr JAR, das dem Klassenpfad hinzugefügt wurde:
cd $SPARK_HOME ./bin/spark-shell --packages "com.lucidworks.spark:spark-solr:2.1.0"
Lassen Sie uns die movielens-Daten in Solr laden, indem wir die integrierte Unterstützung von SparkSQL für das Lesen von CSV-Dateien verwenden. Den Großteil des Ladecodes stellen wir Ihnen weiter unten zur Verfügung, aber Sie müssen zunächst einige umgebungsspezifische Variablen angeben. Geben Sie insbesondere den Pfad zu dem Verzeichnis an, in das Sie die movielens-Daten extrahiert haben, z.B:
val dataDir = "/tmp/ml-100k"
Stellen Sie außerdem sicher, dass zkhost val auf den richtigen Wert für Ihren Solr-Server eingestellt ist.
val zkhost = "localhost:9983"
Als nächstes geben Sie :paste in die Spark-Shell ein, so dass Sie den folgenden Scala-Block einfügen können:
sqlContext.udf.register("toInt", (str: String) => str.toInt) var userDF = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter","|").option("header", "false").load(s"${dataDir}/u.user") userDF.registerTempTable("user") userDF = sqlContext.sql("select C0 as user_id,toInt(C1) as age,C2 as gender,C3 as occupation,C4 as zip_code from user") var writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_users") userDF.write.format("solr").options(writeToSolrOpts).save var itemDF = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter","|").option("header", "false").load(s"${dataDir}/u.item") itemDF.registerTempTable("item") val selectMoviesSQL = """ | SELECT C0 as movie_id, C1 as title, C1 as title_txt_en, | C2 as release_date, C3 as video_release_date, C4 as imdb_url, | C5 as genre_unknown, C6 as genre_action, C7 as genre_adventure, | C8 as genre_animation, C9 as genre_children, C10 as genre_comedy, | C11 as genre_crime, C12 as genre_documentary, C13 as genre_drama, | C14 as genre_fantasy, C15 as genre_filmnoir, C16 as genre_horror, | C17 as genre_musical, C18 as genre_mystery, C19 as genre_romance, | C20 as genre_scifi, C21 as genre_thriller, C22 as genre_war, | C23 as genre_western | FROM item """.stripMargin itemDF = sqlContext.sql(selectMoviesSQL) itemDF.registerTempTable("item") val concatGenreListSQL = """ | SELECT *, | concat(genre_unknown,genre_action,genre_adventure,genre_animation, | genre_children,genre_comedy,genre_crime,genre_documentary, | genre_drama,genre_fantasy,genre_filmnoir,genre_horror, | genre_musical,genre_mystery,genre_romance,genre_scifi, | genre_thriller,genre_war,genre_western) as genre_list | FROM item """.stripMargin itemDF = sqlContext.sql(concatGenreListSQL) // build a multi-valued string field of genres for each movie sqlContext.udf.register("genres", (genres: String) => { var list = scala.collection.mutable.ListBuffer.empty[String] var arr = genres.toCharArray val g = List("unknown","action","adventure","animation","children", "comedy","crime","documentary","drama","fantasy", "filmnoir","horror","musical","mystery","romance", "scifi","thriller","war","western") for (i <- arr.indices) { if (arr(i) == '1') list += g(i) } list }) itemDF.registerTempTable("item") itemDF = sqlContext.sql("select *, genres(genre_list) as genre from item") itemDF = itemDF.drop("genre_list") writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_movies") itemDF.write.format("solr").options(writeToSolrOpts).save sqlContext.udf.register("secs2ts", (secs: Long) => new java.sql.Timestamp(secs*1000)) var ratingDF = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter","t").option("header", "false").load(s"${dataDir}/u.data") ratingDF.registerTempTable("rating") ratingDF = sqlContext.sql("select C0 as user_id, C1 as movie_id, toInt(C2) as rating, secs2ts(C3) as rating_timestamp from rating") ratingDF.printSchema writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_ratings") ratingDF.write.format("solr").options(writeToSolrOpts).save
Drücken Sie ctrl-d, um den Scala-Code im Einfügeblock auszuführen. An diesem Code gibt es einige interessante Aspekte zu beachten. Erstens verwende ich SQL, um die Felder auszuwählen und zu benennen, die ich aus dem aus den CSV-Dateien erstellten DataFrame in Solr einfügen möchte. Außerdem kann ich gängige SQL-Funktionen wie CONCAT verwenden, um die Daten vor dem Einfügen in Solr grundlegend umzuwandeln. Ich verwende auch einige benutzerdefinierte Funktionen (UDF), um benutzerdefinierte Transformationen durchzuführen, wie z.B. das Zusammenfassen der Genre-Felder in ein mehrwertiges String-Feld, das sich besser für die Facettierung eignet, indem ich eine UDF namens „genres“ verwende. Kurz gesagt, Sie haben alle Möglichkeiten von Scala und SQL, um Daten für die Indizierung vorzubereiten.
Beachten Sie auch, dass ich die Daten in drei separaten Sammlungen speichere und nicht alle diese Daten in eine einzige Sammlung auf der Solr-Seite de-normalisiere, wie es beim Aufbau eines Suchindexes üblich ist. Mit SparkSQL und Streaming-Ausdrücken in Solr können wir schnell mehrere Sammlungen zusammenführen, so dass wir nicht de-normalisieren müssen, um analytische Fragen zu unterstützen, die wir mit diesem Datensatz beantworten möchten. Natürlich kann es immer noch sinnvoll sein, zu de-normalisieren, um schnelle Top-N-Abfragen zu unterstützen, bei denen Sie es sich nicht leisten können, Joins in Echtzeit durchzuführen, aber für diesen Blogbeitrag ist das nicht erforderlich. Das Wichtigste dabei ist, dass Sie jetzt mehr Flexibilität bei der Verknüpfung von Sammlungen in Solr sowie bei der Verknüpfung mit anderen Datenquellen mit SparkSQL haben.
Beachten Sie, dass wir die resultierenden DataFrames mit Code wie diesem in Solr schreiben:
var writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_users") userDF.write.format("solr").options(writeToSolrOpts).save
Hinter den Kulissen verwendet das Projekt spark-solr das Schema des Quell-DataFrame, um Felder in Solr mithilfe der Schema-API zu definieren. Wenn Sie spezielle Anforderungen an bestimmte Felder haben (z.B. eine benutzerdefinierte Textanalyse), müssen Sie diese natürlich vordefinieren, bevor Sie Spark verwenden, um Zeilen in Solr einzufügen.
Dies setzt außerdem voraus, dass automatische Soft-Commits für Ihre Solr-Sammlungen aktiviert sind. Wenn automatische Soft-Commits nicht aktiviert sind, können Sie dies über die Solr Config API einstellen oder einfach die Option soft_commit_secs beim Schreiben in Solr angeben, z.B.:
var writeToSolrOpts = Map("zkhost" -> zkhost, "collection" -> "movielens_users", "soft_commit_secs" -> "10")
Wenn das Schema des DataFrame, den Sie indizieren, nicht korrekt ist, wird der spark-solr Code das Feld in Solr mit dem falschen Feldtyp erstellen. Ich habe zum Beispiel das Feld für die Bewertung bei der ersten Iteration nicht in einen numerischen Typ umgewandelt, so dass es als String indiziert wurde. Infolgedessen konnte ich auf der Solr-Seite keine Aggregationen durchführen, wie z.B. die Berechnung der durchschnittlichen Bewertung von Actionfilmen für weibliche Rezensenten in Boston. Nachdem ich das Problem auf der Spark-Seite behoben hatte, war das Feld in Solr bereits falsch definiert, so dass ich die Solr Schema API verwenden musste, um die Felddefinition zu löschen und mit dem richtigen Datentyp neu zu erstellen. Die wichtigste Erkenntnis hier ist, dass scheinbar unbedeutende Datentypprobleme in den Quelldaten zu verwirrenden Problemen bei der Arbeit mit den Daten in Solr führen können.
In diesem Beispiel verwenden wir die CSV-DataSource von Spark, aber Sie können jeden DataFrame in Solr schreiben. Das bedeutet, dass Sie Daten aus jeder SparkSQL DataSource, wie Cassandra oder MongoDB, lesen und mit dem gleichen Ansatz wie hier gezeigt in Solr schreiben können. Sie können SparkSQL sogar als leistungsfähigeren Ersatz für den Data Import Handler (DIH) von Solr zur Indizierung von Daten aus einem RDBMS verwenden; ein Beispiel dafür finden Sie im Abschnitt Leistung weiter unten.
Ok, jetzt haben Sie also einige Daten in Solr geladen und alles korrekt für die Abfrage aus Spark eingerichtet. Lassen Sie uns nun einige der zusätzlichen Funktionen der spark-solr Bibliothek kennenlernen, die wir im vorherigen Blogbeitrag nicht behandelt haben.
Analysieren von Solr-Daten mit Spark
Bevor Sie die Daten in Solr analysieren können, müssen Sie sie als DataFrame in Spark laden, was im ersten Blogbeitrag dieser Serie behandelt wurde. Führen Sie den folgenden Code in der Spark-Shell aus, um die movielens-Daten aus Solr zu lesen:
var ratings = sqlContext.read.format("solr").options(Map("zkhost" -> zkhost, "collection" -> "movielens_ratings")).load ratings.printSchema ratings.registerTempTable("ratings") var users = sqlContext.read.format("solr").options(Map("zkhost" -> zkhost, "collection" -> "movielens_users")).load users.printSchema users.registerTempTable("users") sqlContext.cacheTable("users") var movies = sqlContext.read.format("solr").options(Map("zkhost" -> zkhost, "collection" -> "movielens_movies")).load movies.printSchema movies.registerTempTable("movies") sqlContext.cacheTable("movies")
Verknüpfung von Solr-Daten mit SQL
Hier ist eine Beispielabfrage, die Sie von der Spark-Shell an Solr senden können, um den Datensatz zu erkunden:
sqlContext.sql(""" | SELECT u.gender as gender, COUNT(*) as num_ratings, avg(r.rating) as avg_rating | FROM ratings r, users u, movies m | WHERE m.movie_id = r.movie_id | AND r.user_id = u.user_id | AND m.genre='romance' AND u.age > 30 | GROUP BY gender | ORDER BY num_ratings desc """.stripMargin).show
HINWEIS: Möglicherweise bemerken Sie eine leichte Verzögerung bei der Ausführung dieser Abfrage, da Spark die spark-solr-Bibliothek an den/die ausführenden Prozess(e) verteilen muss.
In dieser Abfrage fügen wir Daten aus drei verschiedenen Solr-Sammlungen zusammen und führen eine Aggregation mit dem Ergebnis durch. Wir laden also die Zeilen aller drei Solr-Sammlungen in Spark und verlassen uns dann auf Spark, um die Verknüpfung und Aggregation der Rohzeilen durchzuführen.
Solr 6.x bietet auch die Möglichkeit, einfache SQL-Anweisungen auszuführen. Zum Zeitpunkt des Verfassens dieses Artikels ist der Funktionsumfang jedoch noch nicht groß genug, um allgemein als Analysetool nützlich zu sein. Sie sollten jedoch SparkSQL und die parallele SQL-Engine von Solr als komplementäre Technologien betrachten, da es in der Regel effizienter ist, Aggregationsanfragen in die Engine zu verlagern, in der sich die Daten befinden, insbesondere wenn die Aggregation mit der Facetten-Engine von Solr berechnet werden kann. Betrachten Sie zum Beispiel die folgende SQL-Abfrage, die eine Verknüpfung mit den Ergebnissen einer Unterabfrage durchführt, die aggregierte Zeilen liefert.
SELECT m.title as title, solr.aggCount as aggCount FROM movies m INNER JOIN (SELECT movie_id, COUNT(*) as aggCount FROM ratings WHERE rating >= 4 GROUP BY movie_id ORDER BY aggCount desc LIMIT 10) as solr ON solr.movie_id = m.movie_id ORDER BY aggCount DESC
Es stellt sich heraus, dass die Unterabfrage, die hier als „solr“ bezeichnet wird, auf der Solr-Seite mit Hilfe der Facetten-Engine ausgewertet werden kann, die, wie wir alle wissen, eine der leistungsstärksten und ausgereiftesten Funktionen in Solr ist. Die Unterabfrage:
SELECT movie_id, COUNT(*) as aggCount FROM ratings WHERE rating='[4 TO *]' GROUP BY movie_id ORDER BY aggCount desc LIMIT 10
Ist praktisch dasselbe wie tun:
/select?q=*:* &fq=rating_i:[4 TO *] &facet=true &facet.limit=10 &facet.mincount=1 &facet.field=movie_id
Es wäre daher schön, wenn die spark-solr Bibliothek erkennen könnte, wann Aggregationen in Solr gepusht werden können, um das Laden der Rohzeilen in Spark zu vermeiden. Leider wird diese Funktionalität von Spark noch nicht unterstützt, siehe: SPARK-12449. Wenn sich diese Funktion in Spark weiterentwickelt, werden wir sie zu spark-solr hinzufügen. Wir untersuchen jedoch auch die Verwendung einiger der experimentellen APIs von Spark, um Push-Down-Optimierungen in den Abfrageplanungsprozess einzubinden. In der Zwischenzeit können Sie diese Optimierung in Ihrer Client-Anwendung durchführen, indem Sie feststellen, wann Unterabfragen in die parallele SQL-Engine von Solr verschoben werden können, und dann Ihre Abfragen so umschreiben, dass sie die Ergebnisse der Push-Down-Operation verwenden. Wir belassen es vorerst bei einer Übung für den Benutzer und fahren mit der Verwendung von Streaming-Ausdrücken von Spark fort.
Streaming-Ausdrücke
Streaming-Ausdrücke sind eine der aufregendsten Funktionen in Solr 6.x. Wir verweisen Sie auf das Solr-Referenzhandbuch, um mehr über Streaming-Ausdrücke zu erfahren, aber sehen wir uns ein Beispiel an, das zeigt, wie Sie Streaming-Ausdrücke mit Spark verwenden können:
val streamingExpr = """ parallel(movielens_ratings, hashJoin( search(movielens_ratings, q="*:*", fl="movie_id,user_id,rating", sort="movie_id asc", qt="/export", partitionKeys="movie_id"), hashed=search(movielens_movies, q="*:*", fl="movie_id,title", sort="movie_id asc", qt="/export", partitionKeys="movie_id"), on="movie_id" ), workers="1", sort="movie_id asc" ) """ var opts = Map( "zkhost" -> zkhost, "collection" -> "movielens_ratings", "expr" -> streamingExpr ) var ratings = sqlContext.read.format("solr").options(opts).load ratings.printSchema ratings.show
Beachten Sie, dass wir nicht einfach alle Zeilen aus der Sammlung movielens_ratings lesen, sondern das Spark-Solr-Framework bitten, einen Streaming-Ausdruck auszuführen und die Ergebnisse dann als DataFrame bereitzustellen. In diesem Fall bitten wir Solr, einen HashJoin der Sammlung movies mit der Sammlung ratings durchzuführen, um eine neue Relation zu erzeugen, die movie_id, title, user_id und rating enthält. Sie erinnern sich, dass ein DataFrame ein RDD[Row] und ein Schema ist. Das spark-solr Framework wandelt einen Streaming-Ausdruck automatisch in ein SparkSQL-Schema um. Hier ist ein weiteres Beispiel, das die Facetten-/Statistik-Engine von Solr verwendet, um die durchschnittliche Bewertung pro Genre zu berechnen:
val facetExpr = """ facet(movielens_movies, q="*:*", buckets="genre", bucketSorts="count(*) desc", bucketSizeLimit=100, count(*)) """ val opts = Map( "zkhost" -> zkhost, "collection" -> "movielens_movies", "expr" -> facetExpr ) var genres = sqlContext.read.format("solr").options(opts).load genres.printSchema genres.show
Im Gegensatz zum vorherigen SQL-Beispiel wird die Aggregation in die Aggregations-Engine von Solr verlagert und nur ein kleiner Satz aggregierter Zeilen wird an Spark zurückgegeben. Kleinere RDDs können zwischengespeichert und über den Spark-Cluster verteilt werden, um In-Memory-Berechnungen durchzuführen, z. B. die Verknüpfung mit einem größeren Datensatz.
Bei der Verwendung von Streaming-Ausdrücken und spark-solr gibt es ein paar Einschränkungen zu beachten. Erstens können Sie bis zur Veröffentlichung von Solr 6.2 den Export-Handler nicht verwenden, um Zeitstempel oder boolesche Felder abzurufen, siehe SOLR-9187. Außerdem unterstützen wir derzeit die Stream-Quelle gatherNodes nicht, da unklar ist, wie die graph-orientierten Ergebnisse in einen DataFrame gemappt werden können, aber wir sind immer an Anwendungsfällen interessiert, in denen gatherNodes nützlich sein könnte.
Jetzt haben Sie also die volle Leistung der Abfrage-, Facetten- und Streaming Expression-Engines von Solr für Spark zur Verfügung. Als Nächstes sehen wir uns eine weitere coole Funktion an, die die Analyse Ihrer Solr-Daten für jedes JDBC-konforme BI-/Dashboard-Tool öffnet.
Zugriff auf Solr über die verteilte SQL-Engine von Spark und JDBC
Spark bietet eine Thrift-basierte verteilte SQL-Engine (aufbauend auf HiveServer2), die es Client-Anwendungen ermöglicht, über JDBC SQL gegen Spark auszuführen. Da das spark-solr-Framework Solr als SparkSQL-Datenquelle bereitstellt, können Sie mit JDBC problemlos Abfragen gegen Solr ausführen. Natürlich wissen wir, dass Solr jetzt einen eigenen JDBC-Treiber bereitstellt, aber dieser basiert auf der Solr SQL-Implementierung, die, wie wir bereits erwähnt haben, noch nicht ausgereift ist und nicht die Datentypen und die analytische Unterstützung bietet, die die meisten Anwendungen benötigen.
Zunächst müssen Sie den thrift Server mit der Option –jars starten, um das spark-solr shaded JAR zum Klassenpfad hinzuzufügen. Außerdem empfehlen wir, den Thrift-Server mit der folgenden Konfigurationsoption zu starten, damit mehrere JDBC-Verbindungen (z.B. solche, die von einem Verbindungspool bedient werden) die zwischengespeicherten Daten und temporären Tabellen gemeinsam nutzen können:
--conf spark.sql.hive.thriftServer.singleSession=true
So habe ich zum Beispiel den Thrift-Server auf meinem Mac gestartet.
sbin/start-thriftserver.sh --master local[4] --jars spark-solr/target/spark-solr-2.1.0-shaded.jar --executor-memory 2g --conf spark.sql.hive.thriftServer.singleSession=true --conf spark.driver.extraJavaOptions="-Dsolr.zkhost=localhost:2181/solr610"
Beachten Sie, dass ich auch die Konfigurationseigenschaft spark.driver.extraJavaOptions verwende, um den zkhost als Java-Systemeigenschaft für den Thrift-Server festzulegen. Dadurch müssen Client-Anwendungen den zkhost nicht mehr als Teil der Optionen beim Laden der Solr-Datenquelle übergeben.
Verwenden Sie den folgenden SQL-Befehl, um die Solr-Datenquelle zu initialisieren und die Sammlung movielens_ratings abzufragen:
CREATE TEMPORARY TABLE ratings USING solr OPTIONS ( collection "movielens_ratings" )
Beachten Sie, dass die erforderliche zkhost-Eigenschaft aus der Java-System-Eigenschaft aufgelöst wird, die ich oben beim Start des Thrift-Servers festgelegt habe. Wir halten dies für ein besseres Design, da Ihre Client-Anwendung nur die JDBC-URL und nicht den Solr ZooKeeper-Verbindungsstring kennen muss. Jetzt haben Sie eine temporäre Tabelle, die auf der movielens_ratings-Sammlung in Solr basiert und gegen die Sie mit dem JDBC-Treiber von Spark SQL-Anweisungen ausführen können. Hier finden Sie einen Java-Code, der die JDBC-API verwendet, um sich mit der verteilten SQL-Engine von Spark zu verbinden und die gleiche Abfrage auszuführen, die wir oben von der Spark-Shell aus ausgeführt haben:
import java.sql.*; public class SparkJdbc { public static void main(String[] args) throws Exception { String driverName = "org.apache.hive.jdbc.HiveDriver"; String jdbcUrl = "jdbc:hive2://localhost:10000/default"; String jdbcUser = "???"; String jdbcPass = "???"; Class.forName(driverName); Connection conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPass); Statement stmt = null; ResultSet rs = null; try { stmt = conn.createStatement(); stmt.execute("CREATE TEMPORARY TABLE movies USING solr OPTIONS (collection "movielens_movies")"); stmt.execute("CREATE TEMPORARY TABLE users USING solr OPTIONS (collection "movielens_users")"); stmt.execute("CREATE TEMPORARY TABLE ratings USING solr OPTIONS (collection "movielens_ratings")"); rs = stmt.executeQuery("SELECT u.gender as gender, COUNT(*) as num_ratings, avg(r.rating) as avg_rating "+ "FROM ratings r, users u, movies m WHERE m.movie_id = r.movie_id AND r.user_id = u.user_id AND m.genre='romance' "+ " AND u.age > 30 GROUP BY gender ORDER BY num_ratings desc"); int rows = 0; while (rs.next()) { ++rows; // TODO: do something with each row } } finally { if (rs != null) rs.close(); if (stmt != null) stmt.close(); if (conn != null) conn.close(); } } }
Daten Lokalisierung
Wenn sich der Spark-Executor und die Solr-Replik auf demselben physischen Host befinden, sorgt SolrRDD mit der Funktion Data Locality für eine schnellere Abfrageausführungszeit. Während der Erstellung der Partition bietet SolrRDD die Option, die Ausführung auf demselben Knoten zu bevorzugen, auf dem auch die Replik vorhanden ist. Dies spart den Overhead, der durch das Senden der Daten über das Netzwerk zwischen verschiedenen Knoten entsteht.
Leistung
Bevor wir diesen Blog-Beitrag abschließen, möchten wir unsere Ergebnisse aus einem Performance-Experiment mitteilen, um zu sehen, wie gut diese Lösung skaliert. Konkret wollten wir die Zeit messen, die für die Indizierung der Daten von Spark zu Solr benötigt wird, sowie die Zeit, die für die Abfrage von Solr von Spark aus benötigt wird, wobei wir den Datensatz der grünen Taxifahrten in NYC zwischen 2013 und 2015 verwendet haben. Die Daten wurden auf eine Postgres RDS-Instanz in AWS geladen. Wir haben das Solr Scale Toolkit (solr-scale-tk) verwendet, um einen Lucidworks Fusion-Cluster mit 3 Knoten einzurichten, der Apache Spark und Solr enthält. Weitere Einzelheiten finden Sie unter https://gist.github.com/kiranchitturi/0be62fc13e4ec7f9ae5def53180ed181.
Einrichtung
- 3 EC2-Knoten mit r3.2xlarge-Instanzen, auf denen Amazon Linux läuft und die in derselben Platzierungsgruppe eingesetzt werden
- Solr-Knoten und Spark-Arbeitsprozesse befinden sich gemeinsam auf demselben Host
- Solr-Sammlung ’nyc-taxi‘ mit 6 Shards erstellt (keine Replikation)
- Gesamtzahl der Zeilen ‚91748362‘ in der Datenbank
Schreiben in Solr
Die Dokumente werden von der RDS-Instanz geladen und mithilfe des Spark-Shell-Skripts in Solr indiziert. 91,49 Millionen Zeilen werden in 49 Minuten in Solr indiziert.
- Dokumente pro Sekunde: 31.1K
- JDBC-Stapelgröße: 5000
- Batchgröße für die Solr-Indizierung: 50000
- Partitionen: 200
Lesen aus Solr
Der vollständige Sammel-Dump von Solr zu Spark wird auf zwei Arten durchgeführt. Um die Streaming-Ausdrücke testen zu können, haben wir eine einfache Abfrage gewählt, die nur Felder mit docValues verwendet. Die Ergebnismenge umfasst alle Dokumente in der Sammlung ’nyc-taxi‘ (91.49M)
Tiefes Paging mit geteilten Abfragen unter Verwendung von Cursor-Markierungen
- Dokumente pro Sekunde (pro Aufgabe): 6350
- Insgesamt benötigte Zeit: 20 Minuten
- Partitionen: 120
Streaming mit dem Export-Handler
- Dokumente pro Sekunde (pro Aufgabe): 108.9k
- Insgesamt benötigte Zeit: 2.3 Minuten
- Partitionen: 6
Vollständige Datendumps von Spark nach Solr unter Verwendung der JDBC-Datenquelle sind schneller als der traditionelle DIH-Ansatz. Streaming mit dem Export-Handler ist ~10 Mal schneller als das traditionelle Deep Paging. Die Verwendung von DocValues verschafft uns diesen Leistungsvorteil.
Wir hoffen, dass dieser Beitrag Ihnen einige Einblicke in die Verwendung von Apache Spark und Apache Solr für Analysen gegeben hat. Es gibt eine Reihe weiterer interessanter Funktionen der Bibliothek, die wir in diesem Beitrag nicht abdecken konnten. Wir möchten Sie daher ermutigen, die Codebasis im Detail zu erkunden: github.com/lucidworks/spark-solr. Wenn Sie mehr darüber erfahren möchten, wie Sie Spark und Solr für Ihre Big Data-Anforderungen nutzen können, sollten Sie sich den Vortrag von Timothy Potter auf der diesjährigen Lucene Revolution ansehen: Ihr Big Data Stack ist zu groß.