AWS Glue Studio 中的十个新视觉转换

AWS Glue Studio 中的十个新视觉转换

源节点: 2641422

AWS 胶水工作室 是一个图形界面,使创建、运行和监视提取、转换和加载 (ETL) 作业变得容易 AWS胶水. 它允许您使用代表不同数据处理步骤的节点可视化地组合数据转换工作流,这些步骤随后会自动转换为代码以运行。

AWS 胶水工作室 最近发布 10 多个视觉转换,无需编码技能即可以视觉方式创建更高级的工作。 在本文中,我们将讨论反映常见 ETL 需求的潜在用例。

这篇文章中将演示的新转换是:连接、拆分字符串、数组到列、添加当前时间戳、将行转为列、将列转为行、查找、分解数组或映射到列、派生列和自动平衡处理.

解决方案概述

在此用例中,我们有一些包含股票期权操作的 JSON 文件。 我们希望在存储数据之前进行一些转换,以便于分析,我们还希望产生一个单独的数据集摘要。

在此数据集中,每一行代表一次期权合约交易。 期权是提供以固定价格(称为  执行价格) 在定义的到期日期之前。

输入数据

数据遵循以下架构:

  • ORDER_ID – 唯一 ID
  • 符号 – 通常基于几个字母的代码,用于识别发行标的股票的公司
  • 仪器 – 标识被买入或卖出的特定期权的名称
  • 货币 – 表示价格的 ISO 货币代码
  • 车资 – 购买每份期权合约所支付的金额(在大多数交易所,一份合约允许您买卖 100 股股票)
  • 交换 – 交易期权的交易中心或地点的代码
  • 出售 – 当这是卖出交易时分配给卖出订单的合约数量列表
  • – 当这是买入交易时分配给买入订单的合约数量列表

以下是为这篇文章生成的综合数据示例:

{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", "currency": "usd", "price": 17.18, "exchange": "EDGX", "bought": [18, 38]}
{"order_id": 1679931512486, "symbol": "BMW.DE", "instrument": "BMW.DE MAR 24 23 96 PUT", "currency": "eur", "price": 2.98, "exchange": "XETR", "bought": [28]}
{"order_id": 1679931512487, "symbol": "BMW.DE", "instrument": "BMW.DE APR 28 23 101 CALL", "currency": "eur", "price": 14.71, "exchange": "XETR", "sold": [9, 59, 54]}
{"order_id": 1679931512489, "symbol": "JPM", "instrument": "JPM JUN 30 23 140 CALL", "currency": "usd", "price": 11.83, "exchange": "EDGX", "bought": [33, 42, 55, 67]}
{"order_id": 1679931512490, "symbol": "SIE.DE", "instrument": "SIE.DE MAR 24 23 149 CALL", "currency": "eur", "price": 13.68, "exchange": "XETR", "bought": [96, 89, 82]}
{"order_id": 1679931512491, "symbol": "NKE", "instrument": "NKE MAR 24 23 112 CALL", "currency": "usd", "price": 3.23, "exchange": "EDGX", "sold": [67]}
{"order_id": 1679931512492, "symbol": "AMZN", "instrument": "AMZN MAY 26 23 95 CALL", "currency": "usd", "price": 11.44, "exchange": "EDGX", "sold": [41, 62, 12]}
{"order_id": 1679931512493, "symbol": "JPM", "instrument": "JPM MAR 24 23 121 PUT", "currency": "usd", "price": 1.0, "exchange": "EDGX", "bought": [61, 34]}
{"order_id": 1679931512494, "symbol": "SAP.DE", "instrument": "SAP.DE MAR 24 23 132 CALL", "currency": "eur", "price": 15.9, "exchange": "XETR", "bought": [69, 33]}

ETL要求

这些数据具有许多独特的特征,这些特征在旧系统中很常见,这使得数据更难使用。

以下是 ETL 要求:

  • 仪器名称具有有价值的信息,旨在供人类理解; 我们想将其规范化为单独的列以便于分析。
  • 属性 boughtsold 是互斥的; 我们可以将它们与合同编号合并到一个列中,并在另一列中指示合同是按此顺序买卖的。
  • 我们希望保留有关单个合约分配的信息,但作为单独的行,而不是强迫用户处理一组数字。 我们可以将这些数字相加,但我们会丢失有关订单如何执行的信息(表明市场流动性)。 相反,我们选择对表进行非规范化,这样每一行都有一个合同编号,将具有多个编号的订单拆分到单独的行中。 在压缩的柱状格式中,这种重复的额外数据集大小在应用压缩时通常很小,因此使数据集更易于查询是可以接受的。
  • 我们想要为每只股票的每种期权类型(看涨期权和看跌期权)生成一个交易量汇总表。 这表明了每只股票和整个市场的市场情绪(贪婪与恐惧)。
  • 为了实现整体贸易汇总,我们希望为每项操作提供总计,并使用近似换算参考将货币标准化为美元。
  • 我们想添加这些转换发生的日期。 这可能很有用,例如,参考何时进行货币转换。

根据这些要求,作业将产生两个输出:

  • 一个 CSV 文件,其中包含每个代码和类型的合约数量摘要
  • 在完成指定的转换后,用于保存订单历史记录的目录表
    数据模式

先决条件

您将需要自己的 S3 存储桶来完成此用例。 要创建新存储桶,请参阅 创建一个桶.

生成合成数据

要跟随这篇文章(或自己试验此类数据),您可以综合生成此数据集。 以下 Python 脚本可以在安装了 Boto3 并访问的 Python 环境中运行 亚马逊简单存储服务 (亚马逊S3)。

要生成数据,请完成以下步骤:

  1. 在 AWS Glue Studio 上,使用选项创建一个新作业 Python shell 脚本编辑器.
  2. 给工作起个名字,然后在 工作细节 选项卡,选择一个 合适的角色 和 Python 脚本的名称。
  3. 工作细节 部分,展开 先进的属性 并向下滚动至 作业参数.
  4. 输入一个名为的参数 --bucket 并将要用于存储示例数据的存储桶的名称指定为值。
  5. 在 AWS Glue shell 编辑器中输入以下脚本:
    import argparse
    import boto3
    from datetime import datetime
    import io
    import json
    import random
    import sys # Configuration
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket')
    args, ignore = parser.parse_known_args()
    if not args.bucket: raise Exception("This script requires an argument --bucket with the value specifying the S3 bucket where to store the files generated") data_bucket = args.bucket
    data_path = "transformsblog/inputdata"
    samples_per_file = 1000 # Create a single file with synthetic data samples
    s3 = boto3.client('s3')
    buff = io.BytesIO() sample_stocks = [("AMZN", 95, "usd"), ("NKE", 120, "usd"), ("JPM", 130, "usd"), ("KO", 130, "usd"), ("BMW.DE", 95, "eur"), ("SIE.DE", 140, "eur"), ("SAP.DE", 115, "eur")]
    option_type = ["PUT", "CALL"]
    operations = ["sold", "bought"]
    dates = ["MAR 24 23", "APR 28 23", "MAY 26 23", "JUN 30 23"]
    for i in range(samples_per_file): stock = random.choice(sample_stocks) symbol = stock[0] ref_price = stock[1] currency = stock[2] strike_price = round(ref_price * 0.9 + ref_price * random.uniform(0.01, 0.3)) sample = { "order_id": int(datetime.now().timestamp() * 1000) + i, "symbol": stock[0], "instrument":f"{symbol} {random.choice(dates)} {strike_price} {random.choice(option_type)}", "currency": currency, "price": round(random.uniform(0.5, 20.1), 2), "exchange": "EDGX" if currency == "usd" else "XETR" } sample[random.choice(operations)] = [random.randrange(1,100) for i in range(random.randrange(1,5))] buff.write(json.dumps(sample).encode()) buff.write("n".encode()) s3.put_object(Body=buff.getvalue(), Bucket=data_bucket, Key=f"{data_path}/{int(datetime.now().timestamp())}.json")

  6. 运行作业并等待它在“运行”选项卡上显示为成功完成(应该只需要几秒钟)。

每次运行都会在指定的存储桶和前缀下生成一个包含 1,000 行的 JSON 文件 transformsblog/inputdata/. 如果您想使用更多输入文件进行测试,您可以多次运行该作业。
合成数据中的每一行都是代表 JSON 对象的数据行,如下所示:

{ "order_id":1681986991888, "symbol":"AMZN", "instrument":"AMZN APR 28 23 100 PUT", "currency":"usd", "price":2.89, "exchange":"EDGX", "sold":[88,49]
}

创建 AWS Glue 视觉作业

要创建 AWS Glue 视觉作业,请完成以下步骤:

  1. 转到 AWS Glue Studio 并使用选项创建作业 带有空白画布的视觉效果.
  2. 编辑 Untitled job 给它起个名字并分配 适合 AWS Glue 的角色工作细节 标签。
  3. 添加一个 S3 数据源(你可以命名它 JSON files source) 并输入存储文件的 S3 URL(例如, s3://<your bucket name>/transformsblog/inputdata/),然后选择 JSON 作为数据格式。
  4. 选择 推断架构 所以它根据数据设置输出模式。

从这个源节点开始,您将继续链接转换。 添加每个变换时,请确保所选节点是最后添加的节点,以便将其指定为父节点,除非说明中另有说明。

如果您没有选择正确的父项,您始终可以通过选择它并在配置窗格中选择另一个父项来编辑父项。

节点父配置

对于添加的每个节点,您将为其指定一个特定名称(因此节点用途显示在图中)和配置 改造 标签。

每次转换更改架构(例如,添加新列)时,都需要更新输出架构,以便下游转换可见。 您可以手动编辑输出架构,但使用数据预览更实用、更安全。
此外,您可以通过这种方式验证转换是否按预期进行。 为此,打开 资料预览 选择转换的选项卡并启动预览会话。 在您验证转换后的数据看起来符合预期后,转到 输出模式 标签并选择 使用数据预览架构 自动更新架构。

当您添加新类型的转换时,预览可能会显示有关缺少依赖项的消息。 发生这种情况时,选择 结束会话 并开始一个新的节点,因此预览会选择新的节点类型。

提取仪器信息

让我们从处理仪器名称的信息开始,将其规范化为在生成的输出表中更容易访问的列。

  1. 添加 拆分字符串 节点并命名 Split instrument,这将使用空白正则表达式标记仪器列: s+ (在这种情况下,一个空格就可以了,但这种方式更灵活,视觉上也更清晰)。
  2. 我们希望保持原始仪器信息不变,因此为拆分数组输入一个新的列名: instrument_arr.
    拆分配置
  3. 添加一个 数组到列 节点并命名 Instrument columns 将刚刚创建的数组列转换为新字段,除了 symbol,为此我们已经有一列。
  4. 选择列 instrument_arr, 跳过第一个标记并告诉它提取输出列 month, day, year, strike_price, type 使用索引 2, 3, 4, 5, 6 (逗号后面的空格是为了便于阅读,它们不影响配置)。
    阵列配置

提取的年份只用两位数表示; 如果他们只使用两位数,让我们暂时假设它是在本世纪。

  1. 添加 派生列 节点并命名 Four digits year.
  2. 输入 year 作为派生列,因此它会覆盖它,并输入以下 SQL 表达式:
    CASE WHEN length(year) = 2 THEN ('20' || year) ELSE year END
    年份派生列配置

为了方便起见,我们建立一个 expiration_date 用户可以作为期权行使最后日期参考的字段。

  1. 添加 连接列 节点并命名 Build expiration date.
  2. 命名新列 expiration_date, 选择列 year, monthday (按此顺序)和一个连字符作为间隔符。
    串联日期配置

到目前为止的图表应该类似于以下示例。

DAG

到目前为止,新列的数据预览应类似于以下屏幕截图。

资料预览

合同数量标准化

数据中的每一行表示买入或卖出的每个期权的合约数量以及订单执行的批次。 在不丢失有关各个批次的信息的情况下,我们希望将每个金额放在具有单个金额值的单独行中,而其余信息将复制到生成的每一行中。

首先,让我们将金额合并到一列中。

  1. 添加一个 将列逆透视为行 节点并命名 Unpivot actions.
  2. 选择列 boughtsold 取消透视并将名称和值存储在名为的列中 actioncontracts
    逆轴配置
    在预览中注意新列 contracts 变换后仍然是一个数字数组。
  1. 添加一个 分解数组或映射成行 行命名 Explode contracts.
  2. 选择 contracts 列并输入 contracts 作为覆盖它的新列(我们不需要保留原始数组)。

预览现在显示每一行都有一个 contracts amount,其余字段相同。

这也意味着 order_id 不再是唯一键。 对于您自己的用例,您需要决定如何对数据建模以及是否要进行非规范化。
爆炸配置

以下屏幕截图是迄今为止转换后新列的外观示例。
资料预览

创建汇总表

现在您创建一个汇总表,其中包含每种类型和每种股票代码的交易合约数量。

为了便于说明,我们假设处理的文件属于一天,因此此摘要向业务用户提供有关当天市场兴趣和情绪的信息。

  1. 添加 选择字段 节点并选择以下列保留摘要: symbol, typecontracts.
    选定字段
  2. 添加 将行透视为列 节点并命名 Pivot summary.
  3. 聚合于 contracts 列使用 sum 并选择转换 type 列。
    枢轴配置

通常,您会将其存储在某个外部数据库或文件中以供参考; 在此示例中,我们将其保存为 Amazon S3 上的 CSV 文件。

  1. 添加一个 自动平衡处理 节点并命名 Single output file.
  2. 虽然该转换类型通常用于优化并行性,但在这里我们使用它来将输出减少到单个文件。 因此,输入 1 在分区数配置中。
    自动平衡配置
  3. 添加 S3 目标并命名 CSV Contract summary.
  4. 选择 CSV 作为数据格式并输入允许作业角色存储文件的 S3 路径。

作业的最后一部分现在应该类似于以下示例。
DAG

  1. 保存并运行作业。 使用 运行 选项卡以检查它何时成功完成。
    尽管没有该扩展名,但您会在该路径下找到一个 CSV 文件。 下载后您可能需要添加扩展程序才能打开它。
    在可以读取 CSV 的工具上,摘要应类似于以下示例。
    电子表格

清理临时列

为了准备将订单保存到历史表中以供将来分析,让我们清理一些在此过程中创建的临时列。

  1. 添加 拖放字段 节点与 Explode contracts 选择作为其父节点的节点(我们正在分支数据管道以生成单独的输出)。
  2. 选择要删除的字段: instrument_arr, month, dayyear.
    其余的我们要保留,以便将它们保存在我们稍后创建的历史表中。
    删除字段

货币标准化

该合成数据包含对两种货币的虚构操作,但在真实系统中,您可以获得来自世界各地市场的货币。 将处理的货币标准化为单一参考货币非常有用,这样可以轻松地比较和汇总它们以进行报告和分析。

我们使用 亚马逊雅典娜 模拟一个包含定期更新的近似货币转换的表(这里我们假设我们足够及时地处理订单,以便转换是用于比较目的的合理代表)。

  1. 在您使用 AWS Glue 的同一区域中打开 Athena 控制台。
  2. 运行以下查询以通过设置 S3 位置创建表,您的 Athena 和 AWS Glue 角色都可以在该位置读取和写入。 此外,您可能希望将表存储在不同的数据库中 default (如果这样做,请在提供的示例中相应地更新表限定名称)。
    CREATE EXTERNAL TABLE default.exchange_rates(currency string, exchange_rate double)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION 's3://<enter some bucket>/exchange_rates/';

  3. 在表中输入一些示例转换:
    INSERT INTO default.exchange_rates VALUES ('usd', 1.0), ('eur', 1.09), ('gbp', 1.24);
  4. 您现在应该能够使用以下查询查看表:
    SELECT * FROM default.exchange_rates
  5. 回到 AWS Glue 视觉作业,添加一个 查找 节点(作为 Drop Fields)并命名 Exchange rate.
  6. 输入您刚刚创建的表的限定名称,使用 currency 作为键并选择 exchange_rate 要使用的字段。
    因为该字段在数据和查找表中的名称相同,所以我们可以直接输入名称 currency 并且不需要定义映射。查找配置
    在撰写本文时,数据预览不支持 Lookup 转换,它会显示表不存在的错误。 这仅用于数据预览,不会阻止作业正常运行。 帖子中剩下的几个步骤不需要您更新架构。 如果需要在其他节点上运行数据预览,可以暂时移除查找节点,然后再放回去。
  7. 添加 派生列 节点并命名 Total in usd.
  8. 命名派生列 total_usd 并使用以下 SQL 表达式:
    round(contracts * price * exchange_rate, 2)
    货币转换配置
  9. 添加 添加当前时间戳 节点并命名列 ingest_date.
  10. 使用格式 %Y-%m-%d 对于您的时间戳(出于演示目的,我们仅使用日期;如果您愿意,可以使其更精确)。
    时间戳配置

保存历史订单表

要保存历史订单表,请完成以下步骤:

  1. 添加一个 S3 目标节点并命名 Orders table.
  2. 使用 snappy 压缩配置 Parquet 格式,并提供用于存储结果的 S3 目标路径(与摘要分开)。
  3. 选择 在数据目录中创建表,并在后续运行中更新架构并添加新分区.
  4. 输入目标数据库和新表的名称,例如: option_orders.
    表接收器配置

该图的最后一部分现在应该类似于下图,其中两个分支用于两个单独的输出。
DAG

成功运行作业后,您可以使用 Athena 等工具通过查询新表来查看作业生成的数据。 您可以在 Athena 列表中找到该表并选择 预览表 或者只运行一个 SELECT 查询(将表名更新为您使用的名称和目录):

SELECT * FROM default.option_orders limit 10

您的表格内容应类似于以下屏幕截图。
表格内容

清理

如果您不想保留此示例,请删除您创建的两个作业、Athena 中的两个表以及存储输入和输出文件的 S3 路径。

结论

在本文中,我们展示了 AWS Glue Studio 中的新转换如何帮助您以最少的配置进行更高级的转换。 这意味着您可以实施更多的 ETL 用例,而无需编写和维护任何代码。 新的转换已在 AWS Glue Studio 上可用,因此您现在可以在视觉作业中使用新的转换。


关于作者

贡萨洛·埃雷罗斯 是 AWS Glue 团队的高级大数据架构师。

时间戳记:

更多来自 AWS 大数据