Десять новых визуальных преобразований в AWS Glue Studio

Десять новых визуальных преобразований в AWS Glue Studio

Исходный узел: 2641422

Клей-студия AWS — это графический интерфейс, упрощающий создание, запуск и мониторинг заданий извлечения, преобразования и загрузки (ETL) в Клей AWS. Он позволяет визуально составлять рабочие процессы преобразования данных, используя узлы, представляющие различные этапы обработки данных, которые впоследствии автоматически преобразуются в код для выполнения.

Клей-студия AWS недавно выпущена Еще 10 визуальных преобразований, позволяющих создавать более сложные задания визуальным способом без навыков программирования. В этом посте мы обсудим возможные варианты использования, которые отражают общие потребности ETL.

Новые преобразования, которые будут продемонстрированы в этом посте: объединение, разделение строки, массив в столбцы, добавление текущей метки времени, сводные строки в столбцы, разведение столбцов в строки, поиск, разнесение массива или сопоставление в столбцы, производный столбец и обработка автобаланса. .

Обзор решения

В этом случае у нас есть несколько файлов JSON с операциями с фондовыми опционами. Мы хотим сделать некоторые преобразования перед сохранением данных, чтобы их было легче анализировать, а также мы хотим создать отдельную сводку набора данных.

В этом наборе данных каждая строка представляет собой сделку по опционным контрактам. Опционы — это финансовые инструменты, дающие право — но не обязательство — покупать или продавать акции по фиксированной цене (называемой  цена исполнения) до установленной даты истечения срока действия.

Входные данные

Данные следуют следующей схеме:

  • номер заказа – Уникальный идентификатор
  • символ – Код, обычно состоящий из нескольких букв, идентифицирующий корпорацию, которая выпускает базовые акции.
  • инструмент – Имя, которое идентифицирует конкретный покупаемый или продаваемый опцион.
  • валюта – Код валюты ISO, в которой выражена цена
  • цена – Сумма, которая была уплачена за покупку каждого опционного контракта (на большинстве бирж один контракт позволяет купить или продать 100 акций)
  • обмена – Код обменного центра или площадки, на которой была совершена сделка по опциону
  • проданный – Список количества контрактов, которые были выделены для исполнения ордера на продажу, когда это сделка на продажу.
  • купил – Список количества контрактов, выделенных для исполнения ордера на покупку, когда это сделка на покупку.

Ниже приведен пример синтетических данных, сгенерированных для этого поста:

{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", "currency": "usd", "price": 17.18, "exchange": "EDGX", "bought": [18, 38]}
{"order_id": 1679931512486, "symbol": "BMW.DE", "instrument": "BMW.DE MAR 24 23 96 PUT", "currency": "eur", "price": 2.98, "exchange": "XETR", "bought": [28]}
{"order_id": 1679931512487, "symbol": "BMW.DE", "instrument": "BMW.DE APR 28 23 101 CALL", "currency": "eur", "price": 14.71, "exchange": "XETR", "sold": [9, 59, 54]}
{"order_id": 1679931512489, "symbol": "JPM", "instrument": "JPM JUN 30 23 140 CALL", "currency": "usd", "price": 11.83, "exchange": "EDGX", "bought": [33, 42, 55, 67]}
{"order_id": 1679931512490, "symbol": "SIE.DE", "instrument": "SIE.DE MAR 24 23 149 CALL", "currency": "eur", "price": 13.68, "exchange": "XETR", "bought": [96, 89, 82]}
{"order_id": 1679931512491, "symbol": "NKE", "instrument": "NKE MAR 24 23 112 CALL", "currency": "usd", "price": 3.23, "exchange": "EDGX", "sold": [67]}
{"order_id": 1679931512492, "symbol": "AMZN", "instrument": "AMZN MAY 26 23 95 CALL", "currency": "usd", "price": 11.44, "exchange": "EDGX", "sold": [41, 62, 12]}
{"order_id": 1679931512493, "symbol": "JPM", "instrument": "JPM MAR 24 23 121 PUT", "currency": "usd", "price": 1.0, "exchange": "EDGX", "bought": [61, 34]}
{"order_id": 1679931512494, "symbol": "SAP.DE", "instrument": "SAP.DE MAR 24 23 132 CALL", "currency": "eur", "price": 15.9, "exchange": "XETR", "bought": [69, 33]}

ETL-требования

Эти данные имеют ряд уникальных характеристик, часто встречающихся в старых системах, которые затрудняют их использование.

Ниже приведены требования к ETL:

  • Название прибора содержит ценную информацию, предназначенную для понимания людьми; мы хотим нормализовать его в отдельные столбцы для облегчения анализа.
  • Атрибуты bought и sold взаимоисключающие; мы можем объединить их в один столбец с номерами контрактов и иметь еще один столбец, указывающий, были ли контракты куплены или проданы в этом порядке.
  • Мы хотим сохранить информацию о распределении отдельных контрактов, но в виде отдельных строк, вместо того, чтобы заставлять пользователей иметь дело с массивом чисел. Мы могли бы сложить цифры, но потеряли бы информацию о том, как был исполнен ордер (что свидетельствует о ликвидности рынка). Вместо этого мы решили денормализировать таблицу, чтобы в каждой строке было одно количество контрактов, разбивая заказы с несколькими номерами на отдельные строки. В сжатом формате столбцов дополнительный размер набора данных этого повторения часто невелик, когда применяется сжатие, поэтому допустимо упростить запрос к набору данных.
  • Мы хотим создать сводную таблицу объема для каждого типа опциона (колл и пут) для каждой акции. Это дает представление о настроении рынка для каждой акции и рынка в целом (жадность против страха).
  • Чтобы включить общие торговые сводки, мы хотим предоставить для каждой операции общий итог и стандартизировать валюту по долларам США, используя приблизительную ссылку конвертации.
  • Мы хотим добавить дату, когда произошли эти преобразования. Это может быть полезно, например, чтобы иметь справку о том, когда была произведена конвертация валюты.

На основе этих требований задание выдаст два результата:

  • CSV-файл со сводкой по количеству контрактов для каждого символа и типа.
  • Таблица каталога для хранения истории заказа после выполнения указанных преобразований
    Схема данных

Предпосылки

Вам понадобится собственное ведро S3, чтобы следовать этому варианту использования. Чтобы создать новый сегмент, см. Создание ведра.

Создание синтетических данных

Чтобы следовать этому сообщению (или поэкспериментировать с такими данными самостоятельно), вы можете создать этот набор данных синтетическим путем. Следующий скрипт Python можно запустить в среде Python с установленным Boto3 и доступом к Простой сервис хранения Amazon (Amazon S3).

Чтобы сгенерировать данные, выполните следующие шаги:

  1. В AWS Glue Studio создайте новое задание с параметром Редактор сценариев оболочки Python.
  2. Дайте заданию имя и на Детали работы вкладку, выберите подходящая роль и имя для скрипта Python.
  3. В Детали работы раздел, развернуть Дополнительные свойства и прокрутите вниз до Параметры работы.
  4. Введите параметр с именем --bucket и назначьте в качестве значения имя корзины, которую вы хотите использовать для хранения демонстрационных данных.
  5. Введите следующий скрипт в редактор оболочки AWS Glue:
    import argparse
    import boto3
    from datetime import datetime
    import io
    import json
    import random
    import sys # Configuration
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket')
    args, ignore = parser.parse_known_args()
    if not args.bucket: raise Exception("This script requires an argument --bucket with the value specifying the S3 bucket where to store the files generated") data_bucket = args.bucket
    data_path = "transformsblog/inputdata"
    samples_per_file = 1000 # Create a single file with synthetic data samples
    s3 = boto3.client('s3')
    buff = io.BytesIO() sample_stocks = [("AMZN", 95, "usd"), ("NKE", 120, "usd"), ("JPM", 130, "usd"), ("KO", 130, "usd"), ("BMW.DE", 95, "eur"), ("SIE.DE", 140, "eur"), ("SAP.DE", 115, "eur")]
    option_type = ["PUT", "CALL"]
    operations = ["sold", "bought"]
    dates = ["MAR 24 23", "APR 28 23", "MAY 26 23", "JUN 30 23"]
    for i in range(samples_per_file): stock = random.choice(sample_stocks) symbol = stock[0] ref_price = stock[1] currency = stock[2] strike_price = round(ref_price * 0.9 + ref_price * random.uniform(0.01, 0.3)) sample = { "order_id": int(datetime.now().timestamp() * 1000) + i, "symbol": stock[0], "instrument":f"{symbol} {random.choice(dates)} {strike_price} {random.choice(option_type)}", "currency": currency, "price": round(random.uniform(0.5, 20.1), 2), "exchange": "EDGX" if currency == "usd" else "XETR" } sample[random.choice(operations)] = [random.randrange(1,100) for i in range(random.randrange(1,5))] buff.write(json.dumps(sample).encode()) buff.write("n".encode()) s3.put_object(Body=buff.getvalue(), Bucket=data_bucket, Key=f"{data_path}/{int(datetime.now().timestamp())}.json")

  6. Запустите задание и подождите, пока оно не отобразится как успешно выполненное на вкладке «Выполнения» (это должно занять всего несколько секунд).

Каждый запуск будет генерировать файл JSON с 1,000 строками в указанном сегменте и префиксе transformsblog/inputdata/. Вы можете запустить задание несколько раз, если хотите протестировать большее количество входных файлов.
Каждая строка в синтетических данных представляет собой строку данных, представляющую объект JSON, как показано ниже:

{ "order_id":1681986991888, "symbol":"AMZN", "instrument":"AMZN APR 28 23 100 PUT", "currency":"usd", "price":2.89, "exchange":"EDGX", "sold":[88,49]
}

Создание визуального задания AWS Glue

Чтобы создать визуальное задание AWS Glue, выполните следующие действия:

  1. Перейдите в AWS Glue Studio и создайте задание, используя опцию Визуальный с пустым холстом.
  2. Редактировать Untitled job дать ему имя и назначить роль, подходящая для AWS Glue на Детали работы меню.
  3. Добавьте источник данных S3 (вы можете назвать его JSON files source) и введите URL-адрес S3, под которым хранятся файлы (например, s3://<your bucket name>/transformsblog/inputdata/), затем выберите JSON как формат данных.
  4. Выберите Вывод схемы поэтому он устанавливает схему вывода на основе данных.

Из этого исходного узла вы будете продолжать цепочку преобразований. При добавлении каждого преобразования убедитесь, что выбранный узел добавлен последним, чтобы он был назначен родительским, если в инструкциях не указано иное.

Если вы выбрали не того родителя, вы всегда можете отредактировать родителя, выбрав его и выбрав другого родителя на панели конфигурации.

Конфигурация родительского узла

Каждому добавленному узлу вы даете конкретное имя (чтобы назначение узла отображалось на графике) и конфигурацию на Transform меню.

Каждый раз, когда преобразование изменяет схему (например, добавляет новый столбец), необходимо обновить выходную схему, чтобы она была видна нижестоящим преобразованиям. Вы можете вручную отредактировать схему вывода, но практичнее и безопаснее делать это с помощью предварительного просмотра данных.
Кроме того, таким образом вы можете убедиться, что преобразование работает так, как ожидалось. Для этого откройте Предварительный просмотр данных вкладку с выбранным преобразованием и начать сеанс предварительного просмотра. После того, как вы убедились, что преобразованные данные выглядят так, как ожидалось, перейдите к Выходная схема и выберите Использовать схему предварительного просмотра данных для автоматического обновления схемы.

По мере добавления новых видов преобразований предварительный просмотр может отображать сообщение об отсутствующей зависимости. Когда это произойдет, выберите Конец сессии и запустите новый, поэтому предварительный просмотр улавливает новый тип узла.

Извлечь информацию о приборе

Давайте начнем с обработки информации об имени инструмента, чтобы нормализовать ее в столбцах, к которым легче получить доступ в результирующей выходной таблице.

  1. Добавить Разделить строку узел и назовите его Split instrument, который будет токенизировать столбец инструмента с помощью регулярного выражения с пробелами: s+ (в этом случае подойдет один пробел, но этот способ более гибкий и визуально более четкий).
  2. Мы хотим сохранить исходную информацию об инструменте как есть, поэтому введите новое имя столбца для разделенного массива: instrument_arr.
    Разделить конфигурацию
  3. Добавьте Массив в столбцы узел и назовите его Instrument columns для преобразования только что созданного столбца массива в новые поля, за исключением symbol, для которого у нас уже есть столбец.
  4. Выберите столбец instrument_arr, пропустите первый токен и скажите ему извлечь выходные столбцы month, day, year, strike_price, type с использованием индексов 2, 3, 4, 5, 6 (пробелы после запятых для удобства чтения, они не влияют на конфигурацию).
    Конфигурация массива

Извлеченный год выражается только двумя цифрами; давайте положим временную паузу, чтобы предположить, что это в этом столетии, если они просто используют две цифры.

  1. Добавить Производный столбец узел и назовите его Four digits year.
  2. Enter year в качестве производного столбца, чтобы он переопределял его, и введите следующее выражение SQL:
    CASE WHEN length(year) = 2 THEN ('20' || year) ELSE year END
    Конфигурация столбца, производного от года

Для удобства строим expiration_date поле, которое пользователь может иметь в качестве ссылки на последнюю дату, когда опцион может быть исполнен.

  1. Добавить Объединить столбцы узел и назовите его Build expiration date.
  2. Назовите новый столбец expiration_date, выберите столбцы year, monthи day (именно в таком порядке) и дефис в качестве разделителя.
    Объединенная конфигурация даты

Пока что диаграмма должна выглядеть как следующий пример.

DAG

Предварительный просмотр данных новых столбцов на данный момент должен выглядеть так, как показано на следующем снимке экрана.

Предварительный просмотр данных

Нормировать количество контрактов

В каждой строке данных указано количество контрактов каждого опциона, которые были куплены или проданы, и партии, по которым были исполнены ордера. Без потери информации об отдельных пакетах мы хотим, чтобы каждая сумма была в отдельной строке с одним значением суммы, в то время как остальная информация реплицируется в каждой созданной строке.

Во-первых, давайте объединим суммы в один столбец.

  1. Добавьте Развернуть столбцы в строки узел и назовите его Unpivot actions.
  2. Выберите столбцы bought и sold чтобы развернуть и сохранить имена и значения в столбцах с именами action и contracts, Соответственно.
    Развернуть конфигурацию
    Обратите внимание, что в предварительном просмотре новый столбец contracts остается массивом чисел после этого преобразования.
  1. Добавьте Разбить массив или карту на строки строка с именем Explode contracts.
  2. Выберите contracts столбца и введите contracts как новый столбец, чтобы переопределить его (нам не нужно сохранять исходный массив).

Предварительный просмотр теперь показывает, что каждая строка имеет один contracts сумма, а остальные поля одинаковы.

Это также означает, что order_id больше не является уникальным ключом. Для ваших собственных вариантов использования вам нужно решить, как моделировать ваши данные и хотите ли вы денормализации или нет.
Развернуть конфигурацию

На следующем снимке экрана показан пример того, как выглядят новые столбцы после преобразований.
Предварительный просмотр данных

Создайте сводную таблицу

Теперь вы создаете сводную таблицу с количеством контрактов, торгуемых для каждого типа и каждого символа акции.

Давайте для наглядности предположим, что обработанные файлы относятся к одному дню, поэтому эта сводка дает бизнес-пользователям информацию о том, каковы рыночные интересы и настроения в этот день.

  1. Добавить Выбрать поля узел и выберите следующие столбцы, чтобы сохранить сводку: symbol, typeи contracts.
    Выбранные поля
  2. Добавить Сводные строки в столбцы узел и назовите его Pivot summary.
  3. Совокупность по contracts столбец с использованием sum и выберите преобразование type колонка.
    Сводная конфигурация

Обычно вы храните его во внешней базе данных или файле для справки; в этом примере мы сохраняем его как файл CSV на Amazon S3.

  1. Добавьте Обработка автобаланса узел и назовите его Single output file.
  2. Хотя этот тип преобразования обычно используется для оптимизации параллелизма, здесь мы используем его для сокращения вывода до одного файла. Поэтому введите 1 в конфигурации количества разделов.
    Конфигурация автобаланса
  3. Добавьте цель S3 и назовите ее CSV Contract summary.
  4. Выберите CSV в качестве формата данных и введите путь S3, где роли задания разрешено хранить файлы.

Последняя часть задания теперь должна выглядеть так, как показано в следующем примере.
DAG

  1. Сохраните и запустите задание. Использовать Работает вкладку, чтобы проверить успешное завершение.
    По этому пути вы найдете файл в формате CSV, несмотря на отсутствие такого расширения. Вам, вероятно, потребуется добавить расширение после его загрузки, чтобы открыть его.
    В инструменте, который может читать CSV, сводка должна выглядеть примерно так, как показано в следующем примере.
    Таблица

Очистить временные столбцы

При подготовке к сохранению заказов в исторической таблице для будущего анализа давайте очистим некоторые временные столбцы, созданные по ходу дела.

  1. Добавить Перетащите поля узел с Explode contracts узел, выбранный в качестве его родителя (мы разветвляем конвейер данных для создания отдельного вывода).
  2. Выберите поля, которые нужно удалить: instrument_arr, month, dayи year.
    Остальные мы хотим сохранить, чтобы они были сохранены в исторической таблице, которую мы создадим позже.
    Перетащите поля

Стандартизация валюты

Эти синтетические данные содержат вымышленные операции с двумя валютами, но в реальной системе вы могли бы получать валюты с рынков по всему миру. Полезно стандартизировать обрабатываемые валюты в единую базовую валюту, чтобы их можно было легко сравнивать и объединять для отчетности и анализа.

МЫ ИСПОЛЬЗУЕМ Амазонка Афина для имитации таблицы с приблизительными преобразованиями валют, которая периодически обновляется (здесь мы предполагаем, что обрабатываем заказы достаточно своевременно, чтобы преобразование было разумным репрезентативным для целей сравнения).

  1. Откройте консоль Athena в том же регионе, где вы используете AWS Glue.
  2. Выполните следующий запрос, чтобы создать таблицу, задав местоположение S3, где ваши роли Athena и AWS Glue могут читать и записывать. Кроме того, вы можете захотеть сохранить таблицу в другой базе данных, чем default (если вы это сделаете, соответственно обновите квалифицированное имя таблицы в приведенных примерах).
    CREATE EXTERNAL TABLE default.exchange_rates(currency string, exchange_rate double)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION 's3://<enter some bucket>/exchange_rates/';

  3. Введите несколько примеров преобразования в таблицу:
    INSERT INTO default.exchange_rates VALUES ('usd', 1.0), ('eur', 1.09), ('gbp', 1.24);
  4. Теперь вы должны иметь возможность просматривать таблицу со следующим запросом:
    SELECT * FROM default.exchange_rates
  5. Вернувшись к визуальному заданию AWS Glue, добавьте Поиск узел (как дочерний элемент Drop Fields) и назовите это Exchange rate.
  6. Введите полное имя только что созданной таблицы, используя currency в качестве ключа и выберите exchange_rate поле для использования.
    Поскольку имя поля одинаково и в данных, и в таблице поиска, мы можем просто ввести имя currency и не нужно определять сопоставление.Конфигурация поиска
    На момент написания этой статьи преобразование «Уточняющий запрос» не поддерживается в предварительном просмотре данных, и оно покажет ошибку, что таблица не существует. Это только для предварительного просмотра данных и не мешает правильному выполнению задания. Несколько оставшихся шагов поста не требуют обновления схемы. Если вам нужно запустить предварительный просмотр данных на других узлах, вы можете временно удалить узел поиска, а затем вернуть его обратно.
  7. Добавить Производный столбец узел и назовите его Total in usd.
  8. Назовите производный столбец total_usd и используйте следующее выражение SQL:
    round(contracts * price * exchange_rate, 2)
    Конфигурация конвертации валюты
  9. Добавить Добавить текущую метку времени узел и имя столбца ingest_date.
  10. Используйте формат %Y-%m-%d для вашей метки времени (в демонстрационных целях мы просто используем дату; вы можете сделать ее более точной, если хотите).
    Конфигурация метки времени

Сохраните таблицу исторических заказов

Чтобы сохранить таблицу исторических заказов, выполните следующие шаги:

  1. Добавьте целевой узел S3 и назовите его Orders table.
  2. Настройте формат Parquet с быстрым сжатием и укажите целевой путь S3 для хранения результатов (отдельно от сводки).
  3. Выберите Создайте таблицу в каталоге данных и при последующих запусках обновите схему и добавьте новые разделы..
  4. Введите целевую базу данных и имя для новой таблицы, например: option_orders.
    Конфигурация настольной раковины

Последняя часть диаграммы теперь должна выглядеть примерно так, как показано ниже, с двумя ответвлениями для двух отдельных выходов.
DAG

После успешного запуска задания вы можете использовать такой инструмент, как Athena, для просмотра данных, полученных заданием, путем запроса новой таблицы. Вы можете найти таблицу в списке Athena и выбрать Таблица предварительного просмотра или просто запустите запрос SELECT (обновив имя таблицы на имя и каталог, которые вы использовали):

SELECT * FROM default.option_orders limit 10

Содержимое вашей таблицы должно выглядеть примерно так, как показано на следующем снимке экрана.
Содержание таблицы

Убирать

Если вы не хотите сохранять этот пример, удалите два созданных вами задания, две таблицы в Athena и пути S3, где хранились входные и выходные файлы.

Заключение

В этом посте мы показали, как новые преобразования в AWS Glue Studio могут помочь вам выполнять более сложные преобразования с минимальной конфигурацией. Это означает, что вы можете реализовать больше случаев использования ETL без необходимости писать и поддерживать какой-либо код. Новые преобразования уже доступны в AWS Glue Studio, поэтому вы можете использовать новые преобразования уже сегодня в своих визуальных задачах.


Об авторе

Гонсало Эррерос является старшим архитектором больших данных в команде AWS Glue.

Отметка времени:

Больше от AWS Большие данные