Покращення обробки даних за допомогою Spark 3.0 та Delta Lake

Вихідний вузол: 1013539

Збір, обробка та проведення аналізу потокове передавання даних, в таких галузях, як рекламна техніка, передбачає інтенсивне інжиніринг даних. Щоденно генеруються дані величезні (100 ГБ даних) і вимагають значного часу на обробку даних для подальших кроків.

Ще однією проблемою є об’єднання наборів даних для отримання уявлень. Кожен процес у середньому має більше 10 наборів даних і рівну кількість об’єднань з декількома ключами. Розмір розділу для кожного ключа непередбачуваний при кожному запуску.

І, нарешті, якщо в певних випадках кількість даних перевищує, у пам’яті може закінчитися пам’ять. Це означає, що процес загине в середині останніх записів, змусивши споживачів чітко читати фрейми вхідних даних.

У цьому блозі ми розглянемо огляд Дельта -озера, його переваги та способи подолання вищезазначених проблем, перейшовши на озеро Дельта та перейшовши на Spark 3.0 із Spark 2.4. 

Що таке озеро Дельта?

Розроблений в Databricks, “Delta Lake-це рівень зберігання даних з відкритим кодом, який працює на існуючому озері даних і повністю співпрацює з Apache Spark API. Поряд із можливістю реалізації транзакцій ACID та масштабованою обробкою метаданих, Delta Lakes також може уніфікувати потокову та пакетну обробку даних ». 

Delta Lake використовує версійні файли Parquet для зберігання даних у хмарі. Після того, як розташування хмари налаштовано, Delta Lake відстежує всі зміни, внесені до таблиці або каталогу магазинів blob, щоб забезпечити транзакції ACID. 

Переваги використання Дельта -озер 

Delta lake дозволяє паралельно виконувати тисячі даних, вирішувати проблеми оптимізації та розподілу, прискорювати операції з метаданими, вести журнал транзакцій та постійно оновлювати дані. Нижче ми розглянемо кілька основних переваг: 

Журнал транзакцій озера Дельта

Журнали транзакцій Delta Lake є файлом лише для додавання та містять упорядкований запис усіх транзакцій, виконаних у таблиці Delta Lake. Журнал транзакцій дозволяє різним користувачам паралельно читати та записувати у зазначену таблицю. Він діє як єдине джерело правди або центральне сховище, яке реєструє всі зміни, внесені до таблиці користувачем. Він підтримує атомність і постійно стежить за транзакціями, що виконуються на озері Дельта.

Як згадувалося вище, Spark перевіряє дельта -журнал на наявність нових транзакцій, після чого Delta Lake гарантує, що версія користувача завжди синхронізована з основним записом. Він також гарантує, що в таблицю не вносяться суперечливі зміни. Якщо процес припиняє роботу перед оновленням дельта -журналу, файли не будуть доступні для будь -яких процесів читання, оскільки читання завжди проходять через журнал транзакцій.

Робочий журнал транзакцій та атомні коміти

Дельта -озеро робить контрольний пункт кожні десять комітів. Файл контрольної точки містить поточний стан даних у форматі Parquet, який можна швидко прочитати. Коли кілька користувачів намагаються змінити таблицю одночасно, Delta Lake вирішує конфлікти, використовуючи оптимістичний контроль паралельності.

Схема метаданих така: 

КолонкатипОпис
форматрядокФормат таблиці, тобто "дельта".
idрядокУнікальний ідентифікатор таблиці
ім'ярядокНазва таблиці, визначеної в метасховищі
descriptionрядокОпис таблиці.
розташуваннярядокРозташування столу
створеновідмітка часуКоли таблиця була створена
lastModifiedвідмітка часуКоли остання зміна таблиці
partitionColumnsмасив рядківНазви стовпців розділів, якщо таблиця розділена
numFilesдовгоКількість файлів в останній версії таблиці
властивостіРядково-струнна картаУсі властивості, встановлені для цієї таблиці
minReaderVersionIntМінімальна версія читачів (згідно протоколу журналу), які можуть читати таблицю.
minWriterVersionIntМінімальна версія зчитувачів (згідно протоколу журналу), яка може записувати в таблицю.
джерело: GitHub

Додати та видалити файл

Щоразу, коли додається файл або видаляється наявний файл, ці дії реєструються. Шлях до файлу є унікальним і вважається первинним ключем для набору файлів всередині нього. Коли до шляху, який уже є у таблиці, додається новий файл, статистика та інші метадані на шляху оновлюються з попередньої версії. Аналогічно, дія видалення позначається міткою часу. Дія видалення залишається у таблиці як надгробний пам'ятник, поки не закінчиться термін її дії. Термін дії надгробок закінчується, коли TTL (Time-To-Live) перевищує.

Оскільки дії всередині даного файлу Delta не гарантуються належним чином, вони не дійсні для кількох операцій над файлами з однаковим шляхом, які існують в одній версії.

Прапор dataChange для будь -якого "додати" або "видалити" може бути встановлений як false, щоб мінімізувати конфлікти одночасних операцій.

Схема дії додавання така:

Назва поляТип данихОпис
шляхрядокВідносний шлях від кореня таблиці до файлу, який слід додати до таблиці
partitionValuesКарта [рядок, рядок]Карта зі стовпця розділу до значення для цього файлу. 
розмірДовгоРозмір цього файлу в байтах
час модифікаціїДовгоЧас створення цього файлу, як мілісекунди з епохи
dataChangeBooleanЯкщо значення false, файл уже повинен бути присутнім у таблиці, або записи у доданому файлі повинні міститися в одній або кількох діях видалення в тій же версії
статистикаСтатистична структураМістить статистику (наприклад, кількість, мінімальні/максимальні значення для стовпців) щодо даних у цьому файлі
тегиКарта [рядок, рядок]Карта, що містить метадані про цей файл

Схема дії видалення така:

Назва полядані типОпис
шляхрядокАбсолютний або відносний шлях до файлу, який слід видалити з таблиці
deletemeTimestampдовгоЧас видалення, представлений у мілісекундах від епохи
dataChangeBooleanЯкщо значення false, записи у видаленому файлі повинні міститися в одній або кількох діях файлу додавання в тій же версії
extendedFileMetadataBooleanЯкщо значення true, поля мають значення partitionValues, розмір та теги
partitionValuesКарта [рядок, рядок]Карта зі стовпця розділу до значення для цього файлу. Див. Також Серіалізація значення розділів
розмірДовгоРозмір цього файлу в байтах
тегиКарта [рядок, рядок]Карта, що містить метадані про цей файл
джерело: GitHub

Схема метаданих містить шлях до файлу для кожної дії додавання/видалення, а процес читання Spark не потребує повного сканування, щоб отримати список файлів.

Якщо запис не відбудеться без оновлення журналу транзакцій, оскільки читання споживача завжди буде проходити через метадані, ці файли будуть проігноровані. 

Переваги переходу на Spark 3.0

Окрім використання переваг Delta Lake, перехід на Spark 3.0 покращив обробку даних такими способами:

Оптимізація перекосу приєднання

Перекос даних - це стан, при якому дані таблиці розподіляються нерівномірно між розділами кластера і можуть серйозно знизити продуктивність запитів, особливо тих, що мають об’єднання. Перекос може призвести до крайнього дисбалансу в кластері, тим самим збільшивши час обробки даних.

Умова перекосу даних може бути вирішена переважно трьома підходами.

  1. Використання конфігурації “spark.sql.shuffle.partitions” для збільшення паралелізму щодо більш рівномірно розподілених даних.
  2. Збільшення порогу сполучення хеш -трансляції за допомогою конфігурації spark.sql.autoBroadcastJoinThreshold до максимального розміру в байтах для таблиці, яку потрібно передавати на всі робочі вузли під час виконання з'єднання.
  3. Соління ключів (додайте префікс до перекошених ключів, щоб той самий ключ відрізнявся, а потім відрегулюйте розподіл даних).

Spark 3.0 додав оптимізацію до автоматичної обробки косого з'єднання на основі статистики виконання за допомогою нової адаптивної основи виконання.

Умова перекосу перегородок

Виклик перекошених розділів, що існував у попередній версії Spark 2.4, мав величезний вплив на час роботи в мережі та час виконання певного завдання. Більш того, методи боротьби з цим були переважно ручними. Spark 3.0 долає ці виклики.

Перекошений розділ матиме вплив на мережевий трафік та на час виконання завдання, оскільки це конкретне завдання матиме набагато більше даних для обробки. Вам також потрібно знати, як це впливає на кібербезпеку, оскільки обсяг мережевого трафіку - це те, чим користуються хакери.

Перекошений розділ з'єднання обчислюється за розміром даних та кількістю рядків зі статистики карти виконання.

Оптимізація

Адаптовано з: Apache Spark Jira

З наведеної вище таблиці кампанії Dataframe об’єднуються з організаціями Dataframe. Один із розділів (Розділ 0) з організацій великий і перекошений. Розділ 0 є результатом 9 карт попереднього етапу (Карта-0 до Карта-8). Правило OptimizeSkewedJoin Spark розділить розділ на 3, а потім створить 3 окремі завдання, кожне з яких є частковим розділом з розділу 0 (карта-0 до карти-2, карта-3 до карти-5 і карта-6 до карти-9) та приєднується до Розділу Кампаній 0. Цей підхід призводить до додаткових витрат, прочитавши Розділ 0 табличних Кампаній, що дорівнює кількості часткових розділів з таблиць Організацій.

Кінцевий результат

Використовуючи Delta Lake і Spark 3.0, ми дозволили такі результати для фірми з рекламної техніки:

  • Час обробки даних скорочено з 15 годин до 5-6 годин
  • Зниження вартості AWS EMR на 50%
  • Запобігання втраті даних та загибелі процесів, що було частим явищем, коли система втрачала пам'ять або обробка припинялася через збій у системі
  • Функції моніторингу та оповіщення були встановлені для сповіщення у разі збою процесу
  • Повна оркестрація за допомогою Airflow для досягнення повної автоматизації та управління залежностями між процесами
Джерело: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Часова мітка:

Більше від Колектив SmartData