掌握市场动态:利用超精确的订单历史记录转变交易成本分析 – PCAP 和 Amazon Athena for Apache Spark |亚马逊网络服务

掌握市场动态:利用超精确的订单历史记录转变交易成本分析 – PCAP 和 Amazon Athena for Apache Spark |亚马逊网络服务

源节点: 3091357

这篇文章是与 LSEG 低延迟小组的 Pramod Nayak、LakshmiKanth Mannem 和 Vivek Aggarwal 共同撰写的。

交易成本分析 (TCA) 被交易者、投资组合经理和经纪商广泛用于交易前和交易后分析,帮助他们衡量和优化交易成本及其交易策略的有效性。在这篇文章中,我们分析了期权买卖价差 LSEG 变动历史 – PCAP 数据集使用 适用于 Apache Spark 的亚马逊雅典娜。我们向您展示如何访问数据、定义应用于数据的自定义函数、查询和过滤数据集以及可视化分析结果,所有这些都无需担心设置基础架构或配置 Spark,即使对于大型数据集也是如此。

背景

期权价格报告局 (OPRA) 是一个重要的证券信息处理机构,负责收集、整合和传播美国期权的最新销售报告、报价和相关信息。 OPRA 拥有 18 个活跃的美国期权交易所和超过 1.5 万份合格合约,在提供全面的市场数据方面发挥着关键作用。

5 年 2024 月 48 日,证券业自动化公司 (SIAC) 计划将 OPRA 源从 96 个多播通道升级到 37.3 个。此增强功能旨在优化符号分布和线路容量利用率,以应对美国期权市场不断升级的交易活动和波动性。 SIAC 建议各公司为高达每秒 XNUMX GB 的峰值数据速率做好准备。

尽管升级不会立即改变已发布数据的总量,但它使 OPRA 能够以明显更快的速度传播数据。这种转变对于满足动态期权市场的需求至关重要。

OPRA 是数量最多的 Feed 之一,在 150.4 年第三季度达到单日 3 亿条消息的峰值,并且单日的容量空间需求为 2023 亿条消息。捕获每条消息对于交易成本分析、市场流动性监控、交易策略评估和市场研究至关重要。

关于数据

LSEG 变动历史 – PCAP 是一个基于云的存储库,超过 30 PB,容纳超高质量的全球市场数据。这些数据是在交换数据中心内直接捕获的,采用战略性地位于全球主要主要和备份交换数据中心的冗余捕获流程。 LSEG 的捕获技术可确保无损数据捕获,并使用 GPS 时间源实现纳秒时间戳精度。此外,还采用复杂的数据套利技术来无缝填补任何数据缺口。捕获后,数据经过细致的处理和仲裁,然后使用以下方法将其规范化为 Parquet 格式: LSEG 的实时超直接 (RTUD) 饲料处理机。

规范化过程是准备分析数据不可或缺的一部分,每天生成多达 6 TB 的压缩 Parquet 文件。海量数据归因于 OPRA 的包容性,跨越多个交易所,并具有众多具有不同属性的期权合约。市场波动性的增加和期权交易所的做市活动进一步增加了 OPRA 上发布的数据量。

Tick History – PCAP 的属性使公司能够进行各种分析,包括以下内容:

  • 交易前分析 – 评估潜在的贸易影响并根据历史数据探索不同的执行策略
  • 交易后评估 – 根据基准衡量实际执行成本,以评估执行策略的绩效
  • 优化 执行 – 根据历史市场模式微调执行策略,以尽量减少市场影响并降低总体交易成本
  • 风险管理 – 识别滑点模式、识别异常值并主动管理与交易活动相关的风险
  • 绩效归因 – 在分析投资组合表现时将交易决策与投资决策的影响分开

LSEG Tick History – PCAP 数据集可在 AWS数据交换 并且可以访问 AWS Marketplace。 同 适用于 Amazon S3 的 AWS 数据交换,您可以直接从 LSEG 访问 PCAP 数据 亚马逊简单存储服务 (Amazon S3) 存储桶,企业无需存储自己的数据副本。这种方法简化了数据管理和存储,使客户能够立即访问高质量的 PCAP 或标准化数据,并且易于使用、集成和 节省大量数据存储.

Apache Spark 的 Athena

对于分析工作, Apache Spark 的 Athena 提供可通过 Athena 控制台或 Athena API 访问的简化笔记本体验,允许您构建交互式 Apache Spark 应用程序。借助优化的 Spark 运行时,Athena 通过在不到一秒的时间内动态扩展 Spark 引擎的数量来帮助分析 PB 级数据。此外,pandas 和 NumPy 等常用 Python 库无缝集成,允许创建复杂的应用程序逻辑。这种灵活性还延伸到了在笔记本中使用的自定义库的导入。 Athena for Spark 可容纳大多数开放数据格式,并与 AWS胶水 数据目录。

数据集

在本次分析中,我们使用了 17 年 2023 月 XNUMX 日的 LSEG Tick History – PCAP OPRA 数据集。该数据集包含以下组成部分:

  • 最佳买价和卖价(BBO) – 报告给定交易所证券的最高出价和最低要价
  • 全国最佳买入价和卖出价(NBBO) – 报告所有交易所中证券的最高出价和最低要价
  • 交易 – 记录所有交易所已完成的交易

该数据集涉及以下数据量:

  • 交易 – 160 MB 分布在大约 60 个压缩的 Parquet 文件中
  • BBO – 2.4 TB 分布在大约 300 个压缩 Parquet 文件中
  • NBBO – 2.8 TB 分布在大约 200 个压缩 Parquet 文件中

分析概览

分析 OPRA 报价历史数据以进行交易成本分析 (TCA) 涉及仔细审查特定交易事件的市场报价和交易。我们使用以下指标作为本研究的一部分:

  • 报价点差 (QS) – 计算为 BBO 卖价和 BBO 买价之间的差额
  • 有效价差 (ES) – 计算为交易价格与 BBO 中点之间的差额(BBO 买价 +(BBO 卖价 – BBO 买价)/2)
  • 有效/报价价差 (EQF) – 计算公式为 (ES / QS) * 100

我们在交易前以及交易后的四个时间间隔(交易后、1 秒、10 秒和 60 秒)计算这些点差。

为 Apache Spark 配置 Athena

要为 Apache Spark 配置 Athena,请完成以下步骤:

  1. 在 Athena 控制台上,在 前往在线商城, 选择 使用 PySpark 和 Spark SQL 分析数据.
  2. 如果这是您第一次使用 Athena Spark,请选择 创建工作组.
  3. 针对 工作组名称¸ 输入工作组的名称,例如 tca-analysis.
  4. 分析引擎 部分,选择 Apache Spark.
  5. 附加配置 部分,您可以选择 使用默认值 或提供自定义 AWS身份和访问管理 (IAM) 角色和计算结果的 Amazon S3 位置。
  6. 创建工作组.
  7. 创建工作组后,导航至 笔记本电脑 标签并选择 创建笔记本.
  8. 输入笔记本的名称,例如 tca-analysis-with-tick-history.
  9. 创建 创建你的笔记本。

启动你的笔记本

如果您已经创建了 Spark 工作组,请选择 启动笔记本编辑器前往在线商城.


创建笔记本后,您将被重定向到交互式笔记本编辑器。


现在我们可以将以下代码添加到我们的笔记本中并运行。

创建分析

完成以下步骤来创建分析:

  • 导入常用库:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

  • 为 BBO、NBBO 和交易创建数据框:
bbo_quote = spark.read.parquet(f"s3://<bucket>/mt=bbo_quote/f=opra/dt=2023-05-17/*")
bbo_quote.createOrReplaceTempView("bbo_quote")
nbbo_quote = spark.read.parquet(f"s3://<bucket>/mt=nbbo_quote/f=opra/dt=2023-05-17/*")
nbbo_quote.createOrReplaceTempView("nbbo_quote")
trades = spark.read.parquet(f"s3://<bucket>/mt=trade/f=opra/dt=2023-05-17/29_1.parquet")
trades.createOrReplaceTempView("trades")

  • 现在我们可以确定用于交易成本分析的交易:
filtered_trades = spark.sql("select Product, Price,Quantity, ReceiptTimestamp, MarketParticipant from trades")

我们得到以下输出:

+---------------------+---------------------+---------------------+-------------------+-----------------+ 
|Product |Price |Quantity |ReceiptTimestamp |MarketParticipant| 
+---------------------+---------------------+---------------------+-------------------+-----------------+ 
|QQQ 230518C00329000|1.1700000000000000000|10.0000000000000000000|1684338565538021907,NYSEArca|
|QQQ 230518C00329000|1.1700000000000000000|20.0000000000000000000|1684338576071397557,NASDAQOMXPHLX|
|QQQ 230518C00329000|1.1600000000000000000|1.0000000000000000000|1684338579104713924,ISE|
|QQQ 230518C00329000|1.1400000000000000000|1.0000000000000000000|1684338580263307057,NASDAQOMXBX_Options|
|QQQ 230518C00329000|1.1200000000000000000|1.0000000000000000000|1684338581025332599,ISE|
+---------------------+---------------------+---------------------+-------------------+-----------------+

我们使用突出显示的未来交易信息来表示交易产品 (tp)、交易价格 (tpr) 和交易时间 (tt)。

  • 这里我们创建了一些辅助函数来进行分析
def calculate_es_qs_eqf(df, trade_price):
    df['BidPrice'] = df['BidPrice'].astype('double')
    df['AskPrice'] = df['AskPrice'].astype('double')
    df["ES"] = ((df["AskPrice"]-df["BidPrice"])/2) - trade_price
    df["QS"] = df["AskPrice"]-df["BidPrice"]
    df["EQF"] = (df["ES"]/df["QS"])*100
    return df

def get_trade_before_n_seconds(trade_time, df, seconds=0, groupby_col = None):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] < nseconds].groupby(groupby_col).last()
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    ret_df = ret_df.reset_index()
    return ret_df

def get_trade_after_n_seconds(trade_time, df, seconds=0, groupby_col = None):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] > nseconds].groupby(groupby_col).first()
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    ret_df = ret_df.reset_index()
    return ret_df

def get_nbbo_trade_before_n_seconds(trade_time, df, seconds=0):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] < nseconds].iloc[-1:]
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    return ret_df

def get_nbbo_trade_after_n_seconds(trade_time, df, seconds=0):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] > nseconds].iloc[:1]
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    return ret_df

  • 在以下函数中,我们创建包含交易前后所有报价的数据集。 Athena Spark 自动确定启动多少个 DPU 来处理我们的数据集。
def get_tca_analysis_via_df_single_query(trade_product, trade_price, trade_time):
    # BBO quotes
    bbos = spark.sql(f"SELECT Product, ReceiptTimestamp, AskPrice, BidPrice, MarketParticipant FROM bbo_quote where Product = '{trade_product}';")
    bbos = bbos.toPandas()

    bbo_just_before = get_trade_before_n_seconds(trade_time, bbos, seconds=0, groupby_col='MarketParticipant')
    bbo_just_after = get_trade_after_n_seconds(trade_time, bbos, seconds=0, groupby_col='MarketParticipant')
    bbo_1s_after = get_trade_after_n_seconds(trade_time, bbos, seconds=1, groupby_col='MarketParticipant')
    bbo_10s_after = get_trade_after_n_seconds(trade_time, bbos, seconds=10, groupby_col='MarketParticipant')
    bbo_60s_after = get_trade_after_n_seconds(trade_time, bbos, seconds=60, groupby_col='MarketParticipant')
    
    all_bbos = pd.concat([bbo_just_before, bbo_just_after, bbo_1s_after, bbo_10s_after, bbo_60s_after], ignore_index=True, sort=False)
    bbos_calculated = calculate_es_qs_eqf(all_bbos, trade_price)

    #NBBO quotes
    nbbos = spark.sql(f"SELECT Product, ReceiptTimestamp, AskPrice, BidPrice, BestBidParticipant, BestAskParticipant FROM nbbo_quote where Product = '{trade_product}';")
    nbbos = nbbos.toPandas()

    nbbo_just_before = get_nbbo_trade_before_n_seconds(trade_time,nbbos, seconds=0)
    nbbo_just_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=0)
    nbbo_1s_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=1)
    nbbo_10s_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=10)
    nbbo_60s_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=60)

    all_nbbos = pd.concat([nbbo_just_before, nbbo_just_after, nbbo_1s_after, nbbo_10s_after, nbbo_60s_after], ignore_index=True, sort=False)
    nbbos_calculated = calculate_es_qs_eqf(all_nbbos, trade_price)

    calc = pd.concat([bbos_calculated, nbbos_calculated], ignore_index=True, sort=False)
    
    return calc

  • 现在让我们使用所选交易的信息调用 TCA 分析函数:
tp = "QQQ 230518C00329000"
tpr = 1.16
tt = 1684338579104713924
c = get_tca_analysis_via_df_single_query(tp, tpr, tt)

可视化分析结果

现在让我们创建用于可视化的数据框。每个数据框包含每个数据源的五个时间间隔之一的报价(BBO、NBBO):

bbo = c[c['MarketParticipant'].isin(['BBO'])]
bbo_bef = bbo[bbo['ReceiptTimestamp'] < tt]
bbo_aft_0 = bbo[bbo['ReceiptTimestamp'].between(tt,tt+1000000000)]
bbo_aft_1 = bbo[bbo['ReceiptTimestamp'].between(tt+1000000000,tt+10000000000)]
bbo_aft_10 = bbo[bbo['ReceiptTimestamp'].between(tt+10000000000,tt+60000000000)]
bbo_aft_60 = bbo[bbo['ReceiptTimestamp'] > (tt+60000000000)]

nbbo = c[~c['MarketParticipant'].isin(['BBO'])]
nbbo_bef = nbbo[nbbo['ReceiptTimestamp'] < tt]
nbbo_aft_0 = nbbo[nbbo['ReceiptTimestamp'].between(tt,tt+1000000000)]
nbbo_aft_1 = nbbo[nbbo['ReceiptTimestamp'].between(tt+1000000000,tt+10000000000)]
nbbo_aft_10 = nbbo[nbbo['ReceiptTimestamp'].between(tt+10000000000,tt+60000000000)]
nbbo_aft_60 = nbbo[nbbo['ReceiptTimestamp'] > (tt+60000000000)]

在以下部分中,我们提供示例代码来创建不同的可视化效果。

交易前绘制 QS 和 NBBO

使用以下代码绘制交易前的报价点差和 NBBO:

fig = px.bar(title="Quoted Spread Before The Trade",
    x=bbo_bef.MarketParticipant,
    y=bbo_bef['QS'],
    labels={'x': 'Market', 'y':'Quoted Spread'})
fig.add_hline(y=nbbo_bef.iloc[0]['QS'],
    line_width=1, line_dash="dash", line_color="red",
    annotation_text="NBBO", annotation_font_color="red")
%plotly fig

绘制每个市场的 QS 和交易后的 NBBO

使用以下代码在交易后立即绘制每个市场和 NBBO 的报价点差:

fig = px.bar(title="Quoted Spread After The Trade",
    x=bbo_aft_0.MarketParticipant,
    y=bbo_aft_0['QS'],
    labels={'x': 'Market', 'y':'Quoted Spread'})
fig.add_hline(
    y=nbbo_aft_0.iloc[0]['QS'],
    line_width=1, line_dash="dash", line_color="red",
    annotation_text="NBBO", annotation_font_color="red")
%plotly fig

绘制 BBO 每个时间间隔和每个市场的 QS

使用以下代码绘制 BBO 每个时间间隔和每个市场的报价点差:

fig = go.Figure(data=[
    go.Bar(name="before trade", x=bbo_bef.MarketParticipant.unique(), y=bbo_bef['QS']),
    go.Bar(name="0s after trade", x=bbo_aft_0.MarketParticipant.unique(), y=bbo_aft_0['QS']),
    go.Bar(name="1s after trade", x=bbo_aft_1.MarketParticipant.unique(), y=bbo_aft_1['QS']),
    go.Bar(name="10s after trade", x=bbo_aft_10.MarketParticipant.unique(), y=bbo_aft_10['QS']),
    go.Bar(name="60s after trade", x=bbo_aft_60.MarketParticipant.unique(), y=bbo_aft_60['QS'])])
fig.update_layout(barmode='group',title="BBO Quoted Spread Per Market/TimeFrame",
    xaxis={'title':'Market'},
    yaxis={'title':'Quoted Spread'})
%plotly fig

绘制每个时间间隔的 ES 和 BBO 的市场

使用以下代码绘制 BBO 每个时间间隔和市场的有效点差:

fig = go.Figure(data=[
    go.Bar(name="before trade", x=bbo_bef.MarketParticipant.unique(), y=bbo_bef['ES']),
    go.Bar(name="0s after trade", x=bbo_aft_0.MarketParticipant.unique(), y=bbo_aft_0['ES']),
    go.Bar(name="1s after trade", x=bbo_aft_1.MarketParticipant.unique(), y=bbo_aft_1['ES']),
    go.Bar(name="10s after trade", x=bbo_aft_10.MarketParticipant.unique(), y=bbo_aft_10['ES']),
    go.Bar(name="60s after trade", x=bbo_aft_60.MarketParticipant.unique(), y=bbo_aft_60['ES'])])
fig.update_layout(barmode='group',title="BBO Effective Spread Per Market/TimeFrame",
    xaxis={'title':'Market'}, 
    yaxis={'title':'Effective Spread'})
%plotly fig

绘制 BBO 每个时间间隔和市场的 EQF

使用以下代码绘制 BBO 每个时间间隔和市场的有效/报价点差:

fig = go.Figure(data=[
    go.Bar(name="before trade", x=bbo_bef.MarketParticipant.unique(), y=bbo_bef['EQF']),
    go.Bar(name="0s after trade", x=bbo_aft_0.MarketParticipant.unique(), y=bbo_aft_0['EQF']),
    go.Bar(name="1s after trade", x=bbo_aft_1.MarketParticipant.unique(), y=bbo_aft_1['EQF']),
    go.Bar(name="10s after trade", x=bbo_aft_10.MarketParticipant.unique(), y=bbo_aft_10['EQF']),
    go.Bar(name="60s after trade", x=bbo_aft_60.MarketParticipant.unique(), y=bbo_aft_60['EQF'])])
fig.update_layout(barmode='group',title="BBO Effective/Quoted Spread Per Market/TimeFrame",
    xaxis={'title':'Market'}, 
    yaxis={'title':'Effective/Quoted Spread'})
%plotly fig

Athena Spark 计算性能

当您运行代码块时,Athena Spark 会自动确定完成计算需要多少 DPU。在最后一个代码块中,我们调用 tca_analysis 函数中,我们实际上是在指示 Spark 处理数据,然后将生成的 Spark 数据帧转换为 Pandas 数据帧。这是分析中最密集的处理部分,当 Athena Spark 运行此块时,它会显示进度条、经过的时间以及当前有多少 DPU 正在处理数据。例如,在以下计算中,Athena Spark 使用 18 个 DPU。

配置 Athena Spark 笔记本时,您可以选择设置它可以使用的最大 DPU 数量。默认值为 20 个 DPU,但我们使用 10、20 和 40 个 DPU 测试了此计算,以演示 Athena Spark 如何自动扩展以运行我们的分析。我们观察到,Athena Spark 线性扩展,当笔记本配置最多 15 个 DPU 时,耗时 21 分 10 秒;当笔记本配置 8 个 DPU 时,耗时 23 分 20 秒;当笔记本配置最大 DPU 时,耗时 4 分 44 秒。配置40个DPU。由于 Athena Spark 根据 DPU 使用情况进行收费,以每秒为粒度,因此这些计算的成本类似,但如果您设置更高的最大 DPU 值,Athena Spark 可以更快地返回分析结果。有关 Athena Spark 定价的更多详细信息,请点击 此处.

结论

在这篇文章中,我们演示了如何使用来自 LSEG 的 Tick History-PCAP 的高保真 OPRA 数据,通过 Athena Spark 执行交易成本分析。及时提供 OPRA 数据,辅以适用于 Amazon S3 的 AWS Data Exchange 的可访问性创新,可以战略性地缩短那些希望为关键交易决策创建可行见解的公司的分析时间。 OPRA 每天生成约 7 TB 的标准化 Parquet 数据,管理基础设施以提供基于 OPRA 数据的分析具有挑战性。

Athena 在处理 Tick History – PCAP for OPRA 数据的大规模数据处理方面具有可扩展性,使其成为在 AWS 中寻求快速且可扩展的分析解决方案的组织的绝佳选择。这篇文章展示了 AWS 生态系统和 Tick History-PCAP 数据之间的无缝交互,以及金融机构如何利用这种协同作用来推动关键交易和投资策略的数据驱动决策。


作者简介

普拉莫德·纳亚克 是 LSEG 低延迟团队的产品管理总监。 Pramod 在金融科技行业拥有超过 10 年的经验,专注于软件开发、分析和数据管理。 Pramod 是一名前软件工程师,对市场数据和量化交易充满热情。

拉克希米·坎特·曼内姆 是 LSEG 低延迟组的产品经理。他专注于低延迟市场数据行业的数据和平台产品。 LakshmiKanth 帮助客户构建满足其市场数据需求的最佳解决方案。

维韦克·阿加瓦尔 是 LSEG 低延迟小组的高级数据工程师。 Vivek 致力于开发和维护数据管道,以处理和交付捕获的市场数据源和参考数据源。

阿尔克特·梅穆沙吉 是 AWS 金融服务市场开发团队的首席架构师。 Alket 负责技术战略,与合作伙伴和客户合作,将最苛刻的资本市场工作负载部署到 AWS 云。

时间戳记:

更多来自 AWS 大数据