发布于: Mar 22, 2022

要将本地 数据迁移上云,要求我们在设计层面做出调整以获得最佳结果。在云端运行大数据管道时,运营成本优化无疑是最重要的目标之一。而成本的两大组成部分,分别为存储成本与计算成本。在传统的本地 Hadoop 仓库当中,各仓库会被耦合为存储节点,同时亦充当计算节点。这种耦合的缺点在于,指向存储层的一切变更(例如维护)都会给计算层造成影响。在 Amazon Web Services 等云环境中,我们可以专门指定 S3 作为存储方案、Amazon EMR 作为计算方案,借此轻松实现存储与计算资源的剥离。由于各集群仅在工作负载出现时临时运行,因此这将给集群维护工作带来巨大的灵活性优势。

为了进一步节约成本,我们还需要找到在计算层上实现资源利用率最大化的方法。这意味着我们需要将原本的整体平台转换为与多条不同管道对接的多个集群,其中各个集群都能够根据管道需求实现自动规模伸缩。

S3 上运行 Hadoop 数据仓库,我们需要留心以下注意事项。S3 并不属于 HDFS 这类文件系统,而且并不提供同样的即时一致性保证。大家可以将 S3 理解成一种最终一致性对象存储库,可以使用 REST API 对其执行访问。

S3 的一大核心差异,在于其重命名(rename)不属于原子操作。S3 上的所有重命名操作都会先执行复制操作,而后再执行删除操作。考虑到运行时间成本,我们最好不要在 S3 上直接执行重命名。为了高效使用 S3 服务,大家应注意删除所有重命名操作。重命名通常被用于 Hadoop 仓库的各个提交阶段当中,例如通过原子操作将临时目录移动至其最终目的地。最好的方法就是避免一切重命名操作,转而选择一次性数据写入。

SparkApache MapReduce 作业中都包含提交阶段,这些阶段负责将多个由分布式工作程序生成的输出文件提交至最终输出目录。受篇幅所限,本文无法具体解释输出 committers 的实际工作原理,但最重要的是,默认输出 committers 在设计上基于 HDFS 标准、因此会在其中涉及重命名操作。如前所述,重命名操作会给 S3 这类存储系统造成严重的性能损失,因此最简单的策略就是禁用推测执行并切换输出 committers 的算法版本。当然,大家也可以自行编写定制化 committers 程序,确保其中不涉及任何重命名操作。例如,从 Amazon EMR 5.19.0 版本开始,Amazon Web Services 正式提供自定义 OutputCommitter for Spark,其中针对 S3 进行了写入优化设计。

S3 的实际使用中,另一大挑战在于其具有最终一致性属性,这与 HDFS 的强一致性完全不同。S3 虽然能够为新对象的 PUTS 操作提供写后读取保证,但这还不足以始终支撑起分布式管道的一致性需求。大数据处理中经常出现的一类情况在于,我们的某一项作业会将文件列表输出至目录,而另一项作业则对该目录执行读取操作。要运行第二项作业,其必须列出目录内容以查找需要读取的所有文件。在 S3 中不存在这样的目录。我们仅列出具有相同前缀的文件,意味着在第一项作业完成之后,我们可能无法立即看到所有新文件。

为了解决这个问题,Amazon Web Services 提供 EMRFS,即添加在 S3 之上的一致性层,可以使 S3 拥有与一致性文件系统一致的运行表现。EMRFS 使用 Amazon DynamoDB,并在 S3 上保留关于各个文件的元数据。简单来说,在启用 EMRFS 之后,当我们列出某 S3 文件前缀时,实际 S3 响应结果将与 DynamoDB 上的元数据进行比较。如果存在不匹配,则 S3 驱动程序会延长轮询时间,以等待 S3 完成数据更新。

一般而言,EMRFS 能够保证良好的数据一致性。对于一部分数据管道,我们会选择使用不支持 EMRFS 的 PrestoDB,并使用 PrestoDB 对 S3 上的存储数据进行聚合。虽然这会给上游作业带来最终一致性不匹配的风险,但我们发现只要监控上游与下游数据间的差异,并在必要时重新运行上游作业即可解决这类问题。根据我们的经验,一致性问题的发生几率不高,但确实存在。如果选择不使用 EMRFS,则应在系统设计中考虑到相关影响。

另一个重要但却经常受到忽视的挑战,就是弄清楚该如何使用 Amazon EMR 的自动规模伸缩功能。为了获得最佳运营成本,我们需要尽可能减少处于闲置状态的服务器数量。

实现这一目标的方式看似简单:创建一套长期运行的 EMR 集群,并通过随时可用的自动规模伸缩功能根据参数(例如集群上的可用内存容量)控制集群的大小。但是,我们的某些批处理管道每小时只启动一次,每次运行 20 分钟,且带来巨大的计算量需求。由于处理速度非常重要,我们必须保证不浪费任何时间。对我们来说,最佳策略是在特定批处理管道启动之前,通过自定义脚本预先调整集群的大小。

另外,我们往往很难在单一集群之上运行多条数据管道,并在任意给定时间将其保持在最佳容量水平。这是因为不同的管道在负载需求方面总是有所区别。相反,我们选择在多个独立的 EMR 集群上对应运行各主要管道。这种作法有很多优势,缺点则只有小小的一个。优点在于,各个集群都能够根据速度需求精确调整大小,运行最适合其实际需求的软件版本,并在不影响其他管道的前提下接受管理。而小缺点是,多集群加多管道的组合会额外增加命名节点与任务节点,导致少量资源浪费。

在制定自动规模伸缩策略时,我们首先尝试在每次需要运行管道时创建并删除集群。但我们很快发现,从零开始引导集群所耗费的时间往往远超我们的预期。相反,我们应当让这些集群始终保持运行状态,并在管道启动前添加任务节点的方式扩展集群规模,并在管道结束时立即删除相应任务节点。我们发现,只需要添加更多任务节点,我们就能够快速开始运行管道。如果在集群的长期运行中出现其他问题,我们可以快速回收并从零开始创建新的集群。在解决这些问题的过程中,我们一直与 Amazon Web Services 保持密切合作。

我们的自定义自动规模伸缩脚本是一套简单的 Python 脚本,通常会在管道启动之前运行。例如,假定我们的管道由具有单一映射与归约阶段的简单 MapReduce 作业构成,同时假设映射阶段的计算成本更高,那么我们可以编写出一个简单脚本,预测下一小时需要处理的数据量,并借助 Hadoop 作业的方式计算出处理这部分数据所需要的映射器数量。当我们了解当前映射任务数量后,即可判断出并行运行所有映射任务所需要的服务器数量。

在运行 Spark 实时管道时,事情要更棘手一些。这是因为我们有时候必须删除应用程序运行时中的计算资源。对我们来说,最简单有效的策略之一,是在现有集群之外并行创建一个独立的实时集群,根据最近一小时内的数据处理量将其扩展至所需的大小,而后增加部分容量再重新启动新集群上的实时应用程序。

相关文章