Verzamelen, verwerken en uitvoeren van analyses op gegevens streamen, in sectoren als ad-tech gaat het om intensieve data-engineering. De dagelijks gegenereerde gegevens zijn enorm (100 GB aan gegevens) en vereisen een aanzienlijke verwerkingstijd om de gegevens voor volgende stappen te verwerken.
Een andere uitdaging is het samenvoegen van datasets om inzichten te verkrijgen. Elk proces heeft gemiddeld meer dan 10 datasets en een gelijk aantal joins met meerdere sleutels. De partitiegrootte voor elke sleutel is bij elke uitvoering onvoorspelbaar.
En tot slot, als de hoeveelheid gegevens bij bepaalde gelegenheden overschrijdt, kan de opslagruimte onvoldoende geheugen hebben. Dit betekent dat het proces zou sterven in het midden van de laatste schrijfbewerkingen, waardoor consumenten de invoergegevensframes duidelijk zouden kunnen lezen.
In deze blog geven we een overzicht van Delta Meren, de voordelen ervan en hoe de bovenstaande uitdagingen kunnen worden overwonnen door naar Delta Lake te verhuizen en vanuit Spark 3.0 naar Spark 2.4 te migreren.
Wat is Deltameer?
Delta Lake is ontwikkeld door Databricks en is een open-source dataopslaglaag die draait op het bestaande Data Lake en volledig samenwerkt met Apache Spark API's. Naast de mogelijkheid om ACID-transacties en schaalbare verwerking van metadata te implementeren, kan Delta Lakes ook de verwerking van streaming en batchgegevens verenigen.”
Delta Lake gebruikt Parquet-bestanden met versiebeheer om gegevens in de cloud op te slaan. Zodra de cloudlocatie is geconfigureerd, houdt Delta Lake alle wijzigingen bij die zijn aangebracht in de tabel- of blobstore-directory om ACID-transacties te bieden.
Voordelen van het gebruik van Delta Lakes
Met Delta Lake kunnen duizenden gegevens parallel worden uitgevoerd, worden optimalisatie- en partitie-uitdagingen aangepakt, worden metadata sneller uitgevoerd, wordt een transactielogboek bijgehouden en worden de gegevens voortdurend bijgewerkt. Hieronder bespreken we enkele grote voordelen:
Delta Lake-transactielogboek
Transactielogboeken van Delta Lake zijn een bestand dat alleen kan worden toegevoegd en bevatten een geordend record van alle transacties die zijn uitgevoerd op de Delta Lake-tabel. Het transactielogboek stelt verschillende gebruikers in staat om parallel te lezen en te schrijven naar de gegeven tabel. Het fungeert als een enkele bron van waarheid of de centrale opslagplaats die alle wijzigingen registreert die door de gebruiker in de tabel zijn aangebracht. Het handhaaft de atomiciteit en kijkt continu naar de transacties die op Delta Lake worden uitgevoerd.
Zoals hierboven vermeld, controleert Spark het deltalogboek op nieuwe transacties, waarna Delta Lake ervoor zorgt dat de gebruikersversie altijd synchroon loopt met het masterrecord. Het zorgt er ook voor dat er geen tegenstrijdige wijzigingen in de tabel worden aangebracht. Als het proces crasht voordat het deltalogboek wordt bijgewerkt, zijn de bestanden niet beschikbaar voor leesprocessen, omdat het lezen altijd door het transactielogboek gaat.
Transactielogboek Werken en Atomic Commits
Delta Lake doet een checkpoint bij elke tien commits. Het checkpointed-bestand bevat de huidige status van de gegevens in het Parquet-formaat dat snel kan worden gelezen. Wanneer meerdere gebruikers tegelijkertijd proberen de tabel aan te passen, lost Delta Lake de conflicten op met behulp van optimistische gelijktijdigheidscontrole.
Het schema van de metadata is als volgt:
Kolom | Type | Omschrijving |
formaat | snaar | Formaat van de tabel, dat wil zeggen "delta". |
id | snaar | Unieke ID van de tafel |
naam | snaar | Naam van de tabel zoals gedefinieerd in de metastore |
beschrijving | snaar | Beschrijving van de tafel. |
plaats | snaar | Locatie van de tafel |
gemaakt bij | tijdstempel | Toen de tabel werd gemaakt |
laatst gewijzigd | tijdstempel | Wanneer de tabel voor het laatst is gewijzigd |
partitieKolommen | reeks strings | Namen van de partitiekolommen als de tabel is gepartitioneerd |
aantalBestanden | lang | Aantal bestanden in de nieuwste versie van de tabel |
vastgoed | String-string kaart | Alle eigenschappen ingesteld voor deze tabel |
minReaderVersie | int | Minimale versie van lezers (volgens het logprotocol) die de tabel kunnen lezen. |
minWriterVersie | int | Minimale versie van lezers (volgens het logprotocol) die naar de tabel kunnen schrijven. |
Bestand toevoegen en verwijderen
Telkens wanneer een bestand wordt toegevoegd of een bestaand bestand wordt verwijderd, worden deze acties vastgelegd. Het bestandspad is uniek en wordt beschouwd als de primaire sleutel voor de set bestanden erin. Wanneer een nieuw bestand wordt toegevoegd op een pad dat al aanwezig is in de tabel, worden statistieken en andere metadata op het pad bijgewerkt vanaf de vorige versie. Evenzo wordt actie verwijderen aangegeven door een tijdstempel. Een verwijderactie blijft in de tabel staan als een grafsteen totdat deze is verlopen. Een grafsteen verloopt wanneer de TTL (Time-To-Live) overschrijdt.
Aangezien acties binnen een bepaald Delta-bestand niet gegarandeerd in de juiste volgorde worden toegepast, is het niet geldig voor meerdere bestandsbewerkingen met hetzelfde pad in een enkele versie.
De dataChange-vlag op een 'add' of 'remove' kan worden ingesteld op false om de conflicten met gelijktijdige bewerkingen te minimaliseren.
Het schema van de actie toevoegen is als volgt:
Veldnaam | Data type | Omschrijving |
pad | Draad | Een relatief pad, van de hoofdmap van de tabel, naar een bestand dat aan de tabel moet worden toegevoegd |
partitieWaarden | Kaart[String,String] | Een toewijzing van partitiekolom naar waarde voor dit bestand. |
grootte | lang | De grootte van dit bestand in bytes |
modificatieTijd | lang | De tijd dat dit bestand is gemaakt, in milliseconden sinds het tijdperk |
dataWijzigen | Boolean | Indien onwaar moet het bestand al aanwezig zijn in de tabel of de records in het toegevoegde bestand moeten in een of meer verwijderacties in dezelfde versie voorkomen |
stats | Statistiekstructuur | Bevat statistieken (bijv. aantal, min/max-waarden voor kolommen) over de gegevens in dit bestand |
labels | Kaart[String,String] | Kaart met metadata over dit bestand |
Het schema van de verwijderactie is als volgt:
Veldnaam | Data Type | Omschrijving |
pad | snaar | Een absoluut of relatief pad naar een bestand dat uit de tabel moet worden verwijderd |
verwijderingTijdstempel | lang | De tijd dat de verwijdering plaatsvond, weergegeven als milliseconden sinds het tijdperk |
dataWijzigen | Boolean | Indien onwaar moeten de records in het verwijderde bestand zijn opgenomen in een of meer acties voor het toevoegen van bestanden in dezelfde versie |
uitgebreideFileMetadata | Boolean | Indien waar, zijn de velden partitionValues, size en tags aanwezig |
partitieWaarden | Kaart[String, String] | Een toewijzing van partitiekolom naar waarde voor dit bestand. Zie ook Partitiewaarde Serialisatie |
grootte | lang | De grootte van dit bestand in bytes |
labels | Kaart[String, String] | Kaart met metadata over dit bestand |
Het schema van de metagegevens bevat het bestandspad bij elke actie voor toevoegen/verwijderen en het Spark-leesproces hoeft geen volledige scan uit te voeren om de bestandslijsten te krijgen.
Als het schrijven mislukt zonder het transactielogboek bij te werken, aangezien de lezing van de consument altijd door de metadata gaat, worden die bestanden genegeerd.
Voordelen van migreren naar Spark 3.0
Naast het benutten van de voordelen van Delta Lake, verbeterde de migratie naar Spark 3.0 de gegevensverwerking op de volgende manieren:
Scheve deelname optimalisatie
Gegevensscheefheid is een toestand waarin de gegevens van een tabel ongelijk verdeeld zijn over partities in het cluster en de prestaties van query's ernstig kunnen verlagen, vooral die met joins. Scheefheid kan leiden tot extreme onbalans in het cluster, waardoor de gegevensverwerkingstijd toeneemt.
De data-skew-conditie kan hoofdzakelijk worden afgehandeld door drie benaderingen.
- De configuratie "spark.sql.shuffle.partitions" gebruiken voor meer parallellisme op meer gelijkmatig verdeelde gegevens.
- Verhogen van de drempelwaarde voor broadcast-hash-join met behulp van de configuratie spark.sql.autoBroadcastJoinThreshold tot de maximale grootte in bytes voor de tabel die moet worden uitgezonden naar alle worker-knoop punten tijdens het uitvoeren van een join.
- Key Salting (Voeg een prefix toe aan de scheve sleutels om dezelfde sleutel anders te maken en pas vervolgens de gegevensverdeling aan).
Spark 3.0 heeft een optimalisatie toegevoegd voor het automatisch verwerken van scheef aansluiten op basis van de runtime-statistieken met het nieuwe adaptieve uitvoeringsraamwerk.
Scheve partitie conditie
De uitdaging van scheve partities die bestond in de vorige versie van de Spark 2.4 had een enorme impact op de netwerktijd en uitvoeringstijd van een bepaalde taak. Bovendien waren de methoden om ermee om te gaan meestal handmatig. Spark 3.0 overwint deze uitdagingen.
De scheve partitie zal een impact hebben op het netwerkverkeer en op de uitvoeringstijd van de taak, aangezien deze specifieke taak veel meer gegevens te verwerken zal hebben. U moet ook weten hoe dit van invloed is op cyberbeveiliging, want netwerkverkeersvolume is iets waar hackers misbruik van maken.
De scheve joinpartitie wordt berekend op basis van de gegevensgrootte en het aantal rijen uit de runtime-kaartstatistieken.
Optimization
Uit de bovenstaande tabel komen de Dataframe-campagnes samen met de Dataframe-organisaties. Een van de partities (Partitie 0) van Organisaties is groot en scheef. Partitie 0 is het resultaat van 9 kaarten uit de vorige fase (Map-0 tot Map-8). De OptimizeSkewedJoin-regel van Spark splitst de partitie in 3 en maakt vervolgens 3 afzonderlijke taken die elk een gedeeltelijke partitie zijn van partitie 0 (Map-0 tot Map-2, Map-3 tot Map-5 en Map-6 tot Map-9) en sluit aan bij de Campagnes Partitie 0. Deze aanpak resulteert in extra kosten door het lezen van Partitie 0 van tabel Campagnes gelijk aan het aantal gedeeltelijke partities uit de tabel Organisaties.
Eindresultaat
Met behulp van Delta Lake en Spark 3.0 hebben we de volgende resultaten voor het advertentietechnologiebedrijf mogelijk gemaakt:
- De tijd van gegevensverwerking werd teruggebracht van 15 uur naar 5-6 uur
- 50% verlaging van de AWS EMR-kosten
- Voorkomen van gegevensverlies en het afsterven van processen, wat vaak voorkwam wanneer het geheugen van het systeem vol was of de verwerking werd gestopt vanwege een storing in het systeem
- Monitoring & Alerting-functies zijn geïnstalleerd om te waarschuwen als het proces mislukt
- Volledige orkestratie met behulp van Airflow om volledige automatisering en afhankelijkheidsbeheer tussen processen te bereiken
- &
- 9
- absoluut
- Actie
- Ad
- Extra
- Voordeel
- Alles
- Alle transacties
- onder
- analyse
- apache
- Apache Spark
- APIs
- auto
- Automatisering
- AWS
- Blog
- Campagnes
- vervoer
- uitdagen
- Controles
- Cloud
- Kolom
- Consumenten
- content
- coöperatieve
- Actueel
- Huidige toestand
- Cybersecurity
- gegevens
- Datameer
- gegevensverwerking
- gegevensopslag
- Databricks
- transactie
- Delta
- Engineering
- uitvoering
- Voordelen
- Velden
- Tot slot
- Stevig
- formaat
- Achtergrond
- vol
- glitch
- Kopen Google Reviews
- Hackers
- Behandeling
- hachee
- Hoe
- HTTPS
- reusachtig
- Impact
- industrieën
- inzichten
- IT
- mee
- sleutel
- toetsen
- laatste
- leiden
- Meldingen
- plaats
- groot
- maken
- management
- kaart
- Maps
- netwerk
- netwerk verkeer
- knooppunten
- Operations
- bestellen
- Overige
- prestatie
- inpluggen
- presenteren
- lezers
- lezing
- archief
- Resultaten
- lopen
- aftasten
- reeks
- Maat
- spleet
- SQL
- Land
- statistiek
- mediaopslag
- shop
- streaming
- system
- tech
- niet de tijd of
- verkeer
- transactie
- Transacties
- gebruikers
- waarde
- volume
- binnen