Покращення обробки даних за допомогою Spark 3.0 та Delta Lake

Вихідний вузол: 1013539

Збір, обробка та проведення аналізу потокове передавання даних, в таких галузях, як рекламна техніка, передбачає інтенсивне інжиніринг даних. Щоденно генеруються дані величезні (100 ГБ даних) і вимагають значного часу на обробку даних для подальших кроків.

Ще однією проблемою є об’єднання наборів даних для отримання уявлень. Кожен процес у середньому має більше 10 наборів даних і рівну кількість об’єднань з декількома ключами. Розмір розділу для кожного ключа непередбачуваний при кожному запуску.

І, нарешті, якщо в певних випадках кількість даних перевищує, у пам’яті може закінчитися пам’ять. Це означає, що процес загине в середині останніх записів, змусивши споживачів чітко читати фрейми вхідних даних.

У цьому блозі ми розглянемо огляд Дельта -озера, його переваги та способи подолання вищезазначених проблем, перейшовши на озеро Дельта та перейшовши на Spark 3.0 із Spark 2.4. 

Що таке озеро Дельта?

Розроблений в Databricks, “Delta Lake-це рівень зберігання даних з відкритим кодом, який працює на існуючому озері даних і повністю співпрацює з Apache Spark API. Поряд із можливістю реалізації транзакцій ACID та масштабованою обробкою метаданих, Delta Lakes також може уніфікувати потокову та пакетну обробку даних ». 

Delta Lake використовує версійні файли Parquet для зберігання даних у хмарі. Після того, як розташування хмари налаштовано, Delta Lake відстежує всі зміни, внесені до таблиці або каталогу магазинів blob, щоб забезпечити транзакції ACID. 

Переваги використання Дельта -озер 

Delta lake дозволяє паралельно виконувати тисячі даних, вирішувати проблеми оптимізації та розподілу, прискорювати операції з метаданими, вести журнал транзакцій та постійно оновлювати дані. Нижче ми розглянемо кілька основних переваг: 

Журнал транзакцій озера Дельта

Журнали транзакцій Delta Lake є файлом лише для додавання та містять упорядкований запис усіх транзакцій, виконаних у таблиці Delta Lake. Журнал транзакцій дозволяє різним користувачам паралельно читати та записувати у зазначену таблицю. Він діє як єдине джерело правди або центральне сховище, яке реєструє всі зміни, внесені до таблиці користувачем. Він підтримує атомність і постійно стежить за транзакціями, що виконуються на озері Дельта.

Як згадувалося вище, Spark перевіряє дельта -журнал на наявність нових транзакцій, після чого Delta Lake гарантує, що версія користувача завжди синхронізована з основним записом. Він також гарантує, що в таблицю не вносяться суперечливі зміни. Якщо процес припиняє роботу перед оновленням дельта -журналу, файли не будуть доступні для будь -яких процесів читання, оскільки читання завжди проходять через журнал транзакцій.


Робочий журнал транзакцій та атомні коміти

Дельта -озеро робить контрольний пункт кожні десять комітів. Файл контрольної точки містить поточний стан даних у форматі Parquet, який можна швидко прочитати. Коли кілька користувачів намагаються змінити таблицю одночасно, Delta Lake вирішує конфлікти, використовуючи оптимістичний контроль паралельності.

Схема метаданих така: 

Колонка тип Опис
формат рядок Формат таблиці, тобто "дельта".
id рядок Унікальний ідентифікатор таблиці
ім'я рядок Назва таблиці, визначеної в метасховищі
description рядок Опис таблиці.
розташування рядок Розташування столу
створено відмітка часу Коли таблиця була створена
lastModified відмітка часу Коли остання зміна таблиці
partitionColumns масив рядків Назви стовпців розділів, якщо таблиця розділена
numFiles довго Кількість файлів в останній версії таблиці
властивості Рядково-струнна карта Усі властивості, встановлені для цієї таблиці
minReaderVersion Int Мінімальна версія читачів (згідно протоколу журналу), які можуть читати таблицю.
minWriterVersion Int Мінімальна версія зчитувачів (згідно протоколу журналу), яка може записувати в таблицю.
джерело: GitHub

Додати та видалити файл

Щоразу, коли додається файл або видаляється наявний файл, ці дії реєструються. Шлях до файлу є унікальним і вважається первинним ключем для набору файлів всередині нього. Коли до шляху, який уже є у таблиці, додається новий файл, статистика та інші метадані на шляху оновлюються з попередньої версії. Аналогічно, дія видалення позначається міткою часу. Дія видалення залишається у таблиці як надгробний пам'ятник, поки не закінчиться термін її дії. Термін дії надгробок закінчується, коли TTL (Time-To-Live) перевищує.

Оскільки дії всередині даного файлу Delta не гарантуються належним чином, вони не дійсні для кількох операцій над файлами з однаковим шляхом, які існують в одній версії.

Прапор dataChange для будь -якого "додати" або "видалити" може бути встановлений як false, щоб мінімізувати конфлікти одночасних операцій.

Схема дії додавання така:

Назва поля Тип даних Опис
шлях рядок Відносний шлях від кореня таблиці до файлу, який слід додати до таблиці
partitionValues Карта [рядок, рядок] Карта зі стовпця розділу до значення для цього файлу. 
розмір Довго Розмір цього файлу в байтах
час модифікації Довго Час створення цього файлу, як мілісекунди з епохи
dataChange Boolean Якщо значення false, файл уже повинен бути присутнім у таблиці, або записи у доданому файлі повинні міститися в одній або кількох діях видалення в тій же версії
статистика Статистична структура Містить статистику (наприклад, кількість, мінімальні/максимальні значення для стовпців) щодо даних у цьому файлі
теги Карта [рядок, рядок] Карта, що містить метадані про цей файл

Схема дії видалення така:

Назва поля дані тип Опис
шлях рядок Абсолютний або відносний шлях до файлу, який слід видалити з таблиці
deletemeTimestamp довго Час видалення, представлений у мілісекундах від епохи
dataChange Boolean Якщо значення false, записи у видаленому файлі повинні міститися в одній або кількох діях файлу додавання в тій же версії
extendedFileMetadata Boolean Якщо значення true, поля мають значення partitionValues, розмір та теги
partitionValues Карта [рядок, рядок] Карта зі стовпця розділу до значення для цього файлу. Див. Також Серіалізація значення розділів
розмір Довго Розмір цього файлу в байтах
теги Карта [рядок, рядок] Карта, що містить метадані про цей файл
джерело: GitHub

Схема метаданих містить шлях до файлу для кожної дії додавання/видалення, а процес читання Spark не потребує повного сканування, щоб отримати список файлів.

Якщо запис не відбудеться без оновлення журналу транзакцій, оскільки читання споживача завжди буде проходити через метадані, ці файли будуть проігноровані. 

Переваги переходу на Spark 3.0

Окрім використання переваг Delta Lake, перехід на Spark 3.0 покращив обробку даних такими способами:

Оптимізація перекосу приєднання

Перекос даних - це стан, при якому дані таблиці розподіляються нерівномірно між розділами кластера і можуть серйозно знизити продуктивність запитів, особливо тих, що мають об’єднання. Перекос може призвести до крайнього дисбалансу в кластері, тим самим збільшивши час обробки даних.

Умова перекосу даних може бути вирішена переважно трьома підходами.

  1. Використання конфігурації “spark.sql.shuffle.partitions” для збільшення паралелізму щодо більш рівномірно розподілених даних.
  2. Збільшення порогу сполучення хеш -трансляції за допомогою конфігурації spark.sql.autoBroadcastJoinThreshold до максимального розміру в байтах для таблиці, яку потрібно передавати на всі робочі вузли під час виконання з'єднання.
  3. Соління ключів (додайте префікс до перекошених ключів, щоб той самий ключ відрізнявся, а потім відрегулюйте розподіл даних).

Spark 3.0 додав оптимізацію до автоматичної обробки косого з'єднання на основі статистики виконання за допомогою нової адаптивної основи виконання.

Умова перекосу перегородок

Виклик перекошених розділів, що існував у попередній версії Spark 2.4, мав величезний вплив на час роботи в мережі та час виконання певного завдання. Більш того, методи боротьби з цим були переважно ручними. Spark 3.0 долає ці виклики.

Перекошений розділ матиме вплив на мережевий трафік та на час виконання завдання, оскільки це конкретне завдання матиме набагато більше даних для обробки. Вам також потрібно знати, як це впливає на кібербезпеку, оскільки обсяг мережевого трафіку - це те, чим користуються хакери.

Перекошений розділ з'єднання обчислюється за розміром даних та кількістю рядків зі статистики карти виконання.

Оптимізація

Адаптовано з: Apache Spark Jira

З наведеної вище таблиці кампанії Dataframe об’єднуються з організаціями Dataframe. Один із розділів (Розділ 0) з організацій великий і перекошений. Розділ 0 є результатом 9 карт попереднього етапу (Карта-0 до Карта-8). Правило OptimizeSkewedJoin Spark розділить розділ на 3, а потім створить 3 окремі завдання, кожне з яких є частковим розділом з розділу 0 (карта-0 до карти-2, карта-3 до карти-5 і карта-6 до карти-9) та приєднується до Розділу Кампаній 0. Цей підхід призводить до додаткових витрат, прочитавши Розділ 0 табличних Кампаній, що дорівнює кількості часткових розділів з таблиць Організацій.

Кінцевий результат

Використовуючи Delta Lake і Spark 3.0, ми дозволили такі результати для фірми з рекламної техніки:

  • Час обробки даних скорочено з 15 годин до 5-6 годин
  • Зниження вартості AWS EMR на 50%
  • Запобігання втраті даних та загибелі процесів, що було частим явищем, коли система втрачала пам'ять або обробка припинялася через збій у системі
  • Функції моніторингу та оповіщення були встановлені для сповіщення у разі збою процесу
  • Повна оркестрація за допомогою Airflow для досягнення повної автоматизації та управління залежностями між процесами

Джерело: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Часова мітка:

Більше від Колектив SmartData