发布于: Oct 10, 2022
import sysimport datetimefrom awsglue.utils import getResolvedOptionsfrom pyspark.context import SparkContextfrom awsglue.context import GlueContext ARG_TABLE_NAME = "table_name" ARG_READ_PERCENT = "read_percentage" ARG_OUTPUT = "output_prefix" ARG_FORMAT = "output_format" PARTITION = "snapshot_timestamp" args = getResolvedOptions(sys.argv,['JOB_NAME', ARG_TABLE_NAME, ARG_READ_PERCENT, ARG_OUTPUT, ARG_FORMAT]) table_name = args[ARG_TABLE_NAME] read = args[ARG_READ_PERCENT] output_prefix = args[ARG_OUTPUT] fmt = args[ARG_FORMAT] print("Table name:", table_name)print("Read percentage:", read)print("Output prefix:", output_prefix)print("Format:", fmt) date_str = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M') output = "%s/%s=%s" % (output_prefix, PARTITION, date_str) sc = SparkContext() glueContext = GlueContext(sc) table = glueContext.create_dynamic_frame.from_options("dynamodb", connection_options={"dynamodb.input.tableName": table_name,"dynamodb.throughput.read.percent": read}) glueContext.write_dynamic_frame.from_options( frame=table, connection_type="s3", connection_options={"path": output},format=fmt, transformation_ctx="datasink")
脚本代码很简短。我们创建了 dynamodb 连接类型的 DynamicFrameReader,并传入表名和所需的最大读取吞吐量消耗。我们将该数据帧传递至 DynamicFrameWriter,后者将全表数据以指定的格式写入 S3。
Amazon 大多数团队的应用程序都包含多个 DynamoDB 表,我自己的团队也是如此。我们当前的应用程序使用了五个主表。理想情况下,在导出工作流程结束时,您可以在一致的表视图中编写简单明了的查询。但是,每个导出的表都按表导出时的时间戳进行分区。这使得跨一个或多个表进行查询非常麻烦,因为您必须向查询中的每个表引用添加 WHERE snapshot_timestamp = 条件 子句。此外,每个表可能会在任何给定日期具有不同的 snapshot_timestamp 值!
此导出工作流程的最后一步会创建一个 Athena 视图,该视图可为您添加 WHERE 子句。这意味着您可以与 DynamoDB 导出数据进行交互,就好像导出的 DynamoDB 表是一个正常视图一样。
我创建的 Amazon CloudFormation 栈分为两个堆栈。公共堆栈包含共享的基础设施,每个 Amazon Web Services 区域只需要创建一个。表堆栈的设计方式是,您可以在任何给定的 Amazon Web Services 区域中为每个表-格式组合创建一个表堆栈。它包含导出和转换 DynamoDB 表所需的 CloudWatch 事件逻辑和 Amazon Glue 组件。
公共堆栈包含大部分基础设施。其中包括 Step Functions 状态机和 Lambda 函数,用于触发和检查异步作业的状态。此外,它还包括导出堆栈使用的 IAM 角色以及用于存储导出的 S3 存储桶。
要创建公共堆栈,请执行以下操作:
1. 选择此启动堆栈
2. 选择我确认 Amazon CloudFormation 可能创建具有自定义名称的 IAM 资源。
3. 选择创建堆栈。
如果您没有要导出的 DynamoDB 表,请按照原博文进行操作。从“处理评论堆栈”部分分开始往后执行,直到将这两个项目添加到表中。或者,您可以随意将此 CloudFormation 堆栈指向您最喜欢的同时使用预置吞吐量的 DynamoDB 表。目前不支持使用按需吞吐量的表。
由于此架构的大部分都是可以共享的,因此表导出堆栈中没有多少内容。此堆栈定义了 CloudWatch 事件,用于触发 Step Functions 状态机并在 JSON 格式的事件消息中包含所有必要元数据。此外,它还包含用于导出表的 Amazon Glue ETL 作业和用于更新 Amazon Glue Data Catalog 中元数据的 Amazon Glue 爬网程序。
从技术上讲,您可以在公共堆栈中定义 Amazon Glue ETL 作业,因为它已经被参数化。但是,Amazon Glue 作业并发运行的默认限制为 3 项作业。这是一个软限制,但是使用这种架构,您可以在申请提高并发限制数之前最多导出 25 个表。
要创建表导出堆栈,请执行以下操作:
1. 选择此启动堆栈
2. 从列表中选择输出格式。Athena 原生支持所有可用格式。
3. 输入 DynamoDB 表名称。
4. 输入作业应从表的当前预置吞吐量中消耗的读取容量单位 (RCU) 的百分比。该百分比表示为 0.1(含)到 1.0(含)之间的浮点数。默认值为 0.25 (25%)。
例如:假设您的表的 RCU 设置为 100,并使用默认值 0.25 (25%),然后 Amazon Glue 作业在运行时消耗 25 个 RCU。
1. 选择创建。
为了演示相关工作原理,我们手动运行 DynamoDB 导出任务状态机,需要将 CloudWatch 事件的 JSON 参数传递给 Step Functions。
要获取事件 JSON 参数,请执行以下操作:
1. 在 Amazon Web Services 管理控制台中打开 CloudWatch。
2. 在事件下的左列中,选择规则。
3. 从列表中选择您的规则。它以 Amazon BigDataBlog- 为前缀。
4. 对于操作,选择编辑。
5. 从目标的配置输入部分复制事件的 JSON 参数。
6. 选择取消退出编辑模式。
要开始执行状态机,请执行以下步骤:
1. 在控制台中打开 Step Functions。
2. 选择 DynamoDBExportAndAthenaLoad 状态机。
3. 选择开始执行。
4. 将 JSON 格式的有效负载粘贴到输入
5. 选择开始执行。
可以采用以下几种方法追踪执行过程。在进入和退出执行步骤时,系统会向执行事件历史记录列表添加条目。如果需要调试,这是查看传递到每个步骤的状态(Lambda speak 中的事件)的绝佳方法。
您还可以展开可视化工作流程。这是一个非常棒的概要视图,可供了解工作流程的进展情况。
工作流程完成后,您会在 Amazon Glue Data Catalog 中的 dynamodb_exports 数据库下看到两个新表。DynamoDB 快照表名称以 snapshots_ 为前缀。该架构针对 Amazon Glue Data Catalog 进行了格式设置(小写和连字符转换为下划线)。您还有一个视图表,其表名称也应用于相同的中 Amazon Glue Data Catalog 格式设置,但没有 snapshots_ 前缀。
为了展示表最新快照单独视图表的用途,我将使用上一篇博文中的“评论”表。该表包含两个项目。我还会运行两次导出工作流程。正如您在预览表时所看到的,总共有四个项目。那是因为每个快照包含两个项目。
在这些项目中,最新的 snapshot_timestamp 是 2019-01-11T23:26。当我针对视图表评论运行相同的预览查询时,我们看到结果只有两个项目,这符合我们的预期。该视图负责指定 where snapshot_timestamp=… 子句,因此您无需指定。
在本文中,我向您演示了如何使用 Amazon Glue 的 DynamoDB 集成和 Amazon Step Functions 创建工作流程,将您的 DynamoDB 表数据已 Parquet 格式导出到 S3 中。我还演示了如何为每个表的最新快照创建 Athena 视图,从而为您导出的 DynamoDB 表提供一致的视图。
相关文章