Îmbunătățirea procesării datelor cu Spark 3.0 și Delta Lake

Nodul sursă: 1013539

Colectarea, prelucrarea și efectuarea analizei pe streaming de date, în industrii precum ad-tech implică o inginerie intensă a datelor. Datele generate zilnic sunt uriașe (100 de GB date) și necesită un timp de procesare semnificativ pentru a procesa datele pentru pașii următori.

O altă provocare este îmbinarea seturilor de date pentru a obține informații. Fiecare proces are în medie mai mult de 10 seturi de date și un număr egal de îmbinări cu chei multiple. Dimensiunea partiției pentru fiecare cheie este imprevizibilă la fiecare rulare.

Și, în sfârșit, dacă cantitatea de date depășește în anumite ocazii, stocarea poate rămâne fără memorie. Aceasta înseamnă că procesul ar muri în mijlocul scrierilor finale, făcând consumatorii să citească distinct cadrele de date de intrare.

În acest blog, vom acoperi o prezentare generală a Lacurile Deltei, avantajele sale și modul în care provocările de mai sus pot fi depășite prin mutarea în Delta Lake și migrarea la Spark 3.0 din Spark 2.4. 

Ce este Delta Lake?

Dezvoltat la Databricks, „Delta Lake este un strat de stocare de date open-source care rulează pe Data Lake existent și cooperează pe deplin cu Apache Spark API-uri. Pe lângă capacitatea de a implementa tranzacții ACID și gestionarea scalabilă a metadatelor, Delta Lakes poate, de asemenea, unifica fluxul și procesarea datelor în lot”. 

Delta Lake folosește fișiere Parquet versiunea pentru a stoca date în cloud. Odată ce locația cloud este configurată, Delta Lake urmărește toate modificările făcute în directorul de tabel sau magazin blob pentru a furniza tranzacții ACID. 

Avantajele folosirii Delta Lakes 

Delta lake permite mii de date să ruleze în paralel, soluționarea provocărilor de optimizare și partiție, operațiuni mai rapide de metadate, menține un jurnal tranzacțional și continuă să actualizeze datele. Mai jos discutăm câteva avantaje majore: 

Jurnalul tranzacțiilor Delta Lake

Jurnalele de tranzacții Delta Lake sunt un fișier care poate fi doar atașat și conțin o înregistrare ordonată a tuturor tranzacțiilor efectuate pe tabelul Delta Lake. Jurnalul de tranzacții permite diverșilor utilizatori să citească și să scrie în tabelul dat în paralel. Acționează ca o singură sursă de adevăr sau ca depozit central care înregistrează toate modificările aduse tabelului de către utilizator. Menține atomicitatea și urmărește continuu tranzacțiile efectuate pe Lacul Delta.

După cum sa menționat mai sus, Spark verifică jurnalul delta pentru orice tranzacții noi, după care Delta Lake se asigură că versiunea utilizatorului este întotdeauna sincronizată cu înregistrarea principală. De asemenea, asigură că nu se fac modificări conflictuale în tabel. Dacă procesul se blochează înainte de a actualiza jurnalul delta, fișierele nu vor fi disponibile pentru niciun proces de citire, deoarece citirile trec întotdeauna prin jurnalul de tranzacții.

Jurnal de tranzacții de lucru și comitere atomice

Lacul Delta face un punct de control la fiecare zece comisii. Fișierul cu puncte de control conține starea curentă a datelor în formatul Parquet care poate fi citit rapid. Când mai mulți utilizatori încearcă să modifice tabelul în același timp, Delta Lake rezolvă conflictele folosind un control optimist al concurenței.

Schema metadatelor este următoarea: 

ColoanăTipDescriere
formatşirFormatul tabelului, adică „delta”.
idşirID unic al tabelului
numeşirNumele tabelului, așa cum este definit în metastore
descriereşirDescrierea tabelului.
locaţieşirLocația mesei
creat latimestamp-ulCând a fost creat tabelul
modificat ultima datătimestamp-ulCând tabelul a fost modificat ultima dată
partitionColumnsmatrice de șiruriNumele coloanelor de partiție dacă tabelul este partiționat
numFileslungNumărul de fișiere din cea mai recentă versiune a tabelului
proprietăţiHartă șir-șirToate proprietățile setate pentru acest tabel
minReaderVersionintVersiune minimă de cititoare (conform protocolului de jurnal) care pot citi tabelul.
minWriterVersionintVersiune minimă de cititoare (conform protocolului de jurnal) care pot scrie în tabel.
Sursa: GitHub

Adăugați și eliminați fișierul

Ori de câte ori un fișier este adăugat sau un fișier existent este eliminat, aceste acțiuni sunt înregistrate. Calea fișierului este unică și este considerată cheia primară pentru setul de fișiere din interiorul acesteia. Când un fișier nou este adăugat pe o cale care este deja prezentă în tabel, statisticile și alte metadate de pe cale sunt actualizate față de versiunea anterioară. În mod similar, acțiunea de eliminare este indicată de marcaj de timp. O acțiune de eliminare rămâne în tabel ca piatră funerară până când expiră. O piatră funerară expiră atunci când TTL (Time-To-Live) depășește.

Deoarece acțiunile dintr-un anumit fișier Delta nu sunt garantate a fi aplicate în ordine, nu este valabil pentru mai multe operațiuni cu fișiere cu aceeași cale să existe într-o singură versiune.

Indicatorul dataChange fie pentru „adăugare” fie pentru „eliminare” poate fi setat la false pentru a minimiza conflictele de operațiuni concurente.

Schema acțiunii de adăugare este următoarea:

Numele domeniuluiTipul de dateDescriere
caleŞirO cale relativă, de la rădăcina tabelului, la un fișier care ar trebui adăugat la tabel
partitionValuesHartă[String,String]O hartă de la coloana partiției la valoarea pentru acest fișier. 
mărimeaLungDimensiunea acestui fișier în octeți
timp de modificareLungOra a fost creată acest fișier, ca milisecunde de la epocă
dataChangebooleanCând este fals, fișierul trebuie să fie deja prezent în tabel sau înregistrările din fișierul adăugat trebuie să fie conținute într-una sau mai multe acțiuni de eliminare în aceeași versiune
StatisticiStatistici StructConține statistici (de exemplu, număr, valori min/max pentru coloane) despre datele din acest fișier
tag-uriHartă[String,String]Hartă care conține metadate despre acest fișier

Schema acțiunii de eliminare este următoarea:

Numele domeniuluiDate TipDescriere
caleşirO cale absolută sau relativă către un fișier care ar trebui eliminată din tabel
deletionTimestamplungOra în care a avut loc ștergerea, reprezentată ca milisecunde de la epocă
dataChangebooleanCând sunt false, înregistrările din fișierul eliminat trebuie să fie conținute în una sau mai multe acțiuni de adăugare a fișierului în aceeași versiune
extendedFileMetadatabooleanCând este adevărat, câmpurile partitionValues, dimensiunea și etichetele sunt prezente
partitionValuesHartă[String, String]O hartă de la coloana partiției la valoarea pentru acest fișier. Consultați și Serializarea valorii partiției
mărimeaLungDimensiunea acestui fișier în octeți
tag-uriHartă[String, String]Hartă care conține metadate despre acest fișier
Sursa: GitHub

Schema metadatelor conține calea fișierului pentru fiecare acțiune de adăugare/eliminare, iar procesul de citire Spark nu trebuie să facă o scanare completă pentru a obține listele de fișiere.

Dacă o scriere eșuează fără actualizarea jurnalului de tranzacții, deoarece citirea consumatorului va trece întotdeauna prin metadate, acele fișiere vor fi ignorate. 

Avantajele migrării la Spark 3.0

Pe lângă valorificarea beneficiilor Delta Lake, migrarea la Spark 3.0 a îmbunătățit procesarea datelor în următoarele moduri:

Optimizare asociere înclinată

Deformarea datelor este o condiție în care datele unui tabel sunt distribuite inegal între partițiile din cluster și pot reduce grav performanța interogărilor, în special a celor cu îmbinări. Deformarea poate duce la un dezechilibru extrem în cluster, crescând astfel timpul de procesare a datelor.

Condiția de asimetrie a datelor poate fi gestionată în principal prin trei abordări.

  1. Utilizarea configurației „spark.sql.shuffle.partitions” pentru un paralelism sporit pe date mai uniform distribuite.
  2. Creșterea pragului de îmbinare hash de difuzare folosind configurația spark.sql.autoBroadcastJoinThreshold la dimensiunea maximă în octeți pentru tabelul care trebuie difuzat către toate nodurile de lucru în timpul efectuării unei îmbinări.
  3. Key Salting (Adăugați prefix la tastele oblice pentru a face aceeași cheie diferită și apoi ajustați distribuția datelor).

Spark 3.0 a adăugat o optimizare pentru gestionarea automată a asocierii oblice pe baza statisticilor de rulare cu noul cadru de execuție adaptiv.

Condiție de partiție oblică

Provocarea partițiilor distorsionate care exista în versiunea anterioară a Spark 2.4 a avut un impact uriaș asupra timpului de rețea și a timpului de execuție a unei anumite sarcini. Mai mult decât atât, metodele de a face față acesteia au fost în mare parte manuale. Spark 3.0 depășește aceste provocări.

Partiția înclinată va avea un impact asupra traficului de rețea și asupra timpului de execuție a sarcinii, deoarece această sarcină particulară va avea mult mai multe date de procesat. De asemenea, trebuie să știți cum afectează acest lucru securitatea cibernetică, deoarece volumul traficului de rețea este ceva de care profită hackerii.

Partiția de îmbinare înclinată este calculată prin dimensiunea datelor și numărul rândurilor din statisticile hărții de rulare.

Optimizare

Luat din:Apache Spark Jira

Din tabelul de mai sus, campaniile Dataframe se alătură organizațiilor Dataframe. Una dintre partițiile (Partiția 0) din Organizații este mare și înclinată. Partiția 0 este rezultatul a 9 hărți din etapa anterioară (Harta-0 la Harta-8). Regula OptimizeSkewedJoin a lui Spark va împărți partiția în 3 și apoi va crea 3 sarcini separate, fiecare fiind o partiție parțială de la Partiția 0 (Harta-0 la Harta-2, Harta-3 la Harta-5 și Harta-6 la Harta-9) și se alătură cu Partiția 0 de campanii. Această abordare are ca rezultat un cost suplimentar prin citirea Partiției 0 a campaniilor de tabel egal cu numărul de partiții parțiale din organizațiile tabelului.

Rezultat final

Folosind Delta Lake și Spark 3.0, am activat următoarele rezultate pentru firma de tehnologie publicitară:

  • Timpul de prelucrare a datelor a fost redus de la 15 ore la 5-6 ore
  • Reducere cu 50% a costului AWS EMR
  • Prevenirea pierderii datelor și a morții proceselor, care a fost o apariție frecventă când sistemul a ieșit din memorie sau procesarea s-a oprit din cauza unei erori în sistem
  • Funcțiile de monitorizare și alertă au fost instalate pentru a notifica în cazul în care procesul eșuează
  • Orchestrare completă folosind Airflow pentru a obține automatizarea completă și gestionarea dependenței între procese
Sursa: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Timestamp-ul:

Mai mult de la Colectiv SmartData