发布于: Jun 28, 2022
基于我们之前的架构设计,那我们分阶段来具体动手实践一个数据跨区域迁移的具体场景, Amazon 官网有很多公开的数据集,我们选定 Next Generation Weather Radar (NEXRAD) 作为数据源,该数据源在美东(us-east-1)区域;目标存储桶我们选择在 BJS 区域。该实验仅仅为了验证技术可行性,完整的参考代码见 s3deepdive github;代码不作为生产用途仅仅用来学习用途。
为了简单地完成技术验证,我们所有的测试环境基于一台 r4.2xlarge 的 Amazon Linux 机型展开,系统需要准备好:
• 在 Amazon Web Services Global 美东区域创建一台 EC2 实例
• 关联一个 IAM Role,需要有访问 S3 及 SQS 相关的管理权限
• Python2.7.x
• Boto3
• 300GB gp2 EBS 磁盘
配置好目标存储桶的 IAM Profile 及修改默认获取 IAM Role 的临时 Token 的默认超时时间和重试次数:
[ec2-user@youserver]$ aws configure --profile bjs AWS Access Key ID [None]: AKI***************A AWS Secret Access Key [None]: 7j+R6*****************oDrqUDefault region name [None]: cn-north-1Default output format [None]:[ec2-user@youserver]$ vi ~/.aws/configure[default] region = us-east-1 metadata_service_timeout = 5 metadata_service_num_attempts = 5[profile bjs] region = cn-north-1 metadata_service_timeout = 5 metadata_service_num_attempts = 5
Amazon 公共数据集没有提供清单列表,因此,我们利用前文的逻辑,并尝试利用 Amazon Web Services S3 CLI 命令生成该存储桶的对象清单。该数据集按照年月进行数据分区,我们设定对象的 Prefix 的迭代深度为 3,后台执行以下命令,并观察执行日志:
[ec2-user@youserver]$ cd NEXRAD_Demo/inventory[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_inventory.py -b noaa-nexrad-level2 -r us-east-1 -d 3 > noaa-nexrad-level2.log 2>&1 &
由于数据集非常大,该命令执行需要点时间,最终,3 层的扫描帮助我们并发生成了 2751 个对象清单文件,总大小 4.5GB:
[ec2-user@youserver]$ cd NEXRAD_Demo/inventory[ec2-user@youserver]$ ls noaa-nexrad-level2.*obj* | wc –l2751
由于该场景下,源存储桶和目标存储桶之间的单次传输的速度非常有限,实测该场景下大概在 9KB/s 左右,而且网络抖动比较厉害,因此,我们尽量缩小单个任务的总数据量大小,并设定大对象的大小阈值设置为 2MB;具体参数需要在 Python 常量参数中修改:
由于清单文件太多,总数据量太大,因此,我们可以数据清单分成多个目录,分别进行计算,比如如下命令:
- 我们把大小小于 800000
- bytes 的文件放到目录./1/里面
- 把大小小于 2MB 的文件放到./2/里面
- 把大小小于 6MB 的文件放到./3/里面
大家可以根据自己的需要,分成不同的对象清单文件夹
[ec2-user@youserver]$ cd NEXRAD_Demo/inventory[ec2-user@youserver]$ mkdir 1 2[ec2-user@youserver]$ find ./ -size -800000c -print0 | xargs -0 -I {} mv {} ../1/[ec2-user@youserver]$ find ./ -size -2M -print0 | xargs -0 -I {} mv {} ../2/[ec2-user@youserver]$ find ./ -size -6M -print0 | xargs -0 -I {} mv {} ../3/[ec2-user@youserver]$ cd NEXRAD_Demo/tasksubmit[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_submit.py -d ../inventory/1/ -r us-east-1 > noaa-nexrad-level2.task1.log 2>&1 &[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_submit.py -d ../inventory/2/ -r us-east-1 > noaa-nexrad-level2.task2.log 2>&1 &[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_submit.py -d ../inventory/3/ -r us-east-1 > noaa-nexrad-level2.task3.log 2>&1 &
为了演示,我们没有生成所有对象清单的传输任务,仅仅选取了其中某连个文件夹,生成的传输任务如下图所示,有些队列的消息数为 0,表示我们后台还有传输任务消息没有发送到队列中:
我们来看看队列里面的一个任务的结构组成,S3Task_Bigsize*队列中的任务相比于普通队列中的任务多了一组分片的 Range 范围:
在并发执行数据传输任务之前,我们先看看单个任务执行情况,任务执行需要指明任务队列,源和目的存储桶以及访问目标存储桶的 IAM Profile 名:
[ec2-user@youserver]$ cd NEXRAD_Demo/taskexec[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_NormalQueue15098144850121 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_NormalQueue15098144850121.exec1.log 2>&1 &[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_BigSizeQueue1 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_BigSizeQueue1.exec1.log 2>&1 &
从执行日志可以分析出,对于 NormalQueue 中的单个任务,由于是小对象,而且数量是 10,因此我们的执行代码可以并发执行,总体执行时间是 26 秒;对比 BigsizeQueue 中的任务,虽然总体数据大小和 NormalQueue 差 不多,但由于只有 2 个对象并发复制,该任务的总体执行时间是 363 秒。
关于并发任务执行,本质上是一个批处理的业务逻辑,假定有 1000 个任务列表,
• 每个任务数据量上限 20MB,如果传输速度在10KB/s那么一个任务需要大概需要 2048 秒即 34 分钟,但我们的的任务执行是多线程并发操作,按每个任务最多 10 个对象算,在 10KB/s 的速度下,一个任务最快需要执行 3.4 分钟左右(10 个对象并发上传),最慢34分钟(一个对象的情况下)
• 如果同时 100 个并发执行,完成所有任务,需要至少执行 10 次,总时长在 34 分钟到 340 分钟之间
• 如果并发 1000 个,完成所有任务需要至少执行1次;总时长 3.4 分钟到 34 分钟之间
本实验为了学习的目的,我们在测试机 r4.2xlarge 的机器上,后台并发执行 100 个任务,并观察数据传输的实际状况,
[ec2-user@youserver]$ cd NEXRAD_Demo/taskexec[ec2-user@youserver]$ vi parallel_run.sh #!/bin/bash for((i=2; i<52;i++)) do nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_BigSizeQueue1 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_BigSizeQueue1.exec_$i.log 2>&1 & done for((i=2; i<52;i++)) do nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_NormalQueue15098144850121 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_NormalQueue15098144850121.exec_$i.log 2>&1 & done [ec2-user@youserver]$ chmod +x parallel_run.sh[ec2-user@youserver]$ ./parallel_run.sh
针对下面这两个队列,每个运行了 50 个并发任务,因此在 SQS 界面上,可以看到传输中的消息是 50,也就是同时有 50 个消息任务正在被处理:
可以看到 BigsizeQueue 队列一次就成功完成的数据传输任务总数为 79949+50-79971=28;NormalQueue 队列一次就成功完成的数据传输任务总数为 79950+50-79958=42;我们定义任务的成功与否,为该任务中所有的对象都成功传输完成;该实验我们对于分段采取的大小是 2MB,在 9KB/s 左右的互联网传输速度下,还是有点大,容易失败;普通队列上中的任务,对象大小都在几百 KB 左右,一次传输成功的概率大很多。
本文不对单个对象的完整性问题展开探讨,对于用户首先最关心的问题是,源存储桶的对象有没有完全迁移到目前存储桶中;因此,可以定期生成目标存储桶的对象清单,并比对源存储桶的对象清单,在自定义的清单程序中,我们是逐级生产对象清单文件,有一定的规律,如果两个存储桶使用同样的 depth 参数生成,生成的对象清单文件个数首先一致的;具体到识别出有没有遗漏的迁移对象,可以进一步对比清单中的对象列表。
本文就跨区域 S3 数据迁移整体架构作了基本探讨,并在架构的基础上,学习和实践了利用 Amazon Web Services S3 CLI 以及 boto3 库如何实现自定义的对象清单,传输任务分解及执行逻辑。现实的生产场景下,还需要更多细节的思考和实践,接下来,我们会继续在大规模批处理,大规模对象集的完整性校验方面和大家继续探讨。
相关文章