Solr als Apache Spark SQL-Datenquelle

Teil 1 von 2: Solr-Ergebnisse als DataFrame lesen Dieser Beitrag ist der erste einer zweiteiligen Serie, in der ich ein…

Teil 1 von 2: Solr-Ergebnisse als DataFrame lesen

Dieser Beitrag ist der erste einer zweiteiligen Serie, in der ich ein von Lucidworks entwickeltes Open-Source-Toolkit vorstelle, das Solr als Spark SQL DataSource zur Verfügung stellt. Die DataSource-API bietet eine saubere Abstraktionsschicht für Spark-Entwickler zum Lesen und Schreiben strukturierter Daten von/zu einer externen Datenquelle. In diesem ersten Beitrag beschreibe ich, wie Sie Daten aus Solr in Spark lesen können. Im nächsten Beitrag beschreibe ich, wie Sie strukturierte Daten aus Spark in Solr schreiben.

Um zu beginnen, müssen Sie das Projekt von Github klonen und mit Maven erstellen:

git clone https://github.com/Lucidworks/spark-solr.git
 cd spark-solr
 mvn clean package -DskipTests

Nach der Erstellung führen Sie das twitter-to-solr Beispiel aus, um Solr mit einigen Tweets zu füllen. Sie benötigen dazu Ihre eigenen Twitter-API-Schlüssel, die Sie anhand der hier dokumentierten Schritte erstellen können .

Starten Sie Solr im Cloud-Modus und erstellen Sie eine Sammlung namens „socialdata“, die in zwei Shards partitioniert ist:

bin/solr -c && bin/solr create -c socialdata -shards 2

In den übrigen Abschnitten dieses Dokuments wird davon ausgegangen, dass Solr im Cloud-Modus auf Port 8983 läuft und der eingebettete ZooKeeper auf localhost:9983 lauscht.

Um sicherzustellen, dass Sie die Tweets bei der Indizierung nahezu in Echtzeit sehen können, sollten Sie außerdem automatische Soft-Commits über die Config API von Solr aktivieren. Für diese Übung werden wir die Tweets alle 2 Sekunden übertragen.

curl -XPOST http://localhost:8983/solr/socialdata/config 
 -d '{"set-property":{"updateHandler.autoSoftCommit.maxTime":"2000"}}'

Lassen Sie uns nun Solr mithilfe von Spark Streaming mit Tweets befüllen:

$SPARK_HOME/bin/spark-submit --master $SPARK_MASTER 
 --conf "spark.executor.extraJavaOptions=-Dtwitter4j.oauth.consumerKey=? -Dtwitter4j.oauth.consumerSecret=? -Dtwitter4j.oauth.accessToken=? -Dtwitter4j.oauth.accessTokenSecret=?" 
 --class com.lucidworks.spark.SparkApp 
 ./target/spark-solr-1.0-SNAPSHOT-shaded.jar 
 twitter-to-solr -zkHost localhost:9983 -collection socialdata

Ersetzen Sie $SPARK_MASTER durch die URL Ihres Spark-Masterservers. Wenn Sie keinen Zugang zu einem Spark-Cluster haben, können Sie den Spark-Auftrag im lokalen Modus ausführen, indem Sie

--master local[2]

Wenn Sie jedoch im lokalen Modus arbeiten, gibt es keinen Executor. Daher müssen Sie die Twitter-Anmeldeinformationen im Parameter spark.driver.extraJavaOptions statt spark.executor.extraJavaOptions übergeben.

Die Tweets fließen nun in Solr ein. Lassen Sie den Streaming-Auftrag einige Minuten lang laufen, damit sich einige tausend Tweets in Ihrer Socialdata-Sammlung ansammeln. Sie können den Auftrag mit ctrl-C beenden.

Als nächstes starten wir die Spark Scala REPL-Shell, um eine interaktive Datenexploration mit unseren indizierten Tweets durchzuführen:

cd $SPARK_HOME
 ADD_JARS=$PROJECT_HOME/target/spark-solr-1.0-SNAPSHOT-shaded.jar bin/spark-shell

$PROJECT_HOME ist der Ort, an den Sie das spark-solr-Projekt geklont haben.

Lassen Sie uns als nächstes die socialdata-Sammlung in Spark laden, indem wir den folgenden Scala-Code in der Shell ausführen:

val tweets = sqlContext.load("solr",
 Map("zkHost" -> "localhost:9983", "collection" -> "socialdata")
 ).filter("provider_s='twitter'")

In Zeile 1 verwenden wir das sqlContext-Objekt, das von Spark automatisch in die Shell geladen wird, um eine DataSource namens „solr“ zu laden. Hinter den Kulissen findet Spark die Klasse solr.DefaultSource in der Projekt-JAR-Datei, die wir mit der Umgebungsvariablen ADD_JARS in die Shell geladen haben.

In Zeile 2 übergeben wir die Konfigurationsparameter, die die Solr DataSource für die Verbindung mit Solr über eine Scala Map benötigt. Zumindest müssen wir den ZooKeeper-Verbindungsstring (zkHost) und den Namen der Sammlung übergeben. Standardmäßig findet die DataSource alle Dokumente in der Sammlung, aber Sie können der DataSource mit dem optionalen Parameter „query“ eine Solr-Abfrage übergeben. Auf diese Weise können Sie die von der DataSource angezeigten Dokumente mithilfe einer Solr-Abfrage einschränken.

In Zeile 3 verwenden wir einen Filter, um nur Dokumente auszuwählen, die von Twitter stammen (provider_s=’twitter‘).

An diesem Punkt haben wir ein Spark SQL DataFrame-Objekt, das Tweets aus Solr lesen kann. In Spark ist ein DataFrame eine verteilte Sammlung von Daten, die in benannten Spalten organisiert sind (siehe: https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html). Vom Konzept her sind DataFrames ähnlich wie Tabellen in einer relationalen Datenbank, nur dass sie über mehrere Knoten in einem Spark-Cluster verteilt sind. Das folgende Diagramm zeigt, wie ein DataFrame durch die Abfrage unserer Socialdata-Sammlung mit zwei Shards in Solr unter Verwendung der DataSource API erstellt wird:

apache-solr-spark-datasource

Es ist wichtig zu verstehen, dass Spark die Socialdata-Sammlung zu diesem Zeitpunkt noch nicht in den Speicher lädt. Die eigentlichen Daten werden erst dann in Spark geladen, wenn sie später im Job für eine Berechnung benötigt werden. So kann Spark die notwendigen Spalten- und Partitionsbereinigungsoperationen durchführen, um den Datenzugriff in Solr zu optimieren.

Jeder DataFrame hat ein Schema. Sie können die Funktion printSchema() verwenden, um Informationen über die für den DataFrame tweets verfügbaren Felder zu erhalten:

tweets.printSchema()

Hinter den Kulissen verwendet unsere DataSource-Implementierung die Schema-API von Solr, um die Felder und Feldtypen für die Sammlung automatisch zu bestimmen.

scala> tweets.printSchema()
 root
 |-- _indexed_at_tdt: timestamp (nullable = true)
 |-- _version_: long (nullable = true)
 |-- accessLevel_i: integer (nullable = true)
 |-- author_s: string (nullable = true)
 |-- createdAt_tdt: timestamp (nullable = true)
 |-- currentUserRetweetId_l: long (nullable = true)
 |-- favorited_b: boolean (nullable = true)
 |-- id: string (nullable = true)
 |-- id_l: long (nullable = true)
 ...

Als nächstes registrieren wir den DataFrame tweets als temporäre Tabelle, damit wir ihn in SQL-Abfragen verwenden können:

tweets.registerTempTable("tweets")

Wir können zum Beispiel die Anzahl der Retweets zählen, indem wir dies tun:

sqlContext.sql("SELECT COUNT(type_s) FROM tweets WHERE type_s='echo'").show()

Wenn Sie Ihr Solr-Protokoll überprüfen, werden Sie sehen, dass die folgende Abfrage von der Solr DataSource generiert wurde, um die SQL-Anweisung zu verarbeiten (beachten Sie, dass ich die Zeilenumbrüche zwischen den Parametern hinzugefügt habe, um die Abfrage besser lesen zu können):

q=*:*&
 fq=provider_s:twitter&
 fq=type_s:echo&
 distrib=false&
 fl=type_s,provider_s&
 cursorMark=*&
 start=0&
 sort=id+asc&
 collection=socialdata&
 rows=1000

Es gibt einige interessante Aspekte dieser Abfrage. Beachten Sie zunächst, dass der Feldfilter provider_s, den wir bei der Deklaration des DataFrame verwendet haben, in einen Solr-Filter-Abfrageparameter (fq=provider_s:twitter) übersetzt wurde. Solr speichert eine effiziente Datenstruktur für diesen Filter, die in allen Abfragen wiederverwendet werden kann, was die Leistung beim Lesen von Daten aus Solr in Spark verbessert.

Außerdem enthielt die SQL-Anweisung eine WHERE-Klausel, die ebenfalls in eine zusätzliche Filterabfrage übersetzt wurde (fq=type_s:echo). Unsere DataSource-Implementierung übernimmt die Übersetzung von SQL-Klauseln in Solr-spezifische Abfragekonstrukte. Am Backend übernimmt Spark die Verteilung und Optimierung des logischen Plans zur Ausführung eines Auftrags, der auf Datenquellen zugreift.

Obwohl für jeden Tweet in unserer Sammlung viele Felder zur Verfügung stehen, stellt Spark sicher, dass nur die für die Abfrage benötigten Felder aus der Datenquelle abgerufen werden, in diesem Fall also nur type_s und provider_s. Im Allgemeinen ist es eine gute Idee, beim Lesen von Daten in Spark nur die spezifischen Felder abzufragen, auf die Sie Zugriff benötigen.

Die Abfrage verwendet außerdem Deep-Paging-Cursors, um effizient Dokumente tief in der Ergebnismenge zu lesen. Wenn Sie wissen möchten, wie Deep Paging Cursors in Solr funktionieren, lesen Sie bitte: https://lucidworks.com/post/coming-soon-to-solr-efficient-cursor-based-iteration-of-large-result-sets/. Außerdem werden übereinstimmende Dokumente von Solr zurückgestreamt, was die Leistung verbessert, da die Client-Seite (Spark-Task) nicht darauf warten muss, dass eine ganze Seite von Dokumenten (1000) auf der Solr-Seite aufgebaut wird, bevor sie Daten erhält. Mit anderen Worten: Die Dokumente werden von Solr zurückgestreamt, sobald der erste Treffer identifiziert wurde.

Der letzte interessante Aspekt dieser Abfrage ist der Parameter distrib=false. Hinter den Kulissen liest die Solr DataSource Daten aus allen Shards in einer Sammlung parallel von verschiedenen Spark Tasks. Mit anderen Worten: Wenn Sie eine Sammlung mit zehn Shards haben, verwendet die Solr DataSource-Implementierung 10 Spark-Tasks, um parallel von jedem Shard zu lesen. Der Parameter distrib=false sorgt dafür, dass jeder Shard die Abfrage nur lokal ausführt, anstatt sie an andere Shards zu verteilen.

Das parallele Lesen aus allen Shards funktioniert jedoch nicht für Anwendungsfälle des Typs Top N, bei denen Sie Dokumente aus Solr in geordneter Reihenfolge über alle Shards hinweg lesen müssen. Sie können die Parallelisierungsfunktion deaktivieren, indem Sie den Parameter parallel_shards auf false setzen. Wenn er auf false gesetzt ist, führt die Solr DataSource eine verteilte Standardabfrage aus. Daher sollten Sie bei der Deaktivierung dieser Funktion Vorsicht walten lassen, insbesondere wenn Sie sehr große Ergebnismengen aus Solr lesen.

Nicht nur SQL

Neben SQL bietet die Spark-API eine Reihe von funktionalen Operationen, die Sie mit einem DataFrame durchführen können. Wenn wir beispielsweise die Top-Autoren anhand der Anzahl der Beiträge ermitteln wollten, könnten wir die folgende SQL verwenden:

sqlContext.sql("select author_s, COUNT(author_s) num_posts from tweets where type_s='post' group by author_s order by num_posts desc limit 10").show()

Sie können auch die DataFrame-API verwenden, um dasselbe zu erreichen:

tweets.filter("type_s='post'").groupBy("author_s").count().
 orderBy(desc("count")).limit(10).show()

Ein weiterer subtiler Aspekt bei der Arbeit mit DataFrames ist, dass Sie als Entwickler entscheiden müssen, wann Sie den DataFrame zwischenspeichern, je nachdem, wie teuer die Erstellung war. Wenn Sie z.B. 10 Millionen Zeilen aus Solr laden und dann eine kostspielige Transformation durchführen, die Ihren DataFrame auf 10.000 Zeilen reduziert, wäre es klug, den kleineren DataFrame zu cachen, damit Sie nicht erneut Millionen von Zeilen aus Solr einlesen müssen. Andererseits ist das Zwischenspeichern der ursprünglichen Millionen von Zeilen, die von Solr abgerufen wurden, wahrscheinlich nicht sehr sinnvoll, da dies zu viel Arbeitsspeicher verbraucht. Generell empfehle ich, DataFrames zu cachen, wenn Sie sie für zusätzliche Berechnungen wiederverwenden müssen und ihre Erstellung einige Berechnungen erfordert.

Nachbereitung

Natürlich brauchen Sie nicht die Leistung von Spark, um eine einfache Zählung durchzuführen, wie ich es in meinem Beispiel getan habe. Das Wichtigste ist jedoch, dass es mit der Spark SQL DataSource API sehr einfach ist, die Ergebnisse einer Solr-Abfrage als DataFrame darzustellen. Dadurch können Sie unter anderem Daten aus Solr mit Daten aus anderen Unternehmenssystemen wie Hive oder Postgres kombinieren, um erweiterte Datenanalyseaufgaben in großem Umfang durchzuführen. Ein weiterer Vorteil der DataSource-API ist, dass sie es Entwicklern ermöglicht, mit einer Datenquelle in jeder von Spark unterstützten Sprache zu interagieren. So gibt es beispielsweise keine native R-Schnittstelle zu Solr, aber mit Spark SQL kann ein Datenwissenschaftler Daten aus Solr nahtlos in einen R-Job ziehen.

Im nächsten Beitrag werde ich Ihnen zeigen, wie Sie mit der DataSource-API einen DataFrame in Solr schreiben.

You Might Also Like

B2B-KI-Benchmarkstudie 2025: Was wir in den Schützengräben sehen

Laden Sie die B2B-KI-Benchmark-Highlights 2025 von Lucidworks herunter. Sehen Sie sich die...

Read More

Vom Suchunternehmen zum praktischen KI-Pionier: Unsere Vision für 2025 und darüber hinaus

CEO Mike Sinoway gibt Einblicke in die Zukunft der KI und stellt...

Read More

Wenn KI schief geht: Fehlschläge in der realen Welt und wie man sie vermeidet

Lassen Sie nicht zu, dass Ihr KI-Chatbot einen 50.000 Dollar teuren Tahoe...

Read More

Quick Links