Az adatfeldolgozás javítása a Spark 3.0 és a Delta Lake segítségével

Forrás csomópont: 1013539

Összegyűjtése, feldolgozása és elemzése adatfolyam, az olyan iparágakban, mint az reklámtechnológia, intenzív adatkezeléssel jár. A naponta generált adatok hatalmasak (100 GB-os adat), és jelentős feldolgozási időt igényel az adatok további feldolgozása.

Egy másik kihívás az adatkészletek összekapcsolása, hogy betekintést nyerjünk. Minden folyamat átlagosan több mint 10 adatkészlettel és azonos számú összekapcsolással rendelkezik több kulccsal. Az egyes kulcsok partíciómérete minden futtatáskor megjósolhatatlan.

És végül, ha az adatok mennyisége bizonyos esetekben túllépi, előfordulhat, hogy a tárolóban elfogy a memória. Ez azt jelenti, hogy a folyamat az utolsó írások közepén elhal, így a fogyasztók egyértelműen elolvassák a bemeneti adatkereteket.

Ebben a blogban egy áttekintést nyújtunk Delta Lakes, előnyei, és hogyan lehet leküzdeni a fenti kihívásokat a Delta Lake-re való költözéssel és a Spark 3.0-ről a Spark 2.4-ra való migrációval. 

Mi az a Delta Lake?

A Databricksnél kifejlesztett „Delta Lake egy nyílt forráskódú adattároló réteg, amely a meglévő Data Lake-en fut, és teljes mértékben együttműködik Apache Spark API-k. Az ACID-tranzakciók megvalósításának és a méretezhető metaadat-kezelésnek köszönhetően a Delta Lakes egyesíteni tudja a streaming és a kötegelt adatfeldolgozást is. 

A Delta Lake verziószámú Parquet fájlokat használ az adatok felhőben való tárolására. A felhő helyének konfigurálása után a Delta Lake nyomon követi a tábla- vagy blobtároló-könyvtárban végrehajtott összes módosítást, hogy biztosítsa az ACID-tranzakciókat. 

A Delta Lakes használatának előnyei 

A Delta lake több ezer adat párhuzamos futtatását teszi lehetővé, kezeli az optimalizálási és partíciós kihívásokat, gyorsabb metaadat-műveleteket, tranzakciós naplót vezet és folyamatosan frissíti az adatokat. Az alábbiakban bemutatunk néhány fő előnyt: 

Delta Lake tranzakciós napló

A Delta Lake tranzakciós naplók csak hozzáfűzhető fájlok, és a Delta Lake táblán végrehajtott összes tranzakció rendezett rekordját tartalmazzák. A tranzakciós napló lehetővé teszi, hogy a különböző felhasználók párhuzamosan olvassák és írják az adott táblát. Az igazság egyetlen forrásaként vagy központi adattárként működik, amely naplózza a táblán a felhasználó által végzett összes módosítást. Fenntartja az atomitást, és folyamatosan figyeli a Delta Lake-en végrehajtott tranzakciókat.

Ahogy fentebb említettük, a Spark ellenőrzi a delta naplót minden új tranzakció után, ami után a Delta Lake biztosítja, hogy a felhasználó verziója mindig szinkronban legyen a főrekorddal. Azt is biztosítja, hogy ne kerüljön sor egymásnak ellentmondó módosításokra a táblázatban. Ha a folyamat összeomlik a delta napló frissítése előtt, a fájlok nem lesznek elérhetők egyetlen olvasási folyamat számára sem, mivel az olvasás mindig a tranzakciónaplón megy keresztül.

Tranzakciós napló munka és Atomic Commits

A Delta Lake minden tizedik elkövetésnél ellenőrzőpontot végez. Az ellenőrzőpontos fájl Parquet formátumban tartalmazza az adatok pillanatnyi állapotát, amely gyorsan olvasható. Ha egyszerre több felhasználó próbálja módosítani a táblát, a Delta Lake optimista párhuzamossági vezérléssel oldja meg az ütközéseket.

A metaadatok sémája a következő: 

OszloptípusLeírás
formátumhúrA táblázat formátuma, azaz „delta”.
idhúrA táblázat egyedi azonosítója
névhúrA tábla neve a metastore-ban meghatározottak szerint
leíráshúrA táblázat leírása.
elhelyezkedéshúrAz asztal helye
létrehozvaidőbélyegAmikor a táblázat készült
utoljára módosítvaidőbélyegMikor legutóbb módosították a táblázatot
partícióColumnskarakterláncok tömbjeA partíciós oszlopok neve, ha a tábla particionálva van
numFileshosszúA fájlok száma a táblázat legújabb verziójában
ingatlanaitString-string térképMinden tulajdonság beállítva ehhez a táblázathoz
minReaderVersionintAz olvasók minimális verziója (naplóprotokoll szerint), amely képes olvasni a táblázatot.
minWriterVersionintAz olvasók minimális verziója (naplóprotokoll szerint), amely képes írni a táblába.
Forrás: GitHub

Fájl hozzáadása és eltávolítása

Amikor egy fájlt ad hozzá vagy egy meglévő fájlt eltávolít, ezek a műveletek naplózásra kerülnek. A fájl elérési útja egyedi, és a benne lévő fájlok elsődleges kulcsának tekinthető. Ha új fájlt adnak hozzá egy olyan elérési úthoz, amely már szerepel a táblázatban, az elérési út statisztikái és egyéb metaadatai az előző verzióhoz képest frissülnek. Hasonlóképpen, az eltávolítási műveletet időbélyeg jelzi. Az eltávolítási művelet sírkőként marad a táblázatban, amíg le nem jár. A sírkő akkor jár le, amikor a TTL (Time-To-Live) túllépi.

Mivel egy adott Delta-fájlon belüli műveletek nem garantáltan sorrendben kerülnek végrehajtásra, nem érvényes, hogy több fájlművelet ugyanazzal az elérési úttal létezik egyetlen verzióban.

Az 'add' vagy 'remove' esetén a dataChange jelző hamis értékre állítható az egyidejű műveletek ütközésének minimalizálása érdekében.

A hozzáadási művelet sémája a következő:

Mező neveAdattípusLeírás
ösvényHúrRelatív elérési út a tábla gyökerétől a táblához hozzáadandó fájlig
partitionValuesTérkép[karakterlánc,karakterlánc]Leképezés a partíció oszlopától az értékig ehhez a fájlhoz. 
méretHosszúA fájl mérete bájtban
módosítási időHosszúA fájl létrehozásának ideje, ezredmásodpercben a korszak óta
dataChangelogikaiHa hamis, akkor a fájlnak már jelen kell lennie a táblázatban, vagy a hozzáadott fájl rekordjait egy vagy több eltávolítási műveletnek kell tartalmaznia ugyanabban a verzióban
statisztikaStatisztikai szerkezetStatisztikát tartalmaz (pl. szám, oszlopok min/max értékei) a fájl adatairól
címkékTérkép[karakterlánc,karakterlánc]A fájl metaadatait tartalmazó térkép

Az eltávolítási művelet sémája a következő:

Mező nevedátum típusLeírás
ösvényhúrEgy fájl abszolút vagy relatív elérési útja, amelyet el kell távolítani a táblából
törlésIdőbélyeghosszúA törlés időpontja, ezredmásodpercként a korszak óta
dataChangelogikaiHa hamis, az eltávolított fájl rekordjait egy vagy több fájl hozzáadása műveletnek tartalmaznia kell ugyanabban a verzióban
kiterjesztettFileMetadatalogikaiHa igaz, a partitionValues, size és tags mezők jelen vannak
partitionValuesTérkép[karakterlánc, karakterlánc]Leképezés a partíció oszlopától az értékig ehhez a fájlhoz. Lásd még Partícióértékek szerializálása
méretHosszúA fájl mérete bájtban
címkékTérkép[karakterlánc, karakterlánc]A fájl metaadatait tartalmazó térkép
Forrás: GitHub

A metaadatok sémája minden egyes hozzáadási/eltávolítási műveletnél tartalmazza a fájl elérési útját, és a Spark olvasási folyamatának nem kell teljes vizsgálatot végeznie a fájllisták eléréséhez.

Ha az írás sikertelen a tranzakciós napló frissítése nélkül, mivel a fogyasztó olvasata mindig átmegy a metaadatokon, ezeket a fájlokat figyelmen kívül hagyja. 

A Spark 3.0-ra való átállás előnyei

A Delta Lake előnyeinek kihasználásán túl a Spark 3.0-ra való átállás a következő módokon javította az adatfeldolgozást:

Ferde csatlakozás optimalizálás

Az adatok torzulása egy olyan állapot, amelyben a tábla adatai egyenetlenül oszlanak el a fürt partíciói között, és súlyosan ronthatja a lekérdezések teljesítményét, különösen a csatlakozásokkal rendelkezőkét. A ferdeség szélsőséges kiegyensúlyozatlansághoz vezethet a fürtben, ami növeli az adatfeldolgozási időt.

Az adatok torzítási feltétele elsősorban három megközelítéssel kezelhető.

  1. A „spark.sql.shuffle.partitions” konfiguráció használata az egyenletesebben elosztott adatok párhuzamosságának növelése érdekében.
  2. A szórásos hash csatlakozási küszöbérték növelése a spark.sql.autoBroadcastJoinThreshold konfiguráció segítségével a tábla maximális méretére bájtban, amelyet az összes dolgozó csomópontnak szét kell küldeni az összekapcsolás során.
  3. Billentyűsózás (Adjon előtagot a ferde billentyűkhöz, hogy ugyanazt a kulcsot különböztesse meg, majd állítsa be az adateloszlást).

A Spark 3.0 az új adaptív végrehajtási keretrendszerrel a futásidejű statisztikákon alapuló optimalizálást adott a ferde csatlakozás automatikus kezeléséhez.

Ferde partíció állapota

A ferde partíciók kihívása, amely a Spark 2.4 előző verziójában létezett, óriási hatással volt egy adott feladat hálózati és végrehajtási idejére. Sőt, a kezelési módszerek többnyire manuálisak voltak. A Spark 3.0 legyőzi ezeket a kihívásokat.

A ferde partíció hatással lesz a hálózati forgalomra és a feladat végrehajtási idejére, mivel ennél a feladatnál sokkal több feldolgozandó adat lesz. Azt is tudnia kell, hogy ez hogyan érinti a kiberbiztonságot, hiszen A hálózati forgalom nagysága olyasvalami, amit a hackerek kihasználnak.

A ferde csatlakozási partíciót a rendszer az adatméret és a futásidejű térképstatisztikák sorszáma alapján számítja ki.

Optimalizálás

Átvett: Apache Spark Jira

A fenti táblázatból a Dataframe-kampányok csatlakoznak a Dataframe-szervezetekhez. A Szervezetek egyik partíciója (0. partíció) nagy és ferde. A 0. partíció 9 térkép eredménye az előző szakaszból (Map-0 - Map-8). A Spark OptimizeSkewedJoin szabálya 3 részre osztja a partíciót, majd 3 különálló feladatot hoz létre, amelyek mindegyike egy részleges partíció a 0-s partícióról (Map-0 a Map-2-re, Map-3 a Map-5-re és Map-6 a Map-9-re). Ez a megközelítés többletköltséget eredményez a Kampányok tábla 0. partíciójának olvasásával, amely megegyezik a Szervezetek tábla részleges partícióinak számával.

Végeredmény

A Delta Lake és a Spark 3.0 használatával a következő eredményeket engedélyeztük a hirdetéstechnológiai cég számára:

  • Az adatfeldolgozási idő 15 óráról 5-6 órára csökkent
  • 50%-os csökkenés az AWS EMR költségében
  • Az adatvesztés és a folyamatok halálának megelőzése, ami gyakran előfordult, amikor a rendszer memóriája megfogyott, vagy a feldolgozás leállt egy rendszerhiba miatt
  • Felügyeleti és riasztási funkciókat telepítettek, hogy értesítsenek, ha a folyamat meghiúsul
  • Teljes hangszerelés az Airflow használatával a folyamatok közötti teljes automatizálás és függőségkezelés elérése érdekében
Forrás: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Időbélyeg:

Még több SmartData Collective