使用配备 Apache Iceberg 的亚马逊 Athena 加速交易数据湖上的数据科学特征工程

Amazon Athena 是一项交互式查询服务,可以使用 SQL 或 Python 轻松分析 亚马逊简单存储服务 (Amazon S3) 中的数据以及位于 亚马逊云科技、本地或其他云系统中的数据源。雅典娜建立在开源 Trino 和 Presto 引擎以及 Apache Spark 框架之上,无需进行任何配置或配置。Athena 是无服务器的,因此无需管理基础架构,您只需为运行的查询付费。

Apache Iceberg 是一种用于超大型分析数据集的开放表格式。它将大型文件集合作为表进行管理,并支持现代分析数据湖操作,例如记录级插入、更新、删除和时空旅行查询。雅典娜支持对 Apache Iceberg 表进行读取、时空旅行、写入和 DDL 查询,这些表对数据使用 Apache Parquet 格式,对其元存储使用 AW S Glue 数据目录

特征工程 是一个识别和转换原始数据(图像、文本文件、视频等)、回填缺失的数据以及添加一个或多个有意义的数据元素以提供上下文以便机器学习 (ML) 模型可以从中学习的过程。各种用例都需要数据标签,包括预测、计算机视觉、自然语言处理和语音识别。

结合雅典娜的功能,Apache Iceberg 为数据科学家提供了简化的工作流程,无需复制或重新创建整个数据集即可创建新的数据功能。您可以在 Athena 上使用标准 SQL 创建功能,而无需使用任何其他服务进行功能工程。数据科学家可以减少准备和复制数据集所花费的时间,转而专注于数据特征工程、实验和大规模数据分析。

在这篇文章中,我们回顾了使用具有 Apache Iceberg 开放表格式的 Athena 的好处,以及它如何简化数据科学家的常见特征工程任务。我们将演示 Athena 如何转换 Apache Iceberg 格式的现有表,然后在不重新创建或复制数据集的情况下添加列、删除列和修改表中的数据,并使用这些功能在 Apache Iceberg 表上创建新功能。

解决方案概述

数据科学家通常习惯于处理大型数据集。数据集通常以 JSON、CSV、ORC 或 Apache Parq uet 格式或类似的读取优化 格式存储,以实现快速读取性能。数据科学家经常创建新的数据特征,并使用聚合和辅助数据回填此类数据特征。从历史上看,此任务是通过在表的顶部创建一个视图来完成的,该视图中包含 Apache Parquet 格式的基础数据,此类列和数据是在运行时添加的,或者创建包含其他列的新表。尽管此工作流程非常适合许多用例,但对于大型数据集却效率低下,因为数据需要在运行时生成,或者需要复制和转换数据集。

雅典娜引入了 ACID(原子性、一致性、隔离、耐久性)事务 功能,可添加基于 Apache Iceberg 表的插入、更新、删除、合并和时空旅行操作。 这些功能使数据科学家能够创建新的数据特征并将现有数据特征删除到现有数据集上,而不必担心复制或转换数据集或使用视图对其进行抽象。数据科学家可以专注于特征工程工作,避免复制和转换数据集。

Athena Iceberg UPDATE 操作在同一事务中将 Apache Iceberg 位置删除文件和新更新的行写入数据文件。您可以通过单个 UPDATE 语句对记录进行更正。

随着 Athena 引擎版本 3 的发布,Apache Iceberg 表的功能得到了增强,支持诸如 创建表即选择表 (CTAS) 和合并命令之类的操作,这些操作可简化 Iceberg 数据的生命周期管理。CTAS 可以快速、高效地使用其他格式(例如 Apache Paquet)创建表,并 将 MERG E INTO 条件更新、删除行或在 Iceberg 表中插入行。单个语句可以组合更新、删除和插入操作。

先决条件

使用雅典娜引擎版本 3 设置雅典娜工作组,以便在 Apache Iceberg 表中使用 CTAS 和 MERGE 命令。要将现有 Athena 引擎升级到 Athena 工作组中的版本 3,请按照 升级到 Athena 引擎版本 3 中的说明提高查询性能并访问更多分析功能 , 或参阅在 Athena 控制台 中 更改引擎版本

数据集

为了进行演示,我们使用了一个 Apache Parquet 表,该表包含过去几年中存储在 S3 存储桶中的数百万条随机分布的虚拟销售数据记录。 下载 数据集,将其解压缩到您的本地计算机,然后将其上传到您的 S3 存储桶。在这篇文章中,我们将数据集上传到 s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/

下表显示了 customer_ orders 表格的布局。

Column Name Data Type Description
orderkey string Order number for the order
custkey string Customer identification number
orderstatus string Status of the order
totalprice string Total price of the order
orderdate string Date of the order
orderpriority string Priority of the order
clerk string Name of the clerk who processed the order
shippriority string Priority on the shipping
name string Customer name
address string Customer address
nationkey string Customer nation key
phone string Customer phone number
acctbal string Customer account balance
mktsegment string Customer market segment

进行功能工程

作为数据科学家,我们希望通过将计算得出的每位客户的一年总购买量和现有数据 集中每位客户的一年平均购买量相加来对客户订单数据进行 特征工程 。出于演示目的,我们使用 Athena 在 s ampledb 数据库中创建了 customer_ord ers 表,如以下 DDL 命令 所示。(您可以使用任何现有的数据集并按照本文中提到的步骤进行操作。) customer_order s 数据集以 Parquet 格式生成并存储在 S3 存储桶位置 s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/ 中。这张桌子不是 Apache Iceberg 牌桌。

CREATE EXTERNAL TABLE sampledb.customer_orders(
  `orderkey` string, 
  `custkey` string, 
  `orderstatus` string, 
  `totalprice` string, 
  `orderdate` string, 
  `orderpriority` string, 
  `clerk` string, 
  `shippriority` string, 
  `name` string, 
  `address` string, 
  `nationkey` string, 
  `phone` string, 
  `acctbal` string, 
  `mktsegment` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/'
TBLPROPERTIES (
  'classification'='parquet');

通过运行查询来验证表中的数据:

SELECT * 
from sampledb.customer_orders 
limit 10;

我们希望在此表格中添加新功能,以更深入地了解客户销售情况,从而加快模型训练速度并获得更多有价值的见解。要向数据集添加新功能,请将 c ustomer_ orders 雅典娜表转换为 雅典娜上的 Apache Iceberg 表。 发出 CTAS 查询语句, 从 customer_orders 表中创建一个 Apache Iceberg 格式的新表。 在这样做的同时,还添加了一项新功能,用于获取每位客户在过去一年(数据集的最大年份)中的总购买金额。

在以下 CTAS 查询中,添加了一个名为 one_year_sales_agg regate 的新列,其默认值为 精度数据类型的 0.0 , 并将 table_type 设置为 ICE BERG:

CREATE TABLE  sampledb.customers_orders_aggregate
WITH (table_type = 'ICEBERG',
   format = 'PARQUET', 
   location = 's3://sample-iceberg-datasets-xxxxxxxxxxxx/sampledb/customer_orders_aggregate', 
   is_external = false
   ) 
AS 
SELECT 
orderkey,
custkey,
orderstatus,
totalprice,
orderdate, 
orderpriority, 
clerk, 
shippriority, 
name, 
address, 
nationkey, 
phone, 
acctbal, 
mktsegment,
0.0 as one_year_sales_aggregate
from sampledb.customer_orders;

发出以下查询以验证 Apache Iceberg 表中的数据,其中新列 one_year_sales_ aggregate 值为 0.0:

SELECT custkey, totalprice, one_year_sales_aggregate 
from sampledb.customers_orders_aggregate 
limit 10;

我们想 在数据集中填充新功能 one_year_sales_agg regate 的值,以根据每位客户在过去一年(数据集的最大年份)的购买量来获得他们的总购买金额。 使用 Athena 向 Apache Iceberg 表发出 MERGE 查询语句,填充 one_year_sales_aggregate 功能的值:

MERGE INTO sampledb.customers_orders_aggregate coa USING 
    (select custkey, 
            date_format(CAST(orderdate as date), '%Y ') as    orderdate, 
            sum(CAST(totalprice as double)) as one_year_sales_aggregate
    FROM sampledb.customers_orders_aggregate o
    where date_format(CAST(o.orderdate as date), '%Y ') = (select date_format(max(CAST(orderdate as date)), '%Y ') from sampledb.customers_orders_aggregate)
    group by custkey, date_format(CAST(orderdate as date), '%Y ')) sales_one_year_agg
    ON (coa.custkey = sales_one_year_agg.custkey)
    WHEN MATCHED
        THEN UPDATE SET one_year_sales_aggregate = sales_one_year_agg.one_year_sales_aggregate;

发出以下查询以验证过去一年中每位客户总支出的更新值:

SELECT custkey, totalprice, one_year_sales_aggregate
from sampledb.customers_orders_aggregate limit 10;

我们决定在现有的 Apache Iceberg 表中添加另一项功能,以计算和存储每位客户在过去一年的平均购买金额。发出 ALTER 查询语句,为功能 one_year_sales_ average 的现有表中添加新列:

ALTER TABLE sampledb.customers_orders_aggregate
ADD COLUMNS (one_year_sales_average double);

在填充此新功能的值之前,您可以将 one_year_sales_ average 功能的默认值设置为 0.0。 在雅典娜上使用相同的 Apache Iceberg 表,发出更新查询语句,将新功能的值填充为 0.0:

UPDATE sampledb.customers_orders_aggregate
SET one_year_sales_average = 0.0;

发出以下查询,验证过去一年中每位客户平均支出的更新值是否设置为 0.0:

SELECT custkey, orderdate, totalprice, one_year_sales_aggregate, one_year_sales_average 
from sampledb.customers_orders_aggregate 
limit 10;

现在,我们要 在数据集中填充新功能 one_year_sales_ average 的值,以根据每位客户在过去一年(数据集的最大年份)的购买量获得平均购买金额。 使用雅典娜引擎向雅典娜上的现有 Apache Iceberg 表发出 MERGE 查询语句,以填充 one_year_sales_average 功能的值:

MERGE INTO sampledb.customers_orders_aggregate coa USING 
    (select custkey, 
            date_format(CAST(orderdate as date), '%Y') as orderdate, 
            avg(CAST(totalprice as double)) as one_year_sales_average
    FROM sampledb.customers_orders_aggregate o
    where date_format(CAST(o.orderdate as date), '%Y') = (select date_format(max(CAST(orderdate as date)), '%Y') from sampledb.customers_orders_aggregate)
    group by custkey, date_format(CAST(orderdate as date), '%Y')) sales_one_year_avg
    ON (coa.custkey = sales_one_year_avg.custkey)
    WHEN MATCHED
        THEN UPDATE SET one_year_sales_average = sales_one_year_avg.one_year_sales_average;

发出以下查询以验证每位客户平均支出的最新值:

SELECT custkey, orderdate, totalprice, one_year_sales_aggregate, one_year_sales_average 
from sampledb.customers_orders_aggregate 
limit 10;

向数据集添加其他数据特征后,数据科学家通常会开始训练 ML 模型并使用 Amazon Sagemaker 或等效工具集进行推断。

结论

在这篇文章中,我们演示了如何使用雅典娜和 Apache Iceberg 进行功能工程。我们还演示了使用 CTAS 查询从 Apache Parquet 格式的现有数据集在雅典娜上创建 Apache Iceberg 表,使用 ALTER 查询在雅典娜的现有 Apache Iceberg 表中添加新功能,以及使用 UPDATE 和 MERGE 查询语句更新现有列的特征值。

我们鼓励您使用 CTAS 查询来快速高效地创建表,并使用 MERGE 查询语句一步同步表,以便在使用 Athena 和 Apache Iceberg 转换功能时简化数据准备和更新任务。如果您有意见或反馈,请将其留在评论部分。


作者简介

Vivek Gautam 是一名数据架构师,专门研究 亚马逊云科技 专业服务的数据湖。他与企业客户合作,在 亚马逊云科技 上构建数据产品、分析平台和解决方案。在不构建和设计现代数据平台时,Vivek 是一位美食爱好者,他也喜欢探索新的旅行目的地和徒步旅行。

Mikhail Vaynshteyn 是亚马逊网络 服务的解决方案架构师。Mikhail 与医疗保健和生命科学客户合作,构建有助于改善患者疗效的解决方案。米哈伊尔专门研究数据分析服务。

Naresh Gautam 是 亚马逊云科技 的数据分析和 AI/ML 负责人,拥有 20 年的经验,他喜欢帮助客户架构高度可用、高性能和具有成本效益的数据分析和 AI/ML 解决方案,为客户提供数据驱动的决策。在空闲时间,他喜欢冥想和烹饪。

Harsha Tadiparthi 是 亚马逊云科技 专业首席解决方案架构师、分析师。他喜欢在数据库和分析中解决复杂的客户问题,并取得成功的结果。工作之余,他喜欢与家人共度时光,看电影,并尽可能地旅行。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。