提高在亚马逊 S3 数据湖上构建的 Apache Iceberg 表的运营效率

Apache Iceberg 是一种开放表格式,适用于 亚马逊 Simple Storage Serv ice (Amazon S3) 中的大型数据集,可对大型表、原子提交、并发写入和兼容 SQL 的表演变提供快速查询性能。当您使用 Apache Iceberg 构建交易数据湖以解决功能用例时,您需要重点关注 S3 数据湖的运营用例,以优化生产环境。组织关注的 S3 数据湖的一些重要非功能用例包括存储成本优化、灾难恢复和业务连续性功能、对数据湖的跨账户和多区域访问以及处理提高的 Amazon S3 请求速率。

在这篇文章中,我们将向您展示如何提高基于 亚马逊 S3 数据湖 和 Amaz on EMR 大数据 平台构建的 Apache Iceberg 表的运营效率。

优化数据湖存储

在 Amazon S3 上构建现代数据湖的主要优势之一是,它可以在不影响性能的情况下降低成本。您可以将 Amazon S3 生命周期配置和 Amazon S3 对象标记与 Apache Iceberg 表一起使用,以优化整体数据湖存储的成本。Amazon S3 生命周期配置是一组规则,用于定义 Amazon S3 对一组对象应用的操作。有两种类型的操作:

  • 过渡操作 -这些操作定义对象何时过渡到其他存储类别;例如,亚马逊 S3 标准存储到 Amazon S3 Glacier。
  • 过期操作 -这些操作定义对象何时过期。Amazon S3 代表您删除过期的对象。

Amazon S3 使用对象标签对存储进行分类,其中每个标签都是一个键值对。从 Apache Iceberg 的角度来看,它支持自定义 Amazon S3 对象 标签 ,这些标签 可以在向表中写入和删除时添加到 S3 对象。Iceberg 还允许您在存储桶级别配置基于标签的对象生命周期策略,以将对象转移到不同的 Amazon S3 层。使用 Iceberg 中的 s3.delete.tags 配置属性,对象在删除之前会使用配置的键值对进行标记。如果将目录属性 s3.delete-enab led 设置 为 false ,则不会从 Amazon S3 中硬删除对象。预计这将与 Amazon S3 删除标签结合使用,因此使用 Ama zon S3 生命周期策略 标记和删除对象。默认情况下,此属性设置为 t rue

这篇文章中的示例笔记本显示了 Apache Iceberg 表的 S3 对象标记和生命周期规则的实现示例,以优化存储成本。

实现业务连续性

Amazon S3 让任何开发人员都能访问与亚马逊运行自己的全球网站网络相同的高度可扩展、可靠、快速、廉价的数据存储基础设施。Amazon S3 的设计耐久性为 99.999999999%(11 9),S3 标准版的可用性设计为 99.99%,标准版 — IA 的可用性设计为 99.9%。不过,为了使您的数据湖工作负载在不太可能出现的中断情况下保持高度可用,您可以将 S3 数据复制到另一个 亚马逊云科技 区域作为备份。由于 S3 数据位于多个区域,您可以使用 S3 多区域接入点作为解决方案来访问来自备份区域的数据。借 助 Amazon S3 多区域接入点故障转移控制 ,您可以通过单个全球终端节点路由所有 S3 数据请求流量,并随时直接控制 S3 数据请求流量在区域之间的转移。在计划内或计划外的区域流量中断期间,故障转移控制可让您在几分钟内控制不同区域和账户中的存储桶之间的故障转移。Apache Iceberg 通过指定存储桶 到 接入点 的映射来支持接入点执行 S3 操作。我们将在本文后面介绍使用 Apache Iceberg 实现 S3 接入点的示例。

提高亚马逊 S3 的性能和吞吐量

Amazon S3 支持的请求速率为存储桶中每个前缀每秒 3,500 个 PUT/COPY/POST/DELETE 请求或每秒 5,500 个 GET/HEAD 请求。创建前缀时,此请求速率的资源不会自动分配。取而代之的是,随着前缀请求速率的逐渐提高,Amazon S3 会自动扩展以处理增加的请求速率。对于某些需要突然提高前缀中对象的请求速率的工作负载,Amazon S3 可能会返回 503 个 Slow Down 错误,也称为 S3 限制 。它在执行此操作的同时在后台进行扩展以处理增加的请求速率。此外,如果超过了支持的请求速率,则最佳做法是跨多个 前缀 分配对象和请求。实现此解决方案以跨多个前缀分配对象和请求涉及对数据入口或数据输出应用程序的更改。在您的 S3 数据湖中使用 Apache Iceberg 文件格式可以通过启用 Ob jectStoreLocationProvider 功能来显著减少工程工作量,该 功能会在您指定的 S3 对象 路径中添加 S3 哈希 [0*7FFFFF] 前缀。

默认情况下,Iceberg 使用 Hive 存储布局,但你可以将其切换为使用 ObjectStoreLocationProvider 。 默认情况下,此选项未启用,因此可以灵活选择要添加哈希前缀的位置。 使用 ObjectStoreLocationProvid er ,将为每个存储的文件生成确定性哈希值,并在使用参数 write.data.path(写入.object-storage-storage-pat h 适用于 Iceberg 0.12 及以下版本的存储路径)指定的 S3 文件夹后面添加一个子文件夹。 这可确保写入 Amazon S3 的文件均等地分布在 S3 存储桶中的多个前缀中,从而最大限度地减少限制错误。在以下示例中,我们将 write.data.path 值设置为 s3://my-table-data-bucket ,Iceberg 生成的 S3 哈希前缀将附加在此位置之后:

CREATE TABLE my_catalog.my_ns.my_table
( id bigint,
data string,
category string)
USING iceberg OPTIONS
( 'write.object-storage.enabled'=true,
'write.data.path'='s3://my-table-data-bucket')
PARTITIONED BY (category);

您的 S3 文件将按照 MURMUR3 S3 哈希前缀排列,如下所示:

2021-11-01 05:39:24 809.4 KiB 7ffbc860/my_ns/my_table/00328-1642-5ce681a7-dfe3-4751-ab10-37d7e58de08a-00015.parquet
2021-11-01 06:00:10 6.1 MiB 7ffc1730/my_ns/my_table/00460-2631-983d19bf-6c1b-452c-8195-47e450dfad9d-00001.parquet
2021-11-01 04:33:24 6.1 MiB 7ffeeb4e/my_ns/my_table/00156-781-9dbe3f08-0a1d-4733-bd90-9839a7ceda00-00002.parquet

使用 Iceberg ObjectStoreLocation Provider 并不是避免 S3 503 错误的万无一失的机制。您仍然需要设置适当的 EMRFS 重试次数以提供额外的弹性。您可以通过增加默认指数退避重试策略的最大重试限制或启用和配置加法增加/乘法减少 (AIMD) 重试策略来调整重试策略。亚马逊 EMR 版本 6.4.0 及更高版本支持 AIMD。有关更多信息,请参阅使用 EM RFS 重试 Amazon S3 请求

在以下部分中,我们将提供这些用例的示例。

存储成本优化

在此示例中,我们使用了 Iceberg 的 S3 标签功能, 其中写入标签 write-tag-name=created,删除标签为 delete-tag-name=del eted。 此示例在 EMR 版本 emr-6.10.0 集群上演示,该集群安装了应用程序 Hadoop 3.3.3、Jupyter Enterprise Gateway 2.6.0 和 Spark 3.3.1。这些示例在连接到 EMR 集群的 Jupyter 笔记本电脑环境上运行。要了解有关如何使用 Iceberg 创建 EMR 集群和使用亚马逊 EMR Studio 的更多信息,请分别参阅将 Iceberg 集群与 Spark 一起 使用 亚马逊 EMR Studio 管理指南

以下示例也可以在 aws-samples GitHub 存储库的示例笔记本中找到,以便快速 进行实验。

在 Spark 会话上配置 Iceberg

使用 %%configure 魔法命令配置 你的 Spark 会话。您可以使用 亚马逊云科技 Glue 数据目录 (推荐)或 Hive 目录存储 Iceberg 表。在此示例中,我们使用 Hive 目录,但我们可以使用以下配置更改为数据目录:

spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog

在运行此步骤之前,使用命名惯例 / iceberg/ 在您的 亚马逊云科技 账户中创建 S3 存储桶和冰山文件夹。

使用您为测试此示例而创建的存储桶更新以下配置 中的iceberg-storage-blog 。请注意配置参数 s3.write.tags.write-tag-name 和 s3.delete.tags.delete-tag-name ,它们将使用相应的标签值 标记新的 S3 对象 和已删除的对象。 我们将在后续步骤中使用这些标签来实施 S3 生命周期策略,将对象过渡到成本较低的存储层或根据用例将其过期。

%%configure -f { "conf":{ "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.hive.HiveCatalog", "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO", "spark.sql.catalog.dev.warehouse":"s3://<your-iceberg-storage-blog>/iceberg/", "spark.sql.catalog.dev.s3.write.tags.write-tag-name":"created", "spark.sql.catalog.dev.s3.delete.tags.delete-tag-name":"deleted", "spark.sql.catalog.dev.s3.delete-enabled":"false" } }

使用 Spark-SQL 创建 Apache Iceberg 表

现在,我们为 亚马逊商品评论数据集 创建了一个 Iceberg 表:

spark.sql(""" DROP TABLE if exists dev.db.amazon_reviews_iceberg""")
spark.sql(""" CREATE TABLE dev.db.amazon_reviews_iceberg (
marketplace string,
customer_id string,
review_id string,
product_id string,
product_parent string,
product_title string,
star_rating int,
helpful_votes int,
total_votes int,
vine string,
verified_purchase string,
review_headline string,
review_body string,
review_date date,
year int)
USING iceberg
location 's3://<your-iceberg-storage-blog>/iceberg/db/amazon_reviews_iceberg'
PARTITIONED BY (years(review_date))""")

在下一步中,我们使用 Spark 操作在表中加载数据集。

将数据加载到 Iceberg 表中

在插入数据时,我们根据表定义按 review_dat e 对数据进行分区。在你的 PySpark 笔记本中运行以下 Spark 命令:

df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/*.parquet")

df.sortWithinPartitions("review_date").writeTo("dev.db.amazon_reviews_iceberg").append()

在同一 Iceberg 表中插入一条记录,这样它就会使用当前 rev iew_date 创建一个分区:

spark.sql("""insert into dev.db.amazon_reviews_iceberg values ("US", "99999999","R2RX7KLOQQ5VBG","B00000JBAT","738692522","Diamond Rio Digital",3,0,0,"N","N","Why just 30 minutes?","RIO is really great",date("2023-04-06"),2023)""")

您可以通过查询 Iceberg 快照来检查在此追加操作之后是否创建了新快照:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

您将看到类似于以下内容的输出,其中显示了对表执行的操作。

检查 S3 标签的填充情况

您可以使用 亚马逊云科技 命令行接口 (亚马逊云科技 CLI) 或 亚马逊云科技 管理控制台 来检查为新写入操作填充的标签。让我们检查一下与通过单行插入创建的对象相对应的标签。

在亚马逊 S3 控制台上,查看 S3 文件夹 s3://your-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/data/ 并指向 review_date _year=2023/ 分区。然后检查该文件夹下的 Parquet 文件,以检查与 Parquet 格式的数据文件相关的标签。

在 亚马逊云科技 CLI 中运行以下命令,查看该标签是基于 Spark 配置创建的 spar k.sql.catalog.dev.s3.write.tags.write-tag-nam e”: “created”:

xxxx@3c22fb1238d8 ~ % aws s3api get-object-tagging --bucket your-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

您将看到一个类似于以下内容的输出,其中显示了该文件的关联标签

{ "VersionId": "null", "TagSet": [{ "Key": "write-tag-name", "Value": "created" } ] }

删除记录并使快照过期

在此步骤中,我们从 Iceberg 表中删除一条记录,并使与已删除记录对应的快照过期。我们删除使用当前 review_d ate 插入的新单条记录:

spark.sql("""delete from dev.db.amazon_reviews_iceberg where review_date = '2023-04-06'""")

现在,我们可以检查是否创建了新的快照,并将该操作标记为 删除

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

如果我们想在将来进行时空旅行并检查已删除的行,这很有用。在这种情况下,我们必须使用与删除的行 对应的 快照 ID 来查询表。但是,我们不在本文中讨论时空旅行。

我们将表中的旧快照过期,只保留最后两个快照。您可以根据您的特定要求修改查询以保留快照:

spark.sql ("""CALL dev.system.expire_snapshots(table => 'dev.db.amazon_reviews_iceberg', older_than => DATE '2024-01-01', retain_last => 2)""")

如果我们对快照运行相同的查询,我们可以看到只有两个快照可用:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

在 亚马逊云科技 CLI 中,你可以运行以下命令来查看标签是基于 Spark 配置 spar k.sql.catalog.dev.s3 创建的。delete.tags.del ete-tag-name”: “已删除”:

xxxxxx@3c22fb1238d8 ~ % aws s3api get-object-tagging --bucket avijit-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

您将看到类似于以下内容的输出,其中显示了该文件的关联标签

{ "VersionId": "null", "TagSet": [ { "Key": "delete-tag-name", "Value": "deleted" }, { "Key": "write-tag-name", "Value": "created" } ] }

快照到期后,您可以从元数据日志条目元表中查看现有的元数据文件:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.metadata_log_entries""").show()

已过期的快照将最新的快照 ID 显示为

创建 S3 生命周期规则以将存储分区过渡到不同的存储层

为存储分区创建生命周期配置,将带有 delete-tag-name=deleted S3 标签的对象转移到 Glacier 即时检索类。Amazon S3 每天午夜世界协调时间 (UTC) 运行一次生命周期规则,新的生命周期规则可能需要长达 48 小时才能完成首次运行。Amazon S3 Glacier 非常适合存档需要立即访问的数据(以毫秒为单位进行检索)。使用 S3 Glacier 即时检索,每季度访问一次数据时,与使用 S3 标准不频繁访问(S3 标准-IA)存储类相比,您可以节省高达 68% 的存储成本。

当您想要访问数据时,可以 批量还原存档的对象 。在 S3 标准类中恢复对象后,您可以将元数据和数据注册为存档表以供查询。如前所述,可以从元数据日志条目元表中提取元数据文件位置。如前所述,值为空的最新快照 ID 表示快照已过期。我们可以拍摄一张过期的快照并进行批量恢复:

spark.sql("""CALL dev.system.register_table(table => 'db.amazon_reviews_iceberg_archive', metadata_file => 's3://avijit-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/metadata/00000-a010f15c-7ac8-4cd1-b1bc-bba99fa7acfc.metadata.json')""").show()

灾难恢复和业务连续性、跨账户和多区域访问数据湖的能力

由于 Iceberg 不支持相对路径,因此您可以通过指定存储桶 到 接入点的映射来使用接入点 执行 Amazon S3 操作。这对于多区域访问、跨区域访问、灾难恢复等很有用。

对于跨区域接入点,我们还需要将 启用 use-arn-region 的 目录属性设置为 t rue,以使 S3FileIO 能够进行跨区域 调用。 如果将 Amazon S3 资源 ARN 作为 Amazon S3 操作的目标传入,而该操作的区域与配置客户端的区域不同,则必须将此标志设置为 “ true ” 以允许客户端对 ARN 中指定的区域进行跨区域调用,否则将引发异常。 但是,对于相同或多区域接入点,应将 use-arn-region-enabled 标志设置 为 “false”。

例如,要在 Spark 3.3 中使用具有多区域访问权限的 S3 接入点,您可以使用以下代码启动 Spark SQL 外壳:

spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.use-arn-region-enabled=false \
--conf spark.sql.catalog.test.s3.access-points.my-bucket1=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap \
--conf spark.sql.catalog.test.s3.access-points.my-bucket2=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap

在此示例中,亚马逊 S3 中的 my-b ucket1 和 my-buck et 2 存储 桶上的对象使用 arn: aws: s3:: 123456789012:accesspoint : mf zwi23gn jvgw.mrap 接入点进行所有亚马逊 S3 操作。

有关使用接入点的更多详细信息,请参阅将接 入点与兼容的 Amazon S3 操作结合 使用

假设你的表路径在 mybucket1 下 ,因此区域 1 中的 mybucket1 和区域 中的 mybucket 2 在元数据文件中都有 mybucket 1 的路径。 在调用 S3(GET/PUT)时,我们将 mybucket1 引用替换为多区域接入点。

处理提高的 S3 请求速率

使用 ObjectStoreLocationProvid er (有关更多详细信息,请参阅 对象存储文件布局 )时,会为每个存储的文件生成确定性哈希,并将哈希值直接附加在写入.data.path 之后。 问题在于,默认哈希算法生成的哈希值不超过整数 MAX_VAL UE ,在 Java 中为 (2^31) -1。当将其转换为十六进制时,它会生成 0x7FFFFFF,因此第一个字符差异仅限于 [0-8]。根据Amazon S3 的建议 ,我们应该在此处设定最大方差以缓解这种情况。

Amazon EMR 6.10 开始 ,Amazon EMR 添加了一个优化的位置提供商,使用来自 [0-9] [A-Z] [a-z] 的字符集,确保生成的前缀哈希在前两个字符中具有均匀分布。

该位置提供商最近由亚马逊 EMR 通过 C ore 开源:提高对象存储布局 中的位密度 ,应该从 Iceberg 1.3.0 开始上线。

要使用,请确保 iceberg.enabled 分类设置为真,并将 wr ite.location-provider.im pl 设置为 org.apache.iceberg.em r .opt imizeds3LocationProvider。

以下是 Spark shell 命令的示例:

spark-shell --conf spark.driver.memory=4g \
--conf spark.executor.cores=4 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/iceberg-V516168123 \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.table-override.write.location-provider.impl=org.apache.iceberg.emr.OptimizedS3LocationProvider

以下示例显示,当您在 Iceberg 表中启用对象存储时,它会在 S3 路径中直接在您在 DDL 中提供的位置之后添加哈希前缀。

定义表 write.object-storage.enable d 参数并提供 S3 路径,之后您要使用 write.data.path(适用于 Iceberg 版本 0.13 及更高版本)或 write.object-storage.path (适用于 Iceberg 版本 0.12 及以下)参数添加哈希前缀。

将数据插入到您创建的表中。

哈希前缀是在 DDL 中定义的 S3 路径中的 /current/ 前缀之后添加的。

清理

完成测试后,清理资源以避免任何经常性费用:

  1. 删除您为此测试创建的 S3 存储桶。
  2. 删除 EMR 集群。
  3. 停止并删除 EMR 笔记本实例。

结论

随着各公司继续在S3数据湖上的超大型数据集上使用Apache Iceberg开放表格式构建更新的交易数据湖用例,人们将越来越关注优化这些 PB 级生产环境以降低成本、提高效率和实现高可用性。这篇文章演示了提高在 亚马逊云科技 上运行的 Apache Iceberg 开放表格式的运营效率的机制。

要了解有关 Apache Iceberg 的更多信息并为您的交易数据湖用例实现这种开放表格式,请参阅以下资源:

  • Apache Iceberg 表格规格
  • 亚马逊 EMR 上的 Apache Iceberg 支持
  • 使用安装了 Iceberg 的集群

作者简介

Avijit Goswami 是 亚马逊云科技 的首席解决方案架构师,专门研究数据和分析。他支持 亚马逊云科技 战略客户使用 亚马逊云科技 托管服务和开源解决方案在 亚马逊云科技 上构建高性能、安全和可扩展的数据湖解决方案。工作之余,Avijit 喜欢旅行、在旧金山湾区步道上徒步旅行、观看体育赛事和听音乐。

Rajarshi Sarkar 是亚马逊 EMR/Athena 的软件开发工程师。他致力于开发亚马逊 EMR/Athena 的前沿功能,还参与了 Apache Iceberg 和 Trino 等开源项目。在业余时间,他喜欢旅行、看电影和和朋友一起出去玩。

普拉尚特·辛格 是 亚马逊云科技 的软件开发工程师。他对数据库和数据仓库引擎感兴趣,并曾在 EMR 上优化 Apache Spark 性能。他是 Apache Spark 和 Apache Iceberg 等开源项目的积极贡献者。在空闲时间,他喜欢探索新地方、食物和徒步旅行。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。