AWS Glue Studio の XNUMX 個の新しい視覚的変換

AWS Glue Studio の XNUMX 個の新しい視覚的変換

ソースノード: 2641422

AWS グルースタジオ での抽出、変換、ロード (ETL) ジョブの作成、実行、監視を容易にするグラフィカル インターフェイスです。 AWSグルー. これにより、さまざまなデータ処理ステップを表すノードを使用して、データ変換ワークフローを視覚的に構成できます。これらのステップは、後で実行するコードに自動的に変換されます。

AWS グルースタジオ 最近リリースされた コーディング スキルがなくても、より高度なジョブを視覚的に作成できるようにする 10 個のビジュアル トランスフォーム。 この投稿では、一般的な ETL のニーズを反映した潜在的なユース ケースについて説明します。

この記事で紹介する新しい変換は、連結、文字列の分割、列への配列、現在のタイムスタンプの追加、行から列へのピボット、列から行へのピボット解除、ルックアップ、配列の展開または列へのマップ、派生列、自動バランス処理です。 .

ソリューションの概要

このユース ケースでは、ストック オプション操作を含む JSON ファイルがいくつかあります。 分析を容易にするために、データを保存する前にいくつかの変換を行い、別のデータセットの要約も作成したいと考えています。

このデータセットでは、各行がオプション契約の取引を表しています。 オプションは、固定価格で株式を売買する権利を提供する金融商品ですが、義務ではありません。  行使価格) 定義された有効期限の前。

入力データ

データは次のスキーマに従います。

  • 注文ID – 一意の ID
  • シンボル – 通常、基礎となる株式を発行する企業を識別するための数文字に基づくコード
  • 楽器 – 売買される特定のオプションを識別する名前
  • 通貨 – 価格を表す ISO 通貨コード
  • ブランド – 各オプション コントラクトの購入に対して支払われた金額 (ほとんどの取引所では、100 つのコントラクトで XNUMX 株を売買できます)
  • 交換 – オプションが取引された交換センターまたは会場のコード
  • 売ら – これが売り取引である場合に売り注文を約定するために割り当てられた契約数のリスト
  • 買った – これが買い取引である場合、買い注文を約定するために割り当てられた契約数のリスト

以下は、この記事のために生成された合成データのサンプルです。

{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", "currency": "usd", "price": 17.18, "exchange": "EDGX", "bought": [18, 38]}
{"order_id": 1679931512486, "symbol": "BMW.DE", "instrument": "BMW.DE MAR 24 23 96 PUT", "currency": "eur", "price": 2.98, "exchange": "XETR", "bought": [28]}
{"order_id": 1679931512487, "symbol": "BMW.DE", "instrument": "BMW.DE APR 28 23 101 CALL", "currency": "eur", "price": 14.71, "exchange": "XETR", "sold": [9, 59, 54]}
{"order_id": 1679931512489, "symbol": "JPM", "instrument": "JPM JUN 30 23 140 CALL", "currency": "usd", "price": 11.83, "exchange": "EDGX", "bought": [33, 42, 55, 67]}
{"order_id": 1679931512490, "symbol": "SIE.DE", "instrument": "SIE.DE MAR 24 23 149 CALL", "currency": "eur", "price": 13.68, "exchange": "XETR", "bought": [96, 89, 82]}
{"order_id": 1679931512491, "symbol": "NKE", "instrument": "NKE MAR 24 23 112 CALL", "currency": "usd", "price": 3.23, "exchange": "EDGX", "sold": [67]}
{"order_id": 1679931512492, "symbol": "AMZN", "instrument": "AMZN MAY 26 23 95 CALL", "currency": "usd", "price": 11.44, "exchange": "EDGX", "sold": [41, 62, 12]}
{"order_id": 1679931512493, "symbol": "JPM", "instrument": "JPM MAR 24 23 121 PUT", "currency": "usd", "price": 1.0, "exchange": "EDGX", "bought": [61, 34]}
{"order_id": 1679931512494, "symbol": "SAP.DE", "instrument": "SAP.DE MAR 24 23 132 CALL", "currency": "eur", "price": 15.9, "exchange": "XETR", "bought": [69, 33]}

ETL 要件

古いシステムでよく見られるように、このデータには多くの固有の特性があり、データの使用を困難にしています。

ETL 要件は次のとおりです。

  • インストルメント名には、人間が理解できるように意図された貴重な情報が含まれています。 分析を容易にするために、それを個別の列に正規化します。
  • 属性 bought & sold 相互に排他的です。 契約番号を含む単一の列にそれらを統合し、契約がこの順序で売買されたかどうかを示す別の列を持つことができます。
  • 個々のコントラクト割り当てに関する情報を保持したいのですが、ユーザーに数値の配列を処理させるのではなく、個々の行として保持したいと考えています。 数字を合計することはできますが、注文がどのように約定されたか (市場の流動性を示す) に関する情報は失われます。 代わりに、テーブルを非正規化することを選択して、各行に単一の契約数が含まれるようにし、複数の番号を持つ注文を別々の行に分割します。 圧縮された列形式では、圧縮が適用されると、この繰り返しの余分なデータセット サイズが小さくなることが多いため、データセットをクエリしやすくすることは許容されます。
  • 各株式の各オプション タイプ (コールとプット) の出来高の要約テーブルを生成したいと考えています。 これは、各株式の市場センチメントと一般的な市場 (貪欲と恐怖) の指標を提供します。
  • 全体的な取引の要約を可能にするために、各操作に総計を提供し、おおよその換算参照を使用して通貨を米ドルに標準化します。
  • これらの変換が行われた日付を追加します。 これは、たとえば、いつ通貨換算が行われたかを参照するのに役立ちます。

これらの要件に基づいて、ジョブは XNUMX つの出力を生成します。

  • シンボルとタイプごとの契約数の概要を含む CSV ファイル
  • 指示された変換を行った後、注文の履歴を保持するためのカタログ テーブル
    データスキーマ

前提条件

このユースケースに従うには、独自の S3 バケットが必要です。 新しいバケットを作成するには、次を参照してください。 バケットを作成する.

合成データを生成する

この投稿に従う (またはこの種のデータを自分で実験する) ために、このデータセットを合成的に生成できます。 次の Python スクリプトは、Boto3 がインストールされ、アクセスできる Python 環境で実行できます。 Amazon シンプル ストレージ サービス (Amazon S3)。

データを生成するには、次の手順を実行します。

  1. AWS Glue Studio で、オプションを使用して新しいジョブを作成します Python シェル スクリプト エディタ.
  2. ジョブに名前を付けて、 仕事の詳細 タブで、 適切な役割 および Python スクリプトの名前。
  3. 仕事の詳細 セクション、展開 高度なプロパティ 下にスクロールして ジョブパラメータ.
  4. という名前のパラメータを入力してください --bucket サンプルデータの保存に使用するバケットの名前を値として割り当てます。
  5. 次のスクリプトを AWS Glue シェル エディタに入力します。
    import argparse
    import boto3
    from datetime import datetime
    import io
    import json
    import random
    import sys # Configuration
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket')
    args, ignore = parser.parse_known_args()
    if not args.bucket: raise Exception("This script requires an argument --bucket with the value specifying the S3 bucket where to store the files generated") data_bucket = args.bucket
    data_path = "transformsblog/inputdata"
    samples_per_file = 1000 # Create a single file with synthetic data samples
    s3 = boto3.client('s3')
    buff = io.BytesIO() sample_stocks = [("AMZN", 95, "usd"), ("NKE", 120, "usd"), ("JPM", 130, "usd"), ("KO", 130, "usd"), ("BMW.DE", 95, "eur"), ("SIE.DE", 140, "eur"), ("SAP.DE", 115, "eur")]
    option_type = ["PUT", "CALL"]
    operations = ["sold", "bought"]
    dates = ["MAR 24 23", "APR 28 23", "MAY 26 23", "JUN 30 23"]
    for i in range(samples_per_file): stock = random.choice(sample_stocks) symbol = stock[0] ref_price = stock[1] currency = stock[2] strike_price = round(ref_price * 0.9 + ref_price * random.uniform(0.01, 0.3)) sample = { "order_id": int(datetime.now().timestamp() * 1000) + i, "symbol": stock[0], "instrument":f"{symbol} {random.choice(dates)} {strike_price} {random.choice(option_type)}", "currency": currency, "price": round(random.uniform(0.5, 20.1), 2), "exchange": "EDGX" if currency == "usd" else "XETR" } sample[random.choice(operations)] = [random.randrange(1,100) for i in range(random.randrange(1,5))] buff.write(json.dumps(sample).encode()) buff.write("n".encode()) s3.put_object(Body=buff.getvalue(), Bucket=data_bucket, Key=f"{data_path}/{int(datetime.now().timestamp())}.json")

  6. ジョブを実行し、[実行] タブで正常に完了したと表示されるまで待ちます (数秒かかります)。

実行ごとに、指定されたバケットとプレフィックスの下に 1,000 行を含む JSON ファイルが生成されます transformsblog/inputdata/. より多くの入力ファイルでテストする場合は、ジョブを複数回実行できます。
合成データの各行は、次のような JSON オブジェクトを表すデータ行です。

{ "order_id":1681986991888, "symbol":"AMZN", "instrument":"AMZN APR 28 23 100 PUT", "currency":"usd", "price":2.89, "exchange":"EDGX", "sold":[88,49]
}

AWS Glue ビジュアル ジョブを作成する

AWS Glue ビジュアル ジョブを作成するには、次の手順を実行します。

  1. AWS Glue Studio に移動し、オプションを使用してジョブを作成します 真っ白なキャンバスを使ったビジュアル.
  2. 編集 Untitled job 名前を付けて割り当てる AWS Glue に適したロール 仕事の詳細 タブには何も表示されないことに注意してください。
  3. S3 データ ソースを追加します (名前を付けることができます JSON files source) を入力し、ファイルが保存されている S3 URL を入力します (たとえば、 s3://<your bucket name>/transformsblog/inputdata/)、次に選択します JSONの データ形式として。
  4. 選択 スキーマを推測する そのため、データに基づいて出力スキーマを設定します。

このソース ノードから、トランスフォームをチェーンし続けます。 各トランスフォームを追加するときは、選択したノードが最後に追加されたノードであることを確認してください。これにより、手順で特に指定されていない限り、親として割り当てられます。

適切な親を選択しなかった場合は、親を選択し、構成ペインで別の親を選択することで、いつでも親を編集できます。

ノードの親構成

追加されたノードごとに、特定の名前を付けて (ノードの目的がグラフに表示されるように)、 最適化の適用 タブには何も表示されないことに注意してください。

変換によってスキーマが変更される (たとえば、新しい列が追加される) たびに、出力スキーマを更新して、下流の変換から見えるようにする必要があります。 出力スキーマを手動で編集することもできますが、データ プレビューを使用して行う方が実用的で安全です。
さらに、そのようにして、変換が期待どおりに機能していることを確認できます。 これを行うには、 データプレビュー タブをクリックして変換を選択し、プレビュー セッションを開始します。 変換されたデータが期待どおりであることを確認したら、 出力スキーマ タブを選択して データ プレビュー スキーマを使用する スキーマを自動的に更新します。

新しい種類の変換を追加すると、不足している依存関係に関するメッセージがプレビューに表示される場合があります。 こうなったら選ぶ セッションの終了 新しいノードを開始すると、プレビューは新しい種類のノードを選択します。

機器情報の抽出

計測器名に関する情報を処理して、結果の出力テーブルでアクセスしやすい列に正規化することから始めましょう。

  1. 加える 分割文字列 ノードに名前を付けます Split instrument、空白の正規表現を使用して計器列をトークン化します。 s+ (この場合は XNUMX つのスペースで十分ですが、この方法の方が柔軟で視覚的にも明確です)。
  2. 元の楽器情報をそのまま保持したいので、分割配列の新しい列名を入力します。 instrument_arr.
    分割構成
  3. 追加する 配列から列へ ノードに名前を付けます Instrument columns 作成したばかりの配列列を新しいフィールドに変換します。 symbol、既に列があります。
  4. 列を選択します instrument_arr、最初のトークンをスキップして、出力列を抽出するように指示します month, day, year, strike_price, type 索引の使用 2, 3, 4, 5, 6 (カンマの後のスペースは読みやすくするためのもので、構成には影響しません)。
    アレイ構成

抽出された年は XNUMX 桁のみで表されます。 彼らがXNUMX桁だけを使用する場合、それが今世紀にあると仮定するために一時しのぎを置きましょう.

  1. 加える 派生列 ノードに名前を付けます Four digits year.
  2. 入力します year 派生列としてオーバーライドして、次の SQL 式を入力します。
    CASE WHEN length(year) = 2 THEN ('20' || year) ELSE year END
    年派生列の構成

便宜上、 expiration_date オプションを行使できる最後の日付の参照としてユーザーが持つことができるフィールド。

  1. 加える 列の連結 ノードに名前を付けます Build expiration date.
  2. 新しい列に名前を付ける expiration_date、列を選択します year, month, day (この順序で)、およびスペーサーとしてのハイフン。
    連結日付構成

ここまでの図は、次の例のようになります。

DAG

これまでの新しい列のデータ プレビューは、次のスクリーンショットのようになります。

データプレビュー

契約数の正規化

データの各行は、売買された各オプションの契約数と、注文が約定されたバッチを示しています。 個々のバッチに関する情報を失わずに、個々の行にそれぞれの金額を XNUMX つの金額値で保持し、残りの情報は生成された各行に複製する必要があります。

まず、金額を XNUMX つの列にマージしましょう。

  1. 追加する 行への列のピボット解除 ノードに名前を付けます Unpivot actions.
  2. 列を選択する bought & sold ピボットを解除して、名前と値を名前付きの列に保存します action & contractsそれぞれ。
    構成のピボットを解除
    プレビューで、新しい列が contracts この変換後もまだ数値の配列です。
  1. 追加する 配列の分解または行へのマップ 名前付きの行 Explode contracts.
  2. 選択する contracts 列と入力 contracts それをオーバーライドする新しい列として (元の配列を保持する必要はありません)。

プレビューに、各行に XNUMX つの行があることが示されるようになりました contracts 残りのフィールドは同じです。

これはまた、 order_id は一意のキーではなくなりました。 独自のユースケースでは、データをモデル化する方法と、非正規化するかどうかを決定する必要があります。
分解設定

次のスクリーンショットは、これまでの変換後の新しい列の例です。
データプレビュー

集計表を作成する

ここで、タイプごと、銘柄ごとに取引された契約数を含むサマリー テーブルを作成します。

説明のために、処理されたファイルが XNUMX 日のものであると仮定してみましょう。したがって、この要約は、その日の市場の関心とセンチメントに関する情報をビジネス ユーザーに提供します。

  1. 加える フィールドを選択 ノードを開き、次の列を選択して要約用に保持します。 symbol, type, contracts.
    選択したフィールド
  2. 加える 行を列にピボットする ノードに名前を付けます Pivot summary.
  3. で集計 contracts 使用する列 sum 変換することを選択します type コラム。
    ピボット構成

通常、参照用に外部データベースまたはファイルに保存します。 この例では、Amazon S3 に CSV ファイルとして保存します。

  1. 追加する オートバランス処理 ノードに名前を付けます Single output file.
  2. その変換タイプは通常、並列処理を最適化するために使用されますが、ここでは出力を XNUMX つのファイルに減らすために使用します。 したがって、 1 パーティション構成の数で。
    自動バランス構成
  3. S3 ターゲットを追加して名前を付ける CSV Contract summary.
  4. データ形式として CSV を選択し、ジョブロールがファイルを保存できる S3 パスを入力します。

ジョブの最後の部分は、次の例のようになります。
DAG

  1. ジョブを保存して実行します。 使用 Active Runs タブをクリックして、正常に終了したことを確認します。
    そのパスの下に、その拡張子がないにもかかわらず、CSV であるファイルが見つかります。 ダウンロード後に拡張機能を追加して開く必要がある場合があります。
    CSV を読み取ることができるツールでは、概要は次の例のようになります。
    スプレッドシート

一時列のクリーンアップ

将来の分析のために注文を履歴テーブルに保存する準備として、途中で作成されたいくつかの一時的な列をクリーンアップしましょう。

  1. 加える ドロップ フィールド のノード Explode contracts 親として選択されたノード (別の出力を生成するためにデータ パイプラインを分岐しています)。
  2. ドロップするフィールドを選択します。 instrument_arr, month, day, year.
    保持したい残りの部分は、後で作成する履歴テーブルに保存されます。
    ドロップ フィールド

通貨の標準化

この合成データには、XNUMX つの通貨に対する架空の操作が含まれていますが、実際のシステムでは、世界中の市場から通貨を取得できます。 扱う通貨を単一の参照通貨に標準化すると、レポートや分析のために簡単に比較および集計できるため便利です。

を使用しております アマゾンアテナ 定期的に更新されるおおよその通貨換算でテーブルをシミュレートします (ここでは、換算が比較の目的で合理的な代表となるように、注文をタイムリーに処理すると想定しています)。

  1. AWS Glue を使用しているのと同じリージョンで Athena コンソールを開きます。
  2. 次のクエリを実行して、Athena と AWS Glue の両方のロールが読み書きできる S3 の場所を設定してテーブルを作成します。 また、テーブルを別のデータベースに保存することもできます。 default (その場合は、提供されている例に応じて、テーブルの修飾名を更新してください)。
    CREATE EXTERNAL TABLE default.exchange_rates(currency string, exchange_rate double)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION 's3://<enter some bucket>/exchange_rates/';

  3. いくつかの変換例を表に入力します。
    INSERT INTO default.exchange_rates VALUES ('usd', 1.0), ('eur', 1.09), ('gbp', 1.24);
  4. 次のクエリでテーブルを表示できるようになりました。
    SELECT * FROM default.exchange_rates
  5. AWS Glue ビジュアル ジョブに戻り、 見上げる ノード (の子として Drop Fields)それに名前を付ける Exchange rate.
  6. 作成したばかりのテーブルの修飾名を入力します。 currency キーとして、 exchange_rate 使用するフィールド。
    フィールドにはデータとルックアップ テーブルの両方で同じ名前が付けられているため、名前を入力するだけで済みます。 currency マッピングを定義する必要はありません。ルックアップ構成
    この記事の執筆時点では、ルックアップ変換はデータ プレビューでサポートされておらず、テーブルが存在しないというエラーが表示されます。 これはデータのプレビューのみを目的としており、ジョブの正常な実行を妨げるものではありません。 この投稿の残りのいくつかの手順では、スキーマを更新する必要はありません。 他のノードでデータ プレビューを実行する必要がある場合は、ルックアップ ノードを一時的に削除してから元に戻すことができます。
  7. 加える 派生列 ノードに名前を付けます Total in usd.
  8. 派生列に名前を付ける total_usd 次の SQL 式を使用します。
    round(contracts * price * exchange_rate, 2)
    通貨換算設定
  9. 加える 現在のタイムスタンプを追加 ノードに名前を付けて列に名前を付けます ingest_date.
  10. フォーマットを使用する %Y-%m-%d タイムスタンプ (デモンストレーションの目的で、日付を使用しているだけです。必要に応じて、より正確にすることができます)。
    タイムスタンプ構成

過去の注文テーブルを保存する

過去の注文テーブルを保存するには、次の手順を実行します。

  1. S3 ターゲット ノードを追加して名前を付ける Orders table.
  2. snappy 圧縮を使用して Parquet 形式を構成し、結果を保存するための S3 ターゲット パスを提供します (概要とは別に)。
  3. 選択 Data Catalog にテーブルを作成し、その後の実行でスキーマを更新して新しいパーティションを追加します.
  4. ターゲット データベースと新しいテーブルの名前を入力します。たとえば、次のようになります。 option_orders.
    テーブル シンクの構成

ダイアグラムの最後の部分は、次のようになり、XNUMX つの別個の出力に XNUMX つのブランチが表示されます。
DAG

ジョブを正常に実行した後、Athena などのツールを使用して、新しいテーブルをクエリすることにより、ジョブが生成したデータを確認できます。 Athena リストでテーブルを見つけて選択できます。 プレビュー表 または、単に SELECT クエリを実行します (テーブル名を、使用した名前とカタログに更新します)。

SELECT * FROM default.option_orders limit 10

テーブルのコンテンツは、次のスクリーンショットのようになります。
テーブルの内容

クリーンアップ

この例を維持したくない場合は、作成した 3 つのジョブ、Athena の XNUMX つのテーブル、および入力ファイルと出力ファイルが保存されていた SXNUMX パスを削除します。

まとめ

この投稿では、AWS Glue Studio の新しい変換が、最小限の構成でより高度な変換を行うのにどのように役立つかを示しました。 これは、コードを記述して維持する必要なく、より多くの ETL ユース ケースを実装できることを意味します。 新しい変換は AWS Glue Studio で既に利用可能であるため、ビジュアルジョブで新しい変換を今すぐ使用できます。


著者,

ゴンザロエレロス AWS Glue チームのシニアビッグデータアーキテクトです。

タイムスタンプ:

より多くの AWSビッグデータ