Verbesserung der Datenverarbeitung mit Spark 3.0 & Delta Lake

Quellknoten: 1013539

Erhebung, Verarbeitung und Durchführung von Analysen zu Streaming-Daten, in Branchen wie der Ad-Tech erfordert intensives Daten-Engineering. Die täglich generierten Daten sind riesig (100s GB Daten) und erfordern eine erhebliche Verarbeitungszeit, um die Daten für die nachfolgenden Schritte zu verarbeiten.

Eine weitere Herausforderung ist das Zusammenführen von Datensätzen, um Erkenntnisse abzuleiten. Jeder Prozess hat im Durchschnitt mehr als 10 Datensätze und eine gleiche Anzahl von Joins mit mehreren Schlüsseln. Die Partitionsgröße für jeden Schlüssel ist bei jeder Ausführung unvorhersehbar.

Und schließlich, wenn die Datenmenge in bestimmten Fällen überschritten wird, kann der Speicher nicht mehr ausreichend sein. Dies bedeutet, dass der Prozess mitten in den letzten Schreibvorgängen sterben würde, was dazu führt, dass die Verbraucher die Eingabedatenrahmen deutlich lesen.

In diesem Blog geben wir einen Überblick über Delta-Seen, seine Vorteile und wie die oben genannten Herausforderungen durch den Wechsel zu Delta Lake und die Migration von Spark 3.0 zu Spark 2.4 gemeistert werden können. 

Was ist Deltasee?

„Delta Lake ist eine Open-Source-Datenspeicherschicht, die auf dem bestehenden Data Lake läuft und vollständig mit ihnen kooperiert Apache Funken APIs. Neben der Möglichkeit, ACID-Transaktionen und skalierbare Metadatenverarbeitung zu implementieren, kann Delta Lakes auch die Streaming- und Batch-Datenverarbeitung vereinheitlichen.“ 

Delta Lake verwendet versionierte Parquet-Dateien, um Daten in der Cloud zu speichern. Sobald der Cloudspeicherort konfiguriert ist, verfolgt Delta Lake alle Änderungen, die an der Tabelle oder dem Blobspeicherverzeichnis vorgenommen wurden, um ACID-Transaktionen bereitzustellen. 

Vorteile der Nutzung von Delta Lakes 

Delta Lake ermöglicht die parallele Ausführung von Tausenden von Daten, die Bewältigung von Optimierungs- und Partitionsherausforderungen, schnellere Metadatenvorgänge, führt ein Transaktionsprotokoll und aktualisiert die Daten kontinuierlich. Im Folgenden besprechen wir einige wichtige Vorteile: 

Delta Lake-Transaktionsprotokoll

Delta-Lake-Transaktionsprotokolle sind eine Datei, die nur angehängt werden kann und eine geordnete Aufzeichnung aller Transaktionen enthält, die in der Delta-Lake-Tabelle ausgeführt wurden. Das Transaktionsprotokoll ermöglicht es verschiedenen Benutzern, die angegebene Tabelle parallel zu lesen und zu schreiben. Es fungiert als Single Source of Truth oder als zentrales Repository, das alle vom Benutzer an der Tabelle vorgenommenen Änderungen protokolliert. Es behält die Atomarität bei und überwacht kontinuierlich die Transaktionen, die auf Delta Lake durchgeführt werden.

Wie oben erwähnt, überprüft Spark das Delta-Log auf neue Transaktionen, woraufhin Delta Lake sicherstellt, dass die Version des Benutzers immer mit dem Master-Datensatz synchron ist. Es stellt auch sicher, dass keine widersprüchlichen Änderungen an der Tabelle vorgenommen werden. Wenn der Prozess abstürzt, bevor das Delta-Log aktualisiert wird, stehen die Dateien keinem Leseprozess zur Verfügung, da die Lesevorgänge immer das Transaktionslog durchlaufen.

Funktionieren des Transaktionsprotokolls und atomare Commits

Delta Lake führt alle zehn Commits einen Checkpoint durch. Die Checkpoint-Datei enthält den aktuellen Stand der Daten im Parquet-Format, der schnell gelesen werden kann. Wenn mehrere Benutzer gleichzeitig versuchen, die Tabelle zu ändern, löst Delta Lake die Konflikte durch eine optimistische Parallelitätssteuerung.

Das Schema der Metadaten sieht wie folgt aus: 

KolonneTypBeschreibung
FormatSchnurFormat der Tabelle, d. h. „Delta“.
idSchnurEindeutige ID der Tabelle
NameSchnurName der Tabelle wie im Metastore definiert
BeschreibungSchnurBeschreibung der Tabelle.
StandorteSchnurLage des Tisches
hergestellt inZeitstempelWenn die Tabelle erstellt wurde
zuletzt bearbeitetZeitstempelWann die Tabelle zuletzt geändert wurde
partitionColumnsArray von ZeichenfolgenNamen der Partitionsspalten, wenn die Tabelle partitioniert ist
numDateienlangeAnzahl der Dateien in der neuesten Version der Tabelle
immobilienString-String-MapAlle Eigenschaften für diese Tabelle festgelegt
minReaderVersionintMindestversion der Leser (laut Protokollprotokoll), die die Tabelle lesen können.
minWriterVersionintMindestversion der Leser (gemäß Protokollprotokoll), die in die Tabelle schreiben können.
Quelle: GitHub

Datei hinzufügen und entfernen

Immer wenn eine Datei hinzugefügt oder eine vorhandene Datei entfernt wird, werden diese Aktionen protokolliert. Der Dateipfad ist eindeutig und wird als Primärschlüssel für die darin enthaltenen Dateien betrachtet. Wenn eine neue Datei zu einem Pfad hinzugefügt wird, der bereits in der Tabelle vorhanden ist, werden Statistiken und andere Metadaten zum Pfad von der vorherigen Version aktualisiert. Ebenso wird die Aktion zum Entfernen durch den Zeitstempel angezeigt. Eine Aktion zum Entfernen verbleibt als Tombstone in der Tabelle, bis sie abgelaufen ist. Ein Tombstone verfällt, wenn die TTL (Time-To-Live) überschritten wird.

Da nicht garantiert wird, dass Aktionen innerhalb einer bestimmten Delta-Datei in der richtigen Reihenfolge angewendet werden, ist es nicht gültig, dass mehrere Dateioperationen mit demselben Pfad in einer einzigen Version vorhanden sind.

Das dataChange-Flag bei 'Add' oder 'Remove' kann auf false gesetzt werden, um die Konflikte bei gleichzeitigen Operationen zu minimieren.

Das Schema der Add-Aktion sieht wie folgt aus:

FeldnameDateitypBeschreibung
WegSchnurEin relativer Pfad vom Stamm der Tabelle zu einer Datei, die der Tabelle hinzugefügt werden soll
PartitionswerteKarte[String,String]Eine Zuordnung von Partitionsspalte zu Wert für diese Datei. 
GrößeLangDie Größe dieser Datei in Bytes
ÄnderungszeitLangDie Zeit, zu der diese Datei erstellt wurde, in Millisekunden seit der Epoche
DatenänderungBooleanBei false muss die Datei bereits in der Tabelle vorhanden sein oder die Datensätze in der hinzugefügten Datei müssen in einer oder mehreren Löschaktionen in derselben Version enthalten sein
StatistikStatistikstrukturEnthält Statistiken (z. B. Anzahl, Min/Max-Werte für Spalten) zu den Daten in dieser Datei
TagsKarte[String,String]Karte mit Metadaten zu dieser Datei

Das Schema der Aktion zum Entfernen sieht wie folgt aus:

FeldnameDatum TypBeschreibung
WegSchnurEin absoluter oder relativer Pfad zu einer Datei, die aus der Tabelle entfernt werden soll
LöschungZeitstempellangeDie Zeit, zu der die Löschung erfolgte, dargestellt als Millisekunden seit der Epoche
DatenänderungBooleanBei false müssen die Datensätze in der entfernten Datei in einer oder mehreren Aktionen zum Hinzufügen von Dateien in derselben Version enthalten sein
ExtendedFileMetadataBooleanBei true sind die Felder partitionValues, size und tags vorhanden
PartitionswerteKarte[String, String]Eine Zuordnung von Partitionsspalte zu Wert für diese Datei. Siehe auch Partitionswert-Serialisierung
GrößeLangDie Größe dieser Datei in Bytes
TagsKarte[String, String]Karte mit Metadaten zu dieser Datei
Quelle: GitHub

Das Schema der Metadaten enthält den Dateipfad bei jeder Aktion zum Hinzufügen/Entfernen und der Spark-Leseprozess muss keinen vollständigen Scan durchführen, um die Dateilisten abzurufen.

Wenn ein Schreibvorgang fehlschlägt, ohne das Transaktionsprotokoll zu aktualisieren, werden diese Dateien ignoriert, da der Verbraucher beim Lesen immer die Metadaten durchläuft. 

Vorteile der Migration zu Spark 3.0

Neben der Nutzung der Vorteile von Delta Lake verbesserte die Migration zu Spark 3.0 die Datenverarbeitung auf folgende Weise:

Schiefe Join-Optimierung

Datenschiefe ist ein Zustand, bei dem die Daten einer Tabelle ungleichmäßig auf die Partitionen im Cluster verteilt sind und die Leistung von Abfragen, insbesondere von Abfragen mit Joins, stark beeinträchtigen kann. Schiefe kann zu einem extremen Ungleichgewicht im Cluster führen, wodurch die Datenverarbeitungszeit verlängert wird.

Die Datenversatzbedingung kann hauptsächlich durch drei Ansätze gehandhabt werden.

  1. Verwenden der Konfiguration „spark.sql.shuffle.partitions“ für mehr Parallelität bei gleichmäßiger verteilten Daten.
  2. Erhöhen des Broadcast-Hash-Join-Schwellenwerts mithilfe der Konfiguration spark.sql.autoBroadcastJoinThreshold auf die maximale Größe in Byte für die Tabelle, die beim Ausführen eines Joins an alle Worker-Knoten gesendet werden muss.
  3. Key Salting (Fügen Sie den schiefen Schlüsseln ein Präfix hinzu, um den gleichen Schlüssel anders zu machen, und passen Sie dann die Datenverteilung an).

Spark 3.0 hat mit dem neuen adaptiven Ausführungs-Framework die automatische Handhabung von Schrägverknüpfungen basierend auf den Laufzeitstatistiken optimiert.

Schiefe Partitionsbedingung

Die Herausforderung der verzerrten Partitionen, die in der vorherigen Version von Spark 2.4 bestand, hatte einen großen Einfluss auf die Netzwerkzeit und die Ausführungszeit einer bestimmten Aufgabe. Darüber hinaus waren die Methoden, um damit umzugehen, meist manuell. Spark 3.0 meistert diese Herausforderungen.

Die schiefe Partition wirkt sich auf den Netzwerkverkehr und die Ausführungszeit der Task aus, da diese spezielle Task viel mehr Daten zu verarbeiten hat. Sie müssen auch wissen, wie sich dies auf die Cybersicherheit auswirkt, da Das Netzwerkverkehrsvolumen ist etwas, das Hacker ausnutzen.

Die schiefe Join-Partition wird anhand der Datengröße und der Zeilenanzahl aus den Laufzeit-Map-Statistiken berechnet.

OPTIMIERUNG

Angepasst von:Apache-Spark-Jira

Aus der obigen Tabelle werden die Dataframe-Kampagnen mit den Dataframe-Organisationen verbunden. Eine der Partitionen (Partition 0) von Organizations ist groß und verzerrt. Partition 0 ist das Ergebnis von 9 Maps aus der vorherigen Phase (Map-0 bis Map-8). Die OptimizeSkewedJoin-Regel von Spark teilt die Partition in 3 auf und erstellt dann 3 separate Aufgaben, von denen jede eine Teilpartition von Partition 0 ist (Map-0 bis Map-2, Map-3 bis Map-5 und Map-6 bis Map-9) und mit der Campaigns-Partition 0 verknüpft. Dieser Ansatz führt zu zusätzlichen Kosten, indem Partition 0 der Tabelle Campaigns gleich der Anzahl der Teilpartitionen aus der Tabelle Organizations gelesen wird.

Endresultat

Mit Delta Lake und Spark 3.0 haben wir die folgenden Ergebnisse für das Ad-Tech-Unternehmen ermöglicht:

  • Die Zeit der Datenverarbeitung wurde von 15 Stunden auf 5-6 Stunden reduziert
  • 50% Reduzierung der AWS EMR-Kosten
  • Verhindern von Datenverlust und dem Tod von Prozessen, was häufig vorkam, wenn das System nicht mehr genügend Arbeitsspeicher hatte oder die Verarbeitung aufgrund eines Fehlers im System angehalten wurde
  • Überwachungs- und Warnfunktionen wurden installiert, um zu benachrichtigen, falls der Prozess fehlschlägt
  • Vollständige Orchestrierung mit Airflow, um eine vollständige Automatisierung und das Abhängigkeitsmanagement zwischen Prozessen zu erreichen
Quelle: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Zeitstempel:

Mehr von SmartData-Kollektiv