Сбор, обработка и анализ потоковые данные, в таких отраслях, как рекламные технологии, требуется интенсивная обработка данных. Ежедневно генерируется огромный объем данных (сотни ГБ данных), и для обработки данных на последующих этапах требуется значительное время.
Еще одна проблема - объединение наборов данных для получения информации. Каждый процесс в среднем имеет более 10 наборов данных и равное количество объединений с несколькими ключами. Размер раздела для каждого ключа непредсказуем при каждом запуске.
И, наконец, если в определенных случаях объем данных превышает допустимый, для хранилища может не хватить памяти. Это означает, что процесс умрет в середине финальной записи, заставляя потребителей отчетливо читать фреймы входных данных.
В этом блоге мы рассмотрим обзор Дельта Лейкс, его преимущества и способы решения вышеуказанных проблем путем перехода на Delta Lake и перехода на Spark 3.0 с Spark 2.4.
Что такое Дельта-Лейк?
Delta Lake, разработанный в Databricks, представляет собой уровень хранения данных с открытым исходным кодом, который работает на существующем Data Lake и полностью совместим с Apache Spark API. Наряду с возможностью реализации транзакций ACID и масштабируемой обработки метаданных, Delta Lakes также может унифицировать потоковую и пакетную обработку данных ».
Delta Lake использует файлы Parquet с версией для хранения данных в облаке. После настройки облачного расположения Delta Lake отслеживает все изменения, внесенные в таблицу или каталог хранилища больших двоичных объектов, чтобы обеспечить транзакции ACID.
Преимущества использования Delta Lakes
Delta lake позволяет тысячам данных работать параллельно, решать проблемы оптимизации и разделения, ускорять операции с метаданными, вести журнал транзакций и постоянно обновлять данные. Ниже мы обсудим несколько основных преимуществ:
Журнал транзакций Delta Lake
Журналы транзакций Delta Lake представляют собой файл только для добавления и содержат упорядоченную запись всех транзакций, выполненных в таблице Delta Lake. Журнал транзакций позволяет различным пользователям читать и записывать данные в данную таблицу параллельно. Он действует как единый источник истины или центральный репозиторий, в котором регистрируются все изменения, внесенные в таблицу пользователем. Он поддерживает атомарность и постоянно отслеживает транзакции, выполняемые на Delta Lake.
Как упоминалось выше, Spark проверяет дельта-журнал на наличие новых транзакций, после чего Delta Lake гарантирует, что версия пользователя всегда синхронизируется с основной записью. Это также гарантирует, что в таблицу не будут внесены конфликтующие изменения. Если процесс выйдет из строя перед обновлением дельта-журнала, файлы не будут доступны никаким процессам чтения, поскольку операции чтения всегда проходят через журнал транзакций.
Работа с журналом транзакций и атомарные фиксации
Delta Lake проверяет каждые десять коммитов. Файл с контрольными точками содержит текущее состояние данных в формате Parquet, которые можно быстро прочитать. Когда несколько пользователей пытаются изменить таблицу одновременно, Delta Lake разрешает конфликты с помощью оптимистичного управления параллелизмом.
Схема метаданных следующая:
Column | Тип | Описание |
формат | string | Формат таблицы, то есть «дельта». |
id | string | Уникальный идентификатор таблицы |
имя | string | Имя таблицы, как определено в хранилище метаданных |
описание | string | Описание таблицы. |
расположение | string | Расположение стола |
создано в | отметка времени | Когда таблица была создана |
Последнее изменение | отметка времени | Когда таблица была в последний раз изменена |
разделКолонны | массив строк | Имена столбцов раздела, если таблица разделена |
количество файлов | длинной | Количество файлов в последней версии таблицы |
свойства | Строка-строка | Все свойства, установленные для этой таблицы |
minReaderVersion | Int | Минимальная версия читателей (согласно протоколу журнала), которые могут читать таблицу. |
minWriterVersion | Int | Минимальная версия считывателей (согласно протоколу журнала), которые могут записывать в таблицу. |
Добавить и удалить файл
Каждый раз, когда файл добавляется или удаляется существующий файл, эти действия регистрируются. Путь к файлу уникален и считается первичным ключом для набора файлов внутри него. Когда новый файл добавляется по пути, который уже присутствует в таблице, статистика и другие метаданные по пути обновляются по сравнению с предыдущей версией. Точно так же действие удаления обозначается меткой времени. Действие удаления остается в таблице как надгробие, пока не истечет срок его действия. Срок действия надгробия истекает, когда TTL (время жизни) превышает.
Поскольку не гарантируется, что действия в данном файле Delta будут применяться по порядку, недопустимо, чтобы несколько операций с файлами с одним и тем же путем существовали в одной версии.
Флаг dataChange для добавления или удаления может иметь значение false, чтобы минимизировать конфликты одновременных операций.
Схема действия добавления следующая:
Имя поля | Тип данных | Описание |
путь | строка | Относительный путь от корня таблицы до файла, который нужно добавить в таблицу. |
разделЗначения | Карта [Строка, Строка] | Сопоставление столбца раздела со значением для этого файла. |
размер | Длинное | Размер этого файла в байтах |
модификацияВремя | Длинное | Время создания этого файла в миллисекундах с начала эпохи. |
изменение данных | Логический | Если false, файл уже должен присутствовать в таблице или записи в добавленном файле должны содержаться в одном или нескольких действиях удаления в той же версии. |
Статистика | Структура статистики | Содержит статистику (например, количество, минимальные / максимальные значения для столбцов) о данных в этом файле. |
имеют теги | Карта [Строка, Строка] | Карта с метаданными об этом файле |
Схема действия удаления следующая:
Имя поля | Данные Тип | Описание |
путь | string | Абсолютный или относительный путь к файлу, который следует удалить из таблицы. |
отметка времени удаления | длинной | Время, когда произошло удаление, в миллисекундах с начала эпохи. |
изменение данных | Логический | Если false, записи в удаленном файле должны содержаться в одном или нескольких действиях добавления файла в той же версии. |
расширенные метаданные файла | Логический | Если true, присутствуют поля partitionValues, size и tags. |
разделЗначения | Карта [Строка, Строка] | Сопоставление столбца раздела со значением для этого файла. См. Также Сериализация значений раздела. |
размер | Длинное | Размер этого файла в байтах |
имеют теги | Карта [Строка, Строка] | Карта с метаданными об этом файле |
Схема метаданных содержит путь к файлу для каждого действия добавления / удаления, и процессу чтения Spark не требуется выполнять полное сканирование для получения списков файлов.
Если запись не удалась без обновления журнала транзакций, поскольку чтение потребителем всегда будет проходить через метаданные, эти файлы будут проигнорированы.
Преимущества перехода на Spark 3.0
Помимо использования преимуществ Delta Lake, переход на Spark 3.0 улучшил обработку данных следующими способами:
Оптимизация асимметричного соединения
Перекос данных - это состояние, при котором данные таблицы неравномерно распределены между разделами в кластере и могут серьезно снизить производительность запросов, особенно тех, которые содержат соединения. Асимметрия может привести к сильному дисбалансу в кластере, увеличивая время обработки данных.
С условием перекоса данных можно справиться в основном тремя способами.
- Использование конфигурации «spark.sql.shuffle.partitions» для увеличения параллелизма для более равномерно распределенных данных.
- Увеличение порога широковещательного хэш-соединения с помощью конфигурации spark.sql.autoBroadcastJoinThreshold до максимального размера в байтах для таблицы, которая должна быть широковещательно передана всем рабочим узлам во время выполнения соединения.
- Key Salting (добавьте префикс к перекошенным ключам, чтобы сделать один и тот же ключ другим, а затем скорректируйте распределение данных).
Spark 3.0 добавил оптимизацию автоматической обработки асимметричного соединения на основе статистики времени выполнения с новой адаптивной средой выполнения.
Состояние перекоса раздела
Проблема перекоса разделов, существовавшая в предыдущей версии Spark 2.4, оказала огромное влияние на сетевое время и время выполнения конкретной задачи. Более того, методы борьбы с этим в основном были ручными. Spark 3.0 решает эти проблемы.
Перекошенный раздел будет влиять на сетевой трафик и время выполнения задачи, так как эта конкретная задача будет иметь гораздо больше данных для обработки. Вам также необходимо знать, как это влияет на кибербезопасность, поскольку объем сетевого трафика - это то, чем хакеры пользуются.
Разделение с перекосом соединения рассчитывается по размеру данных и количеству строк из статистики карты времени выполнения.
Оптимизация
Из приведенной выше таблицы видно, что кампании Dataframe объединяются с организациями Dataframe. Один из разделов (Раздел 0) от Организации большой и перекошенный. Раздел 0 является результатом 9 карт с предыдущего этапа (от Map-0 до Map-8). Правило Spark OptimizeSkewedJoin разделит раздел на 3, а затем создаст 3 отдельные задачи, каждая из которых является частичным разделом из раздела 0 (карта-0 в карту-2, карта-3 в карту-5 и карта-6 в карту-9). и объединяется с разделом Campaigns. Этот подход приводит к дополнительным затратам за счет чтения Раздела 0 таблицы Campaigns, равного количеству частичных разделов из таблицы Organizations.
Конечный результат
Используя Delta Lake и Spark 3.0, мы добились следующих результатов для рекламной компании:
- Время обработки данных уменьшено с 15 часов до 5-6 часов.
- Снижение стоимости AWS EMR на 50%
- Предотвращение потери данных и прекращения работы процессов, что было частым явлением, когда в системе не хватало памяти или обработка останавливалась из-за сбоя в системе.
- Были установлены функции мониторинга и оповещения для уведомления в случае сбоя процесса
- Полная оркестровка с помощью Airflow для достижения полной автоматизации и управления зависимостями между процессами
- &
- 9
- Absolute
- Действие
- Ad
- дополнительный
- плюс
- Все
- Все транзакции
- среди
- анализ
- апаш
- Apache Spark
- API
- автоматический
- автоматизация
- AWS
- Блог
- Кампании
- проведение
- вызов
- Проверки
- облако
- Column
- Потребители
- содержание
- кооперативный
- Текущий
- Текущее состояние
- Информационная безопасность
- данным
- Озеро данных
- обработка данных
- хранение данных
- Databricks
- сделка
- Delta
- Проект и
- выполнение
- Особенности
- Поля
- в заключение
- Фирма
- формат
- Рамки
- полный
- Сбой
- Хакеры
- Управляемость
- хэш
- Как
- HTTPS
- огромный
- Влияние
- промышленности
- размышления
- IT
- присоединиться
- Основные
- ключи
- последний
- вести
- Объявления
- расположение
- основной
- Создание
- управление
- карта
- Карты
- сеть
- сетевой трафик
- узлы
- Операционный отдел
- заказ
- Другие контрактные услуги
- производительность
- плагин
- представить
- читатели
- Reading
- учет
- Итоги
- Run
- сканирование
- набор
- Размер
- раскол
- SQL
- Область
- статистика
- диск
- магазин
- потоковый
- система
- технологии
- время
- трафик
- сделка
- Сделки
- пользователей
- ценностное
- объем
- в