Улучшение обработки данных с помощью Spark 3.0 и Delta Lake

Исходный узел: 1013539

Сбор, обработка и анализ потоковые данные, в таких отраслях, как рекламные технологии, требуется интенсивная обработка данных. Ежедневно генерируется огромный объем данных (сотни ГБ данных), и для обработки данных на последующих этапах требуется значительное время.

Еще одна проблема - объединение наборов данных для получения информации. Каждый процесс в среднем имеет более 10 наборов данных и равное количество объединений с несколькими ключами. Размер раздела для каждого ключа непредсказуем при каждом запуске.

И, наконец, если в определенных случаях объем данных превышает допустимый, для хранилища может не хватить памяти. Это означает, что процесс умрет в середине финальной записи, заставляя потребителей отчетливо читать фреймы входных данных.

В этом блоге мы рассмотрим обзор Дельта Лейкс, его преимущества и способы решения вышеуказанных проблем путем перехода на Delta Lake и перехода на Spark 3.0 с Spark 2.4. 

Что такое Дельта-Лейк?

Delta Lake, разработанный в Databricks, представляет собой уровень хранения данных с открытым исходным кодом, который работает на существующем Data Lake и полностью совместим с Apache Spark API. Наряду с возможностью реализации транзакций ACID и масштабируемой обработки метаданных, Delta Lakes также может унифицировать потоковую и пакетную обработку данных ». 

Delta Lake использует файлы Parquet с версией для хранения данных в облаке. После настройки облачного расположения Delta Lake отслеживает все изменения, внесенные в таблицу или каталог хранилища больших двоичных объектов, чтобы обеспечить транзакции ACID. 

Преимущества использования Delta Lakes 

Delta lake позволяет тысячам данных работать параллельно, решать проблемы оптимизации и разделения, ускорять операции с метаданными, вести журнал транзакций и постоянно обновлять данные. Ниже мы обсудим несколько основных преимуществ: 

Журнал транзакций Delta Lake

Журналы транзакций Delta Lake представляют собой файл только для добавления и содержат упорядоченную запись всех транзакций, выполненных в таблице Delta Lake. Журнал транзакций позволяет различным пользователям читать и записывать данные в данную таблицу параллельно. Он действует как единый источник истины или центральный репозиторий, в котором регистрируются все изменения, внесенные в таблицу пользователем. Он поддерживает атомарность и постоянно отслеживает транзакции, выполняемые на Delta Lake.

Как упоминалось выше, Spark проверяет дельта-журнал на наличие новых транзакций, после чего Delta Lake гарантирует, что версия пользователя всегда синхронизируется с основной записью. Это также гарантирует, что в таблицу не будут внесены конфликтующие изменения. Если процесс выйдет из строя перед обновлением дельта-журнала, файлы не будут доступны никаким процессам чтения, поскольку операции чтения всегда проходят через журнал транзакций.

Работа с журналом транзакций и атомарные фиксации

Delta Lake проверяет каждые десять коммитов. Файл с контрольными точками содержит текущее состояние данных в формате Parquet, которые можно быстро прочитать. Когда несколько пользователей пытаются изменить таблицу одновременно, Delta Lake разрешает конфликты с помощью оптимистичного управления параллелизмом.

Схема метаданных следующая: 

ColumnТипОписание
форматstringФормат таблицы, то есть «дельта».
idstringУникальный идентификатор таблицы
имяstringИмя таблицы, как определено в хранилище метаданных
описаниеstringОписание таблицы.
расположениеstringРасположение стола
создано вотметка времениКогда таблица была создана
Последнее изменениеотметка времениКогда таблица была в последний раз изменена
разделКолоннымассив строкИмена столбцов раздела, если таблица разделена
количество файловдлиннойКоличество файлов в последней версии таблицы
свойстваСтрока-строкаВсе свойства, установленные для этой таблицы
minReaderVersionIntМинимальная версия читателей (согласно протоколу журнала), которые могут читать таблицу.
minWriterVersionIntМинимальная версия считывателей (согласно протоколу журнала), которые могут записывать в таблицу.
Источник: GitHub

Добавить и удалить файл

Каждый раз, когда файл добавляется или удаляется существующий файл, эти действия регистрируются. Путь к файлу уникален и считается первичным ключом для набора файлов внутри него. Когда новый файл добавляется по пути, который уже присутствует в таблице, статистика и другие метаданные по пути обновляются по сравнению с предыдущей версией. Точно так же действие удаления обозначается меткой времени. Действие удаления остается в таблице как надгробие, пока не истечет срок его действия. Срок действия надгробия истекает, когда TTL (время жизни) превышает.

Поскольку не гарантируется, что действия в данном файле Delta будут применяться по порядку, недопустимо, чтобы несколько операций с файлами с одним и тем же путем существовали в одной версии.

Флаг dataChange для добавления или удаления может иметь значение false, чтобы минимизировать конфликты одновременных операций.

Схема действия добавления следующая:

Имя поляТип данныхОписание
путьстрокаОтносительный путь от корня таблицы до файла, который нужно добавить в таблицу.
разделЗначенияКарта [Строка, Строка]Сопоставление столбца раздела со значением для этого файла. 
размерДлинноеРазмер этого файла в байтах
модификацияВремяДлинноеВремя создания этого файла в миллисекундах с начала эпохи.
изменение данныхЛогическийЕсли false, файл уже должен присутствовать в таблице или записи в добавленном файле должны содержаться в одном или нескольких действиях удаления в той же версии.
СтатистикаСтруктура статистикиСодержит статистику (например, количество, минимальные / максимальные значения для столбцов) о данных в этом файле.
имеют тегиКарта [Строка, Строка]Карта с метаданными об этом файле

Схема действия удаления следующая:

Имя поляДанные ТипОписание
путьstringАбсолютный или относительный путь к файлу, который следует удалить из таблицы.
отметка времени удалениядлиннойВремя, когда произошло удаление, в миллисекундах с начала эпохи.
изменение данныхЛогическийЕсли false, записи в удаленном файле должны содержаться в одном или нескольких действиях добавления файла в той же версии.
расширенные метаданные файлаЛогическийЕсли true, присутствуют поля partitionValues, size и tags.
разделЗначенияКарта [Строка, Строка]Сопоставление столбца раздела со значением для этого файла. См. Также Сериализация значений раздела.
размерДлинноеРазмер этого файла в байтах
имеют тегиКарта [Строка, Строка]Карта с метаданными об этом файле
Источник: GitHub

Схема метаданных содержит путь к файлу для каждого действия добавления / удаления, и процессу чтения Spark не требуется выполнять полное сканирование для получения списков файлов.

Если запись не удалась без обновления журнала транзакций, поскольку чтение потребителем всегда будет проходить через метаданные, эти файлы будут проигнорированы. 

Преимущества перехода на Spark 3.0

Помимо использования преимуществ Delta Lake, переход на Spark 3.0 улучшил обработку данных следующими способами:

Оптимизация асимметричного соединения

Перекос данных - это состояние, при котором данные таблицы неравномерно распределены между разделами в кластере и могут серьезно снизить производительность запросов, особенно тех, которые содержат соединения. Асимметрия может привести к сильному дисбалансу в кластере, увеличивая время обработки данных.

С условием перекоса данных можно справиться в основном тремя способами.

  1. Использование конфигурации «spark.sql.shuffle.partitions» для увеличения параллелизма для более равномерно распределенных данных.
  2. Увеличение порога широковещательного хэш-соединения с помощью конфигурации spark.sql.autoBroadcastJoinThreshold до максимального размера в байтах для таблицы, которая должна быть широковещательно передана всем рабочим узлам во время выполнения соединения.
  3. Key Salting (добавьте префикс к перекошенным ключам, чтобы сделать один и тот же ключ другим, а затем скорректируйте распределение данных).

Spark 3.0 добавил оптимизацию автоматической обработки асимметричного соединения на основе статистики времени выполнения с новой адаптивной средой выполнения.

Состояние перекоса раздела

Проблема перекоса разделов, существовавшая в предыдущей версии Spark 2.4, оказала огромное влияние на сетевое время и время выполнения конкретной задачи. Более того, методы борьбы с этим в основном были ручными. Spark 3.0 решает эти проблемы.

Перекошенный раздел будет влиять на сетевой трафик и время выполнения задачи, так как эта конкретная задача будет иметь гораздо больше данных для обработки. Вам также необходимо знать, как это влияет на кибербезопасность, поскольку объем сетевого трафика - это то, чем хакеры пользуются.

Разделение с перекосом соединения рассчитывается по размеру данных и количеству строк из статистики карты времени выполнения.

Оптимизация

Адаптирован из: Apache Spark Jira

Из приведенной выше таблицы видно, что кампании Dataframe объединяются с организациями Dataframe. Один из разделов (Раздел 0) от Организации большой и перекошенный. Раздел 0 является результатом 9 карт с предыдущего этапа (от Map-0 до Map-8). Правило Spark OptimizeSkewedJoin разделит раздел на 3, а затем создаст 3 отдельные задачи, каждая из которых является частичным разделом из раздела 0 (карта-0 в карту-2, карта-3 в карту-5 и карта-6 в карту-9). и объединяется с разделом Campaigns. Этот подход приводит к дополнительным затратам за счет чтения Раздела 0 таблицы Campaigns, равного количеству частичных разделов из таблицы Organizations.

Конечный результат

Используя Delta Lake и Spark 3.0, мы добились следующих результатов для рекламной компании:

  • Время обработки данных уменьшено с 15 часов до 5-6 часов.
  • Снижение стоимости AWS EMR на 50%
  • Предотвращение потери данных и прекращения работы процессов, что было частым явлением, когда в системе не хватало памяти или обработка останавливалась из-за сбоя в системе.
  • Были установлены функции мониторинга и оповещения для уведомления в случае сбоя процесса
  • Полная оркестровка с помощью Airflow для достижения полной автоматизации и управления зависимостями между процессами
Источник: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Отметка времени:

Больше от Коллектив SmartData