发布于: Aug 23, 2022

本分步演示描述了流数据的写入工作的完成,如何使用 Hive 兼容文件夹结构,借助 Kinesis Data Firehose 将流数据写入 Amazon S3。 接着,它显示了 Amazon Web Services Glue 爬网程序如何推断架构和提取我们在 Kinesis Data Firehose 中指定的正确分区名称,并在 Amazon Web Services Glue 数据目录中对其进行编目。 最后,我们运行示例查询,确认能正确识别分区。

为了演示这一点,我们使用 python 代码生成示例数据。 我们还在 Kinesis Data Firehose 上使用 Lambda 转换来强制创建故障。这会演示数据如何被保存到错误输出位置。本分步演示所需的代码包含在 GitHub 中。

对于本分步演示内容,这是我们正在构建的架构:

创建一个 Kinesis Data Firehose 用于传输事件记录的 S3 存储桶。我们使用 Amazon Web Services 命令行界面 ( Amazon Web Services CLI)在美国东部(弗吉尼亚北部)区域创建 Amazon S3 存储桶。 请记得将示例中存储桶名称(以及您想使用的区域名字)替换为您自己的。

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

传入事件在事件负载中具有 ApproximateArrivalTimestamp 字段。 这足以在 Amazon S3 上创建适当的文件夹结构。 但是,在查询数据时,将此时间戳值公开为顶级列以便于过滤和验证可能更有利。 为此,我们创建了一个 Lambda 函数,它将 ApproximateArrivalTimestamp 添加为数据负载中的顶级字段。数据负载是 Kinesis Data Firehose 在 Amazon S3 中作为对象写入的内容。此外,Lambda 代码还会人为生成一些处理错误,这些错误会被传输到为传输目标指定的“ErrorOutputPrefix”位置,以说明“ErrorOutputPrefix”中表达式的使用。

首先,为名为 LambdaBasicRole 的 Lambda 函数创建一个角色。 TrustPolicyForLambda.json 文件包含在 GitHub 存储库中。

$ aws iam create-role --role-name KDFLambdaBasicRole 
--assume-role-policy-document file://TrustPolicyForLambda.json

创建角色后,将托管的 Lambda 基本执行策略附加到该角色。

$ aws iam attach-role-policy --policy-arn 
arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole 
--role-name KDFLambdaBasicRole

要创建 Lambda 函数,请从 Python Kinesis Data Firehose 蓝图“General Firehose Processing”开始,然后对其进行修改。有关记录结构和必须返回的内容的更多信息,请参阅 Amazon Kinesis Data Firehose 数据转换。

压缩 Python 文件,然后使用 Amazon Web Services CLI 创建 Lambda 函数。CreateLambdaFunctionS3CustomPrefixes.json 文件包含在 GitHub 存储库中。 

aws lambda create-function --zip-file "fileb://lambda_function.zip" 
--cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

接下来,创建 Kinesis Data Firehose 传输流。 createdeliverystream.json 文件包含在 GitHub 存储库中。

aws firehose create-delivery-stream --cli-input-json 
file://createdeliverystream.json

在先前的配置中,我们在“ExtendedS3DestinationConfiguration”元素下定义了 Prefix 和 ErrorOutputPrefix。我们对“S3BackupConfiguration”元素进行了相同的定义。请注意,当“ProcessingConfiguration”元素设置为“禁用”时,“ExtendedS3DestinationConfiguration”元素的 ErrorOutputPrefix 参数仅出于一致性原因存在。否则没有意义。

我们选择了一个可使文件夹结构与 hive 风格的分区兼容前缀。这是我们使用的前缀:

“fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/”

Kinesis Data Firehose 首先在 Amazon S3 存储桶下直接创建一个名为“fhbase”的基础文件夹。其次,它使用 Java DateTimeFormatter 格式计算表达式 !{timestamp:YYYY}、!{timestamp:MM}、!{timestamp:dd} 和 !{timestamp:HH} 的年、月、日和小时结果。例如,采用 UNIX 纪元时间的 1549754078390 的 ApproximateArrivalTimestamp 为 2019-02-09T16:13:01.000000Z(采用 UTC),则其计算结果为“year=2019”、“month=02”、“day=09”和“hour=16”。 因此,Amazon S3 中传输数据记录的位置的计算结果为“fhbase/year=2019/month=02/day=09/hour=16/”。

同样,ErrorOutputPrefix“fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/”会使用名为为“fherroroutputbase”的文件夹直接在 S3 存储桶下。表达式 !{firehose:random-string} 的计算结果为 11 个字符的随机字符串,如“ztWxkdg3Thg”。 如果在同一表达式中多次使用它,则每个实例都会评估为一个新的随机字符串。表达式 !{firehose:error-output-type} 的计算结果为以下之一:

  1.  “processing-failed”(对应 Lambda 转换传输失败)
  2.  “elasticsearch-failed”(对应 Amazon ES 目标传输失败)
  3. “splunk-failed”(对应 Splunk 目标交付失败)
  4. “format-conversion-failed”(对应数据格式转换失败)

因此,包含 Lambda 转换的传输失败记录的 Amazon S3 对象的位置的计算结果为:fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/。

您可以运行

aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample

来描述创建的传输流。

接下来,为传输流启用静态加密:

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

1.选择来源。对于此示例,我使用 Direct PUT。

2.选择是否要使用 Lambda 转换转换传入记录。选择启用,并选择了之前创建的 Lambda 函数的名称。

3.选择目标。选择 Amazon S3 目标。

4.选择 Amazon S3 存储桶。选择之前创建的 Amazon S3 存储桶。

5. 指定 Amazon S3 前缀和 Amazon S3 错误前缀。这对应于之前在 Amazon Web Services CLI 输入 JSON 的上下文中解释的“Prefix”和“ErrorOutputPrefix”。

6. 选择是否要将原始(转换前)记录备份到另一个 Amazon S3 位置。我选择了启用并指定了相同的存储桶(您可以选择不同的存储桶)。我还从转换后记录中指定了不同的前缀 – 基础文件夹不同,但下面的文件夹结构相同。这样可以更有效地使用 Amazon Web Services Glue 爬网程序对此位置进行爬网,或者在 Athena 或 Redshift Spectrum 中创建指向此位置的外部表。

7.指定 Amazon S3 目标的缓冲提示。我选择了 1 MB 和 240 秒。

8.选择 S3 压缩和加密设置。我没有为转换后记录的位置选择压缩。我选择使用服务托管的 Amazon Web Services KMS 客户主密钥 (CMK) 加密静态的 Amazon S3 位置。

9.选择是否要启用 Error Logging in Cloudwatch(Cloudwatch 中的错误记录)。我选择了启用。

10.指定您希望 Kinesis Data Firehose 承担的代表您访问资源的 IAM 角色。选择新建或选择以显示新屏幕。选择创建新的 IAM 角色,为该角色命名,然后选择允许。

11.选择创建传输流。

现已创建并激活传输流。您可以向其发送事件。

我使用 Python 代码生成示例数据。生成数据的结构如下:

{'sector': 'HEALTHCARE', 'price': 194.07, 'ticker_symbol': 'UFG', u'EventTime': '2019-02-12T07:10:52.649000Z', 'change': 20.56}{'sector': 'HEALTHCARE', 'price': 124.01, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:53.745000Z', 'change': 3.32}{'sector': 'MANUFACTURING', 'price': 26.95, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:54.864000Z', 'change': 24.53}

用于生成数据并将其推送到 Kinesis Data Firehose 的示例代码包含在 GitHub 存储库中。

在您开始向 Kinesis Data Firehose 传输流发送事件后,对象应该开始出现在 Amazon S3 中指定的前缀下。

我希望说明 Lambda 调用错误以及 Lambda 转换错误的 ErrorOutputPrefix 位置中的文件外观。因此,我没有授予“firehose_delivery_role”调用我的 Lambda 函数的权限。以下文件显示在 ErrorOutputPrefix 指定的位置。 

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 
KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

这是我之前提到的错误文件内容的片段。

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied.Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

在我给予“firehose_delivery_role”相应的权限后,数据对象显示在为 Amazon S3 目标指定的“前缀”位置。

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 
KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 
KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

此外,由于我的 Lambda 转换中的 Lambda 代码将 10% 的记录设置为失败状态,因此以下内容显示在 Lambo 转换错误的 ErrorOutputPrefix 位置。

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 
KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

以下是错误文件内容的片段:

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

您现在可以创建 Amazon Web Services Glue 爬网程序。有关使用 Amazon Web Services Glue 数据目录的更多信息,请参阅填充 Amazon Web Services Glue 数据目录。

1.在 Amazon Web Services Glue 控制台中,转到爬网程序,然后选择添加爬网程序。

2. 添加有关爬网程序的信息,然后选择下一步。

3.在“包含路径”中,指定您在 Amazon S3 目标下输入的 Amazon S3 存储桶名称。同时包括创建 Kinesis Data Firehose 传输流时使用的静态前缀。不包括自定义前缀表达式。

4.选择下一步。

5.选择下一步、否、下一步。

6.指定 Amazon Web Services Glue 将使用的 IAM 角色。我选择创建一个新的 IAM 角色。选择下一步。

7.指定运行爬网程序的日程。我选择按需运行。选择下一步。

8.指定爬网程序添加已爬网和已发现表的位置。我选择了默认数据库。选择下一步。

9.选择完成。

10.已创建爬网程序并准备运行。选择运行爬网程序。

11.在 Amazon Web Services Glue 控制台中,转到表。您可以看到,已使用基础文件夹的名称创建了一个表。选择 fhbase。

爬网程序已发现并填充了表及其属性。

您可以看到发现的架构。爬网程序已根据前缀表达式指定的文件夹结构标识并创建了分区。

打开 Amazon Athena 控制台,从下拉菜单中选择默认数据库。在 New query1 窗口中编写以下查询,然后选择 运行查询。 

SELECT * FROM “default”.”fhbase”

where year = ‘2019’ and day = ’12’ and hour = ’17’

order by approxarrtimestamputcfh desc 

请注意,Amazon Athena 将 fhbase 表识别为分区表。查询可以利用查询中的分区来筛选结果。

如本文所示,Amazon S3 对象的自定义前缀为自定义文件夹结构提供了很大的灵活性,其中 Kinesis Data Firehose 传输 Amazon S3 中的数据记录和失败记录。控制 Amazon S3 中的文件夹结构和命名可简化数据发现、编目和访问。因此,它有助于更方便地进行深入了解,并帮助您更好地管理查询成本。

相关文章