Gegevensverwerking verbeteren met Spark 3.0 en Delta Lake

Bronknooppunt: 1013539

Verzamelen, verwerken en uitvoeren van analyses op gegevens streamen, in sectoren als ad-tech gaat het om intensieve data-engineering. De dagelijks gegenereerde gegevens zijn enorm (100 GB aan gegevens) en vereisen een aanzienlijke verwerkingstijd om de gegevens voor volgende stappen te verwerken.

Een andere uitdaging is het samenvoegen van datasets om inzichten te verkrijgen. Elk proces heeft gemiddeld meer dan 10 datasets en een gelijk aantal joins met meerdere sleutels. De partitiegrootte voor elke sleutel is bij elke uitvoering onvoorspelbaar.

En tot slot, als de hoeveelheid gegevens bij bepaalde gelegenheden overschrijdt, kan de opslagruimte onvoldoende geheugen hebben. Dit betekent dat het proces zou sterven in het midden van de laatste schrijfbewerkingen, waardoor consumenten de invoergegevensframes duidelijk zouden kunnen lezen.

In deze blog geven we een overzicht van Delta Meren, de voordelen ervan en hoe de bovenstaande uitdagingen kunnen worden overwonnen door naar Delta Lake te verhuizen en vanuit Spark 3.0 naar Spark 2.4 te migreren. 

Wat is Deltameer?

Delta Lake is ontwikkeld door Databricks en is een open-source dataopslaglaag die draait op het bestaande Data Lake en volledig samenwerkt met Apache Spark API's. Naast de mogelijkheid om ACID-transacties en schaalbare verwerking van metadata te implementeren, kan Delta Lakes ook de verwerking van streaming en batchgegevens verenigen.” 

Delta Lake gebruikt Parquet-bestanden met versiebeheer om gegevens in de cloud op te slaan. Zodra de cloudlocatie is geconfigureerd, houdt Delta Lake alle wijzigingen bij die zijn aangebracht in de tabel- of blobstore-directory om ACID-transacties te bieden. 

Voordelen van het gebruik van Delta Lakes 

Met Delta Lake kunnen duizenden gegevens parallel worden uitgevoerd, worden optimalisatie- en partitie-uitdagingen aangepakt, worden metadata sneller uitgevoerd, wordt een transactielogboek bijgehouden en worden de gegevens voortdurend bijgewerkt. Hieronder bespreken we enkele grote voordelen: 

Delta Lake-transactielogboek

Transactielogboeken van Delta Lake zijn een bestand dat alleen kan worden toegevoegd en bevatten een geordend record van alle transacties die zijn uitgevoerd op de Delta Lake-tabel. Het transactielogboek stelt verschillende gebruikers in staat om parallel te lezen en te schrijven naar de gegeven tabel. Het fungeert als een enkele bron van waarheid of de centrale opslagplaats die alle wijzigingen registreert die door de gebruiker in de tabel zijn aangebracht. Het handhaaft de atomiciteit en kijkt continu naar de transacties die op Delta Lake worden uitgevoerd.

Zoals hierboven vermeld, controleert Spark het deltalogboek op nieuwe transacties, waarna Delta Lake ervoor zorgt dat de gebruikersversie altijd synchroon loopt met het masterrecord. Het zorgt er ook voor dat er geen tegenstrijdige wijzigingen in de tabel worden aangebracht. Als het proces crasht voordat het deltalogboek wordt bijgewerkt, zijn de bestanden niet beschikbaar voor leesprocessen, omdat het lezen altijd door het transactielogboek gaat.

Transactielogboek Werken en Atomic Commits

Delta Lake doet een checkpoint bij elke tien commits. Het checkpointed-bestand bevat de huidige status van de gegevens in het Parquet-formaat dat snel kan worden gelezen. Wanneer meerdere gebruikers tegelijkertijd proberen de tabel aan te passen, lost Delta Lake de conflicten op met behulp van optimistische gelijktijdigheidscontrole.

Het schema van de metadata is als volgt: 

KolomTypeOmschrijving
formaatsnaarFormaat van de tabel, dat wil zeggen "delta".
idsnaarUnieke ID van de tafel
naamsnaarNaam van de tabel zoals gedefinieerd in de metastore
beschrijvingsnaarBeschrijving van de tafel.
plaatssnaarLocatie van de tafel
gemaakt bijtijdstempelToen de tabel werd gemaakt
laatst gewijzigdtijdstempelWanneer de tabel voor het laatst is gewijzigd
partitieKolommenreeks stringsNamen van de partitiekolommen als de tabel is gepartitioneerd
aantalBestandenlangAantal bestanden in de nieuwste versie van de tabel
vastgoedString-string kaartAlle eigenschappen ingesteld voor deze tabel
minReaderVersieintMinimale versie van lezers (volgens het logprotocol) die de tabel kunnen lezen.
minWriterVersieintMinimale versie van lezers (volgens het logprotocol) die naar de tabel kunnen schrijven.
Bron: GitHub

Bestand toevoegen en verwijderen

Telkens wanneer een bestand wordt toegevoegd of een bestaand bestand wordt verwijderd, worden deze acties vastgelegd. Het bestandspad is uniek en wordt beschouwd als de primaire sleutel voor de set bestanden erin. Wanneer een nieuw bestand wordt toegevoegd op een pad dat al aanwezig is in de tabel, worden statistieken en andere metadata op het pad bijgewerkt vanaf de vorige versie. Evenzo wordt actie verwijderen aangegeven door een tijdstempel. Een verwijderactie blijft in de tabel staan ​​als een grafsteen totdat deze is verlopen. Een grafsteen verloopt wanneer de TTL (Time-To-Live) overschrijdt.

Aangezien acties binnen een bepaald Delta-bestand niet gegarandeerd in de juiste volgorde worden toegepast, is het niet geldig voor meerdere bestandsbewerkingen met hetzelfde pad in een enkele versie.

De dataChange-vlag op een 'add' of 'remove' kan worden ingesteld op false om de conflicten met gelijktijdige bewerkingen te minimaliseren.

Het schema van de actie toevoegen is als volgt:

VeldnaamData typeOmschrijving
padDraadEen relatief pad, van de hoofdmap van de tabel, naar een bestand dat aan de tabel moet worden toegevoegd
partitieWaardenKaart[String,String]Een toewijzing van partitiekolom naar waarde voor dit bestand. 
groottelangDe grootte van dit bestand in bytes
modificatieTijdlangDe tijd dat dit bestand is gemaakt, in milliseconden sinds het tijdperk
dataWijzigenBooleanIndien onwaar moet het bestand al aanwezig zijn in de tabel of de records in het toegevoegde bestand moeten in een of meer verwijderacties in dezelfde versie voorkomen
statsStatistiekstructuurBevat statistieken (bijv. aantal, min/max-waarden voor kolommen) over de gegevens in dit bestand
labelsKaart[String,String]Kaart met metadata over dit bestand

Het schema van de verwijderactie is als volgt:

VeldnaamData TypeOmschrijving
padsnaarEen absoluut of relatief pad naar een bestand dat uit de tabel moet worden verwijderd
verwijderingTijdstempellangDe tijd dat de verwijdering plaatsvond, weergegeven als milliseconden sinds het tijdperk
dataWijzigenBooleanIndien onwaar moeten de records in het verwijderde bestand zijn opgenomen in een of meer acties voor het toevoegen van bestanden in dezelfde versie
uitgebreideFileMetadataBooleanIndien waar, zijn de velden partitionValues, size en tags aanwezig
partitieWaardenKaart[String, String]Een toewijzing van partitiekolom naar waarde voor dit bestand. Zie ook Partitiewaarde Serialisatie
groottelangDe grootte van dit bestand in bytes
labelsKaart[String, String]Kaart met metadata over dit bestand
Bron: GitHub

Het schema van de metagegevens bevat het bestandspad bij elke actie voor toevoegen/verwijderen en het Spark-leesproces hoeft geen volledige scan uit te voeren om de bestandslijsten te krijgen.

Als het schrijven mislukt zonder het transactielogboek bij te werken, aangezien de lezing van de consument altijd door de metadata gaat, worden die bestanden genegeerd. 

Voordelen van migreren naar Spark 3.0

Naast het benutten van de voordelen van Delta Lake, verbeterde de migratie naar Spark 3.0 de gegevensverwerking op de volgende manieren:

Scheve deelname optimalisatie

Gegevensscheefheid is een toestand waarin de gegevens van een tabel ongelijk verdeeld zijn over partities in het cluster en de prestaties van query's ernstig kunnen verlagen, vooral die met joins. Scheefheid kan leiden tot extreme onbalans in het cluster, waardoor de gegevensverwerkingstijd toeneemt.

De data-skew-conditie kan hoofdzakelijk worden afgehandeld door drie benaderingen.

  1. De configuratie "spark.sql.shuffle.partitions" gebruiken voor meer parallellisme op meer gelijkmatig verdeelde gegevens.
  2. Verhogen van de drempelwaarde voor broadcast-hash-join met behulp van de configuratie spark.sql.autoBroadcastJoinThreshold tot de maximale grootte in bytes voor de tabel die moet worden uitgezonden naar alle worker-knoop punten tijdens het uitvoeren van een join.
  3. Key Salting (Voeg een prefix toe aan de scheve sleutels om dezelfde sleutel anders te maken en pas vervolgens de gegevensverdeling aan).

Spark 3.0 heeft een optimalisatie toegevoegd voor het automatisch verwerken van scheef aansluiten op basis van de runtime-statistieken met het nieuwe adaptieve uitvoeringsraamwerk.

Scheve partitie conditie

De uitdaging van scheve partities die bestond in de vorige versie van de Spark 2.4 had een enorme impact op de netwerktijd en uitvoeringstijd van een bepaalde taak. Bovendien waren de methoden om ermee om te gaan meestal handmatig. Spark 3.0 overwint deze uitdagingen.

De scheve partitie zal een impact hebben op het netwerkverkeer en op de uitvoeringstijd van de taak, aangezien deze specifieke taak veel meer gegevens te verwerken zal hebben. U moet ook weten hoe dit van invloed is op cyberbeveiliging, want netwerkverkeersvolume is iets waar hackers misbruik van maken.

De scheve joinpartitie wordt berekend op basis van de gegevensgrootte en het aantal rijen uit de runtime-kaartstatistieken.

Optimization

Aangepast uit:Apache Spark Jira

Uit de bovenstaande tabel komen de Dataframe-campagnes samen met de Dataframe-organisaties. Een van de partities (Partitie 0) van Organisaties is groot en scheef. Partitie 0 is het resultaat van 9 kaarten uit de vorige fase (Map-0 tot Map-8). De OptimizeSkewedJoin-regel van Spark splitst de partitie in 3 en maakt vervolgens 3 afzonderlijke taken die elk een gedeeltelijke partitie zijn van partitie 0 (Map-0 tot Map-2, Map-3 tot Map-5 en Map-6 tot Map-9) en sluit aan bij de Campagnes Partitie 0. Deze aanpak resulteert in extra kosten door het lezen van Partitie 0 van tabel Campagnes gelijk aan het aantal gedeeltelijke partities uit de tabel Organisaties.

Eindresultaat

Met behulp van Delta Lake en Spark 3.0 hebben we de volgende resultaten voor het advertentietechnologiebedrijf mogelijk gemaakt:

  • De tijd van gegevensverwerking werd teruggebracht van 15 uur naar 5-6 uur
  • 50% verlaging van de AWS EMR-kosten
  • Voorkomen van gegevensverlies en het afsterven van processen, wat vaak voorkwam wanneer het geheugen van het systeem vol was of de verwerking werd gestopt vanwege een storing in het systeem
  • Monitoring & Alerting-functies zijn geïnstalleerd om te waarschuwen als het proces mislukt
  • Volledige orkestratie met behulp van Airflow om volledige automatisering en afhankelijkheidsbeheer tussen processen te bereiken
Bron: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Tijdstempel:

Meer van SmartData Collectief