Forbedre databehandling med Spark 3.0 og Delta Lake

Kilde node: 1013539

Innsamling, bearbeiding og gjennomføring av analyser på streaming av data, i bransjer som ad-tech involverer intens datateknikk. Dataene som genereres daglig er enorme (100-vis med GB-data) og krever betydelig behandlingstid for å behandle dataene for påfølgende trinn.

En annen utfordring er sammenføyningen av datasett for å utlede innsikt. Hver prosess har i gjennomsnitt mer enn 10 datasett og like mange sammenføyninger med flere nøkler. Partisjonsstørrelsen for hver nøkkel er uforutsigbar ved hver kjøring.

Og til slutt, hvis datamengden overskrider ved visse anledninger, kan lagringen gå tom for minne. Dette betyr at prosessen vil dø midt i den siste skrivingen, noe som får forbrukerne til å lese inndatarammene tydelig.

I denne bloggen vil vi dekke en oversikt over Delta Lakes, fordelene og hvordan utfordringene ovenfor kan overvinnes ved å flytte til Delta Lake og migrere til Spark 3.0 fra Spark 2.4. 

Hva er Delta Lake?

Utviklet hos Databricks, "Delta Lake er et åpen kildekode-datalagringslag som kjører på den eksisterende Data Lake og er fullt samarbeidende med Apache Spark APIer. Sammen med muligheten til å implementere ACID-transaksjoner og skalerbar metadatahåndtering, kan Delta Lakes også forene streaming og batchdatabehandling. 

Delta Lake bruker versjonerte Parquet-filer for å lagre data i skyen. Når skyplasseringen er konfigurert, sporer Delta Lake alle endringene som er gjort i tabellen eller blob-butikkkatalogen for å gi ACID-transaksjoner. 

Fordeler med å bruke Delta Lakes 

Delta lake lar tusenvis av data kjøre parallelt, adresserer optimaliserings- og partisjonsutfordringer, raskere metadataoperasjoner, opprettholder en transaksjonslogg og fortsetter å oppdatere dataene. Nedenfor diskuterer vi noen store fordeler: 

Delta Lake Transaksjonslogg

Delta Lake transaksjonslogger er en fil som kun kan legges til og inneholder en ordnet registrering av alle transaksjoner utført på Delta Lake-tabellen. Transaksjonsloggen lar ulike brukere lese og skrive til den gitte tabellen parallelt. Den fungerer som en enkelt kilde til sannhet eller det sentrale depotet som logger alle endringer som er gjort i tabellen av brukeren. Den opprettholder atomitet og ser kontinuerlig på transaksjonene som utføres på Delta Lake.

Som nevnt ovenfor, sjekker Spark deltaloggen for nye transaksjoner, hvoretter Delta Lake sikrer at brukerens versjon alltid er synkronisert med hovedposten. Det sikrer også at det ikke blir gjort noen motstridende endringer i tabellen. Hvis prosessen krasjer før deltaloggen oppdateres, vil ikke filene være tilgjengelige for noen leseprosesser, da lesningene alltid går gjennom transaksjonsloggen.

Transaksjonsloggarbeid og Atomic Commits

Delta lake gjør et sjekkpunkt for hver tiende commit. Den sjekkpunktfilen inneholder den nåværende tilstanden til dataene i Parkett-formatet som kan leses raskt. Når flere brukere prøver å endre tabellen samtidig, løser Delta Lake konfliktene ved å bruke optimistisk samtidighetskontroll.

Skjemaet for metadataene er som følger: 

KolonnetypenBeskrivelse
formatstringFormatet på tabellen, det vil si "delta".
idstringUnik ID for bordet
navnstringNavnet på tabellen som definert i metalageret
beskrivelsestringBeskrivelse av tabellen.
plasseringstringPlassering av bordet
opprettet kltidsstempelNår tabellen ble opprettet
sist endrettidsstempelNår tabellen sist ble endret
partisjonskolonnerrekke strengerNavn på partisjonskolonnene hvis tabellen er partisjonert
numFileslangAntall filene i den nyeste versjonen av tabellen
egenskaperStrenge-streng kartAlle egenskaper angitt for denne tabellen
minReaderVersjonintMinimum versjon av lesere (i henhold til loggprotokollen) som kan lese tabellen.
minWriterVersionintMinimum versjon av lesere (i henhold til loggprotokollen) som kan skrive til tabellen.
kilde: GitHub

Legg til og fjern fil

Når en fil legges til eller en eksisterende fil fjernes, logges disse handlingene. Filbanen er unik og anses som hovednøkkelen for settet med filer i den. Når en ny fil legges til på en bane som allerede finnes i tabellen, oppdateres statistikk og andre metadata på banen fra forrige versjon. På samme måte indikeres fjerningshandlingen med tidsstempel. En fjerningshandling forblir i tabellen som en gravstein til den har utløpt. En gravstein utløper når TTL (Time-To-Live) overskrider.

Siden handlinger innenfor en gitt Delta-fil ikke er garantert å bli brukt i rekkefølge, er det ikke gyldig for flere filoperasjoner med samme bane å eksistere i en enkelt versjon.

DataChange-flagget på enten en "legg til" eller "fjern" kan settes til falsk for å minimere samtidige operasjonskonflikter.

Skjemaet for add-handlingen er som følger:

FeltnavnData-typeBeskrivelse
banenStringEn relativ bane, fra roten av tabellen, til en fil som skal legges til tabellen
partisjonsverdierKart[String,String]Et kart fra partisjonskolonne til verdi for denne filen. 
størrelseLangStørrelsen på denne filen i byte
modifikasjonstidLangTiden denne filen ble opprettet, som millisekunder siden epoken
dataendringbooleanNår false må filen allerede være tilstede i tabellen, eller postene i den tilføyde filen må inneholde en eller flere fjerningshandlinger i samme versjon
statsStatistikkstrukturInneholder statistikk (f.eks. antall, min/maks verdier for kolonner) om dataene i denne filen
tagsKart[String,String]Kart som inneholder metadata om denne filen

Skjemaet for fjerningshandlingen er som følger:

FeltnavnData typenBeskrivelse
banenstringEn absolutt eller relativ bane til en fil som bør fjernes fra tabellen
slettingTimestamplangTiden slettingen skjedde, representert som millisekunder siden epoken
dataendringbooleanNår det er usann, må postene i den fjernede filen være inneholdt i en eller flere add-filhandlinger i samme versjon
utvidet filmetadatabooleanNår sant er feltene partisjonsverdier, størrelse og koder tilstede
partisjonsverdierKart[String, String]Et kart fra partisjonskolonne til verdi for denne filen. Se også partisjonsverdiserialisering
størrelseLangStørrelsen på denne filen i byte
tagsKart[String, String]Kart som inneholder metadata om denne filen
kilde: GitHub

Skjemaet til metadataene inneholder filbanen for hver legg til/fjern handling, og Spark-leseprosessen trenger ikke å gjøre en full skanning for å få filoppføringene.

Hvis en skriving mislykkes uten å oppdatere transaksjonsloggen, siden forbrukerens lesing alltid vil gå gjennom metadataene, vil disse filene bli ignorert. 

Fordeler med å migrere til Spark 3.0

Bortsett fra å utnytte fordelene med Delta Lake, forbedret migrering til Spark 3.0 databehandling på følgende måter:

Optimalisering av skjev sammenføyning

Dataskjevhet er en tilstand der en tabells data er ujevnt fordelt mellom partisjoner i klyngen og kan kraftig nedgradere ytelsen til spørringer, spesielt de med sammenføyninger. Skjevheter kan føre til ekstrem ubalanse i klyngen og dermed øke databehandlingstiden.

Dataskjevtilstanden kan håndteres hovedsakelig ved tre tilnærminger.

  1. Bruke konfigurasjonen "spark.sql.shuffle.partitions" for økt parallellitet på mer jevnt fordelte data.
  2. Å øke broadcast-hash join-terskelen ved å bruke konfigurasjonen spark.sql.autoBroadcastJoinThreshold til maksimal størrelse i byte for tabellen som må kringkastes til alle arbeidernoder under utføring av en sammenføyning.
  3. Key Salting (Legg til prefiks til de skjeve tastene for å gjøre den samme nøkkelen forskjellig og juster deretter datafordelingen).

Spark 3.0 har lagt til en optimalisering for automatisk håndtering av skjev sammenføyning basert på kjøretidsstatistikken med det nye rammeverket for adaptiv utførelse.

Skjev partisjonstilstand

Utfordringen med skjeve partisjoner som eksisterte i den forrige versjonen av Spark 2.4 hadde en enorm innvirkning på nettverkstiden og utførelsestiden for en bestemt oppgave. Dessuten var metodene for å håndtere det for det meste manuelle. Spark 3.0 overvinner disse utfordringene.

Den skjeve partisjonen vil ha en innvirkning på nettverkstrafikken og på oppgavegjennomføringstiden, siden denne spesielle oppgaven vil ha mye mer data å behandle. Du må også vite hvordan dette påvirker cybersikkerhet, siden nettverkstrafikkvolum er noe hackere drar nytte av.

Den skjeve sammenføyningspartisjonen beregnes av datastørrelsen og radantallet fra kjøretidskartstatistikken.

Optimalisering

Tilpasset fra: Apache Spark Jira

Fra tabellen ovenfor kobles datarammekampanjene sammen med datarammeorganisasjonene. En av partisjonene (partisjon 0) fra Organisasjoner er stor og skjev. Partisjon 0 er resultatet av 9 kart fra forrige trinn (Kart-0 til Kart-8). Sparks OptimizeSkewedJoin-regel vil dele partisjonen i 3 og deretter lage 3 separate oppgaver som hver er en delvis partisjon fra partisjon 0 (Map-0 til Map-2, Map-3 til Map-5, og Map-6 to Map-9) og slutter seg til Kampanjepartisjon 0. Denne tilnærmingen resulterer i ekstra kostnader ved å lese partisjon 0 av tabellkampanjer lik antall delpartisjoner fra tabellorganisasjonene.

Sluttresultat

Ved å bruke Delta Lake og Spark 3.0, aktivert vi følgende resultater for annonseteknologifirmaet:

  • Tiden for databehandling ble redusert fra 15 timer til 5-6 timer
  • 50 % reduksjon i AWS EMR-kostnad
  • Forhindre tap av data og død av prosesser som var en hyppig forekomst når systemet gikk tom for minne eller behandlingen stoppet på grunn av en feil i systemet
  • Overvåkings- og varslingsfunksjoner ble installert for å varsle i tilfelle prosessen mislykkes
  • Fullfør orkestrering med Airflow for å oppnå full automatisering og avhengighetsstyring mellom prosessene
Kilde: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Tidstempel:

Mer fra SmartData Collective