发布于: Sep 24, 2022
前言:Amazon Well-Architected Framework 描述了用于在云中设计和运行工作负载的关键概念、设计原则和架构最佳实践。其中可持续性支柱作为目前 Well-Architected Framework 中的最新一员侧重于减少运行的云工作负载对环境的影响。在正确的时间以正确的数量提供正确的资源,以满足明确定义的业务需求。以 Reuse, Recycle, Reduce, Rearchitect 四个方面为准则构建最佳架构。本系列文章将以在 Amazon EKS 上的部署 Flink 作业为例,通过 Karpenter, Spot, Graviton 等技术,遵循 Reuse, Recycle, Reduce, Rearchitect 四大原则,从零开始构建最佳架构。
在上一篇文章中,我们介绍了遵循 Reuse, Recycle 原则,通过容器化应用和 Spot 实例实现云应用的交付和优化,结合 Karpenter 弹性伸缩工具最大限度地提高利用率和资源效率。在这一篇文章中,我们将:
· 遵循 Rearchitect 原则,引入 Graviton,实现从 x86 到 arm 架构的转变的同时,进一步推进成本节省和持续的基础设施优化。
· 亚马逊云科技设计的 Amazon Graviton 处理器为 Amazon EC2 中运行的云工作负载提供最佳性价比。与当前一代基于 x86 的实例相比,采用亚马逊云科技 Graviton2 处理器的通用可突发性能型 (T4g) 、通用型 (M6g) 、计算优化型 (C6g) 和内存优化型 (R6g, X2gd) EC2 实例,及其具有基于 NVMe 的 SSD 存储的变体,为广泛的工作负载(如应用程序服务器、微服务、视频编码、高性能计算、电子设计自动化、压缩、游戏、开源数据库、内存缓存和基于 CPU 的机器学习推理)提供高达40%的性价比提升。除此之外,Graviton 提供在 Amazon EC2 实例家族中每瓦能源使用的最佳性能,详细的 SPECint2017 benchmark 数据请参考下图:
实验架构回顾:
架构概要说明:
1. 创建 EKS 集群时,添加一个托管按需节点组(默认一个节点),用于部署系统组件例如 EBS CSI 驱动程序等。
2. 借助 Karpenter 动态拉起 Flink 作业需要的计算资源,通过配置多个 Provisioner,每个 Provisioner 设置不同 weight,实现精细化协同控制。
3. ARM 节点主动打上 Taints,配合使用 Tolerations,以确保 Flink 作业调度到合适的节点上。
4. 利用 docker buildx 工具一键打包 Multi-Arch 镜像并推送到镜像仓库。
5. Flink Job Manager (Flink JM) 利用 nodeSelector 主动调度到由按需节点(包括部署系统组件的按需节点组和 Karpenter 拉起的节点)。
6. Flink Task Manager (Flink TM) 默认不加任何限定条件 (nodeSelector/nodeAffinity) ,并且配置 HPA(基于CPU)。当资源不够时,由 Provisioner 按优先级协调拉起合适节点。
7. 利用 Kinesis Data Generator 生成大量模拟数据,打到 Kinesis Data Stream 数据。随着数据的增加,配置了 HPA 的 Task Manager 自动弹出更多 Pod。
8. Flink 作业启用检查点,并将作业检查点数据写入 S3,从而允许 Flink 保存状态并具备容错性。
9. 使用 Fault Injection Simulator 模拟 Spot 回收事件。
10. Node Termination Handler 配合 Spot,让应用运行更平稳。
上一篇我们已经搭建好 EKS 集群,并将 Flink 作业运行在 x86 节点上。接下来首先通过切换到基于 Graviton 的实例来提高计算工作负载的能效。同时综合运用 Karpenter 的优先级策略,实现对 Flink 作业计算资源的精细化管理。
从 CPU 架构和容量类型,我们一共设置了四个 Provisioner,注意,为保证部署在按需节点上的 Job Manager 的高可用性,我们仅对 *-spot-provisioner 启用 consolidation:
注意:不能保证 Karpenter 在特定要求下始终选择最高优先级的 Provisioner。
例如有2种场景:
1. 遵循上一篇提到的"Reuse"原则,如果现有容量可用,kube-scheduler 将直接调度 pod,不会触发 Karpenter 拉起新节点。
2. 根据 Karpenter 执行 pod 批处理和 bin 打包的方式,如果一个 pod 无法使用最高优先级的配置器进行调度,它将强制使用较低优先级的配置器创建一个节点,这可能允许该批次中的其他 pod 也可以在该节点上进行调度。
如果您希望保持简单,不要求最大化已有托管节点组利用率,对统一的 capacity 类型标签 (eks.amazonaws.com/capacityType) 也没有要求,可以设置Flink 作业只使用 Kaprenter 节点。
Karpenter 已经内置了优先选择 Spot 然后按需的机制,这样初始只需配置二个 Provisioner (x86/arm) 。对于 Flink Job Manager 等必须使用按需实例的任务,只需利用 nodeSelector 通过原生标签 karpenter.sh/capacity-type 指定即可。后期如果作业容器镜像已经都是 multi-arch,则可以进一步将 x86 和 arm 实例放在同一个 Provisioner 中,Karpenter 会分别按照 spot capacity 优先、按需成本优先的原则自动选择 x86 和 arm。您可以权衡考虑,如果单一 Provisioner 可以满足需求,则可以大幅简化目前多个 Provisioner 的配置和选择。
2.1 准备 buildx 工具
打开 Cloud9 控制台:
https://us-east-1.console.aws.amazon.com/cloud9/home?region=us-east-1
进入到 IDE 环境,前面一篇已经安装好 docker buildx 工具,如果有问题请下载 prepareCloud9IDE.sh:
wget https://raw.githubusercontent.com/BWCXME/cost-optimized-flink-on-kubernetes/main/prepareCloud9IDE.sh
然后打开查看 buildx 部分,复制到命令行手动安装:
c9 prepareCloud9IDE.sh
2.2 配置 build
创建并使用 flink-build:
docker buildx create --name flink-build --use
docker buildx inspect --bootstrap
docker buildx ls
2.3 一键打包多 CPU 架构镜像
首先进入代码目录:
cd ~/environment/flink-demo
登录仓库:
aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com
借助 buildx 插件,一条命令同时编译、打包、推送 x86 和 arm 架构镜像:
docker buildx build --platform linux/amd64,linux/arm64 --tag ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest --push .
查看 Dockerfile,内容如下所示:
FROM maven:3.8.6-jdk-8-slim AS builder COPY src/ /home/app/src COPY pom.xml /home/app/ RUN ls -l /home/app RUN mvn -f /home/app/pom.xml clean package -Dflink.version=1.15.1 FROM flink:1.15.1 RUN mkdir -p $FLINK_HOME/usrlib COPY --from=builder /home/app/target/aws-kinesis-analytics-java-apps-1.0.jar $FLINK_HOME/usrlib/aws-kinesis-analytics-java-apps-1.0.jar RUN mkdir $FLINK_HOME/plugins/s3-fs-hadoop COPY /lib/flink-s3-fs-hadoop-1.15.1.jar $FLINK_HOME/plugins/s3-fs-hadoop/
如果您要更改基础镜像 maven 或 Flink 的版本,请确保指定 tag 下有 arm 的版本,不然 buildx 会报错。
推送完成后,检查镜像信息:
docker buildx imagetools inspect ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest
返回类似如下:
简单3步,Flink 作业的 ARM 镜像就打好了,即不用更改 Dockerfile,也不用单独设置 Tag。
2.4 构建自定义版本的 Flink 多 CPU 架构镜像
在 Docker Hub 上 Flink 的官方镜像仓库中只有1.14及以上的版本有支持 arm64/v8 即支持 Graviton 的镜像,如前面所说的如果镜像不支持 arm64/v8,那么通过 buildx 打包的时候会报错。但是在有些场景下,客户依然想要使用1.13版本的 Flink,或者希望使用除了 openjdk 以外的其他 JDK,比如针对 Graviton 优化的 Amazon Corretto JDK,这时候就需要我们自己编译构建一个自定义的 Flink 多 CPU 架构镜像。
作为示例,下面我们构建一个1.13版本并且基于 Amazon Corretto 11 JDK 的自定义镜像,并且同样构建成多 CPU 架构镜像。
因为涉及编译 arm64 架构的 Flink,这里推荐启动一台Amazon linux 2 操作系统的 Graviton 实例(比如 t4g.large)编译构建 Flink 镜像。
在 Graviton 实例上拉取代码:
sudo yum groupinstall "Development Tools"
git clone -b DIYflink https://github.com/BWCXME/cost-optimized-flink-on-kubernetes flink-private-demo
cd flink-private-demo
参考文档【2】,使用脚本构建1.13版本的 Flink:
sudo sh build_flink.sh -f 1.13 -j 11 -s 2.11
上述命令会直接构建一个支持 Graviton 的1.13版本的 Flink 镜像,然后需要将镜像上传到私有镜像仓库中,并且打上自定义 tag,后面会使用这个镜像构建多 CPU 架构镜像。
转到 cloud9 开发环境下,拉取代码:
cd ~/environment
git clone -b DIYflink https://github.com/BWCXME/cost-optimized-flink-on-kubernetes flink-private-demo
cd flink-private-demo
使用上面的私有仓库地址和 tag 代替 Dockerfile 中的<private_repo_address>和<tag>
将 Amazon Corretto 11 JDK 的压缩文件下载到当前目录下:
wget https://corretto.aws/downloads/latest/amazon-corretto-11-aarch64-linux-jdk.tar.gz
接下来使用 buildx 构建多 CPU 架构镜像。
登录仓库:
aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com
借助 buildx 插件,一条命令同时编译、打包、推送 x86 和 arm 架构镜像:
docker buildx build --platform linux/amd64,linux/arm64 --tag ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:private --push .
在后续的部署过程中,可以用 tag 为 private 的 flink-demo 镜像代替默认的 latest 达到部署自定义 JDK 和 Flink 版本的目的,结果如下图所示:
3部署 arm provisioner
当我们有一个具有多架构 vcpus 的集群时,我们可以配合使用 k8s 里的 Taints 和 Tolerations,以确保 pod 调度到合适的节点上。
例如,默认我们可以给 gravtion 节点都打上 taint,确保不会有 x86 应用程序部署到 graviton 节点上。只有经过测试,加上了 toleration 的应用才可以调度到 gravtion 节点。
同样我们为 arm 节点分别设置 Spot 和按 provisioner。
一个能容忍"cpu-architecture:arm64:NoSchedule"污点的应用,尝试的优先顺序依次如下:
1. arm-spot-provisioner (100), arm 和 spot 两大成本优化利器的组合拳
2. x86-spot-provisioner (50),如果 arm 的 spot 资源不足,退回到 x86 spot
3. arm-ondemand-provisioner (30),如果 spot 资源总体紧张,再退到arm 按需
4. x86-ondemand-provisioner (10),最后由 x86 按需兜底
3.1 筛选机型
借助 ec2-instance-selector 工具快速搜索 arm 机型:
ec2-instance-selector --memory 16 --vcpus 4 --cpu-architecture arm64 --gpus 0
返回类似如下:
im4gn.xlarge
m6g.xlarge
m6gd.xlarge
t4g.xlarge
3.2 创建 arm provisioner
创建 arm provisioner 配置文件 provisioner-arm.yaml:
cat > provisioner-arm.yaml <<EOF apiVersion: karpenter.sh/v1alpha5 kind: Provisioner metadata: name: arm-spot-provisioner spec: consolidation: enabled: true ttlSecondsUntilExpired: 2592000 # 30 Days = 60 * 60 * 24 * 30 Seconds; weight: 100 # 值越大,优先级越高 taints: - key: cpu-architecture value: "arm64" effect: NoSchedule requirements: - key: karpenter.sh/capacity-type operator: In values: ["spot"] - key: "node.kubernetes.io/instance-type" operator: In values: ["m6g.xlarge", "m6gd.xlarge", "im4gn.xlarge"] - key: "topology.kubernetes.io/zone" operator: In values: ["${AWS_REGION}a", "${AWS_REGION}b", "${AWS_REGION}c"] - key: "kubernetes.io/arch" operator: In values: ["arm64"] kubeletConfiguration: systemReserved: cpu: 1 memory: 5Gi ephemeral-storage: 10Gi maxPods: 20 limits: resources: cpu: 1000 memory: 2000Gi providerRef: # optional, recommended to use instead of provider name: flink labels: eks.amazonaws.com/capacityType: 'SPOT' cpu-architecture: arm64 network: private group: 'NONE' --- apiVersion: karpenter.sh/v1alpha5 kind: Provisioner metadata: name: arm-ondemand-provisioner spec: consolidation: enabled: false ttlSecondsAfterEmpty: 60 ttlSecondsUntilExpired: 2592000 # 30 Days = 60 * 60 * 24 * 30 Seconds; weight: 30 # 值越大,优先级越高 taints: - key: cpu-architecture value: "arm64" effect: NoSchedule requirements: - key: karpenter.sh/capacity-type operator: In values: ["on-demand"] - key: "node.kubernetes.io/instance-type" operator: In values: ["m6g.xlarge", "m6gd.xlarge", "im4gn.xlarge"] - key: "topology.kubernetes.io/zone" operator: In values: ["${AWS_REGION}a", "${AWS_REGION}b", "${AWS_REGION}c"] - key: "kubernetes.io/arch" operator: In values: ["arm64"] kubeletConfiguration: systemReserved: cpu: 1 memory: 5Gi ephemeral-storage: 10Gi maxPods: 20 limits: resources: cpu: 1000 memory: 2000Gi providerRef: name: flink labels: eks.amazonaws.com/capacityType: 'ON_DEMAND' cpu-architecture: arm64 network: private group: 'NONE' EOF
执行部署:
k apply -f provisioner-arm.yaml
检查部署:
k apply -f provisioner-arm .yaml4部署 Flink 到 Graviton 节点
4.1 清理原 x86 部署
如果原 x86 还未删除,请先执行:
cd ~/environment/flink-demo/x86
k delete -f .
cd .
4.2 提交任务 (YAML)
您既可以通过明确定义 YAML 文件来部署,也可以利用命令行快捷部署。这里为了方便理解,我们主要演示基于完整的 YAML 文件部署,您也可以参考后面一节的命令行提交方式。
准备目录:
cd ~/environment/flink-demo/
mkdir arm
cd arm
生成配置文件:
cat > flink-configuration-configmap.yaml <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
kubernetes.cluster-id: flink-demo
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3a://${FLINK_S3_BUCKET}/recovery
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 100000
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 1
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 2048m
scheduler-mode: reactive
parallelism.default: 4
rest.flamegraph.enabled: true
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = DEBUG
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = \${sys:log.file}
appender.rolling.filePattern = \${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
EOF
cat > jobmanager-application-ha.yaml <<EOF apiVersion: batch/v1 kind: Job metadata: name: flink-jobmanager spec: parallelism: 1 # Set the value to greater than 1 to start standby JobManagers template: metadata: labels: app: flink component: jobmanager spec: serviceAccountName: ${FLINK_SA} nodeSelector: 'eks.amazonaws.com/capacityType': 'ON_DEMAND' tolerations: - key: "cpu-architecture" operator: "Equal" value: "arm64" effect: "NoSchedule" restartPolicy: OnFailure containers: - name: jobmanager image: ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest imagePullPolicy: Always env: - name: POD_IP valueFrom: fieldRef: apiVersion: v1 fieldPath: status.podIP args: ["standalone-job", "--host", "\$(POD_IP)","--job-classname", "com.amazonaws.services.kinesisanalytics.S3StreamingSinkJob","--inputStreamName", "${FLINK_INPUT_STREAM}", "--region", "${AWS_REGION}", "--s3SinkPath", "s3a://${FLINK_S3_BUCKET}/data", "--checkpoint-dir", "s3a://${FLINK_S3_BUCKET}/recovery"] ports: - containerPort: 6123 name: rpc - containerPort: 6124 name: blob-server - containerPort: 8081 name: webui livenessProbe: tcpSocket: port: 6123 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: - name: flink-config-volume mountPath: /opt/flink/conf securityContext: runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps volumes: - name: flink-config-volume configMap: name: flink-config items: - key: flink-conf.yaml path: flink-conf.yaml - key: log4j-console.properties path: log4j-console.properties EOF
在上述的配置中,通过 nodeSelector,强制往按需节点上调度,同时遵循"Reuse"原则,托管节点组或者 Karpenter 拉起的按需节点都可以。如果您希望限定到没有自动伸缩组的节点(由 Karpenter 拉起),请手动添加:
nodeSelector:
'group': 'NONE'
生成 taskmanager 部署文件:
cat > taskmanager-job-deployment.yaml <<EOF apiVersion: apps/v1 kind: Deployment metadata: name: flink-taskmanager spec: replicas: 1 selector: matchLabels: app: flink component: taskmanager template: metadata: labels: app: flink component: taskmanager spec: serviceAccountName: flink-service-account tolerations: - key: "cpu-architecture" operator: "Equal" value: "arm64" effect: "NoSchedule" containers: - name: taskmanager image: ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest imagePullPolicy: Always resources: requests: cpu: 250m memory: "4096Mi" limits: cpu: 500m memory: "8192Mi" env: args: ["taskmanager"] ports: - containerPort: 6122 name: rpc - containerPort: 6125 name: query-state livenessProbe: tcpSocket: port: 6122 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: - name: flink-config-volume mountPath: /opt/flink/conf/ securityContext: runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary volumes: - name: flink-config-volume configMap: name: flink-config items: - key: flink-conf.yaml path: flink-conf.yaml - key: log4j-console.properties path: log4j-console.properties EOF
我们遵循"Reuse"原则,上述配置中,没有添加 nodeSelector 或 nodeAffinity,优先利用现有计算资源,不够时再按照前面配置的 provisioner 优先级,依次尝试直到成功拉起资源。
如果您需要限定托管按需节点组优先部署集群管理相关组件,或者稳定性要求高的应用,可以参考以下配置,让 Task Manager 优先运行在 Karpenter 拉起的节点上(如果需要,请手动添加到前面生成的 taskmanager-job-deployment.yaml):
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 50
preference:
matchExpressions:
- key: group
operator: In
values:
- NONE
准备服务部署文件:
cat > jobmanager-svc.yaml <<EOF
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob-server
port: 6124
- name: webui
port: 8081
selector:
app: flink
component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-web
annotations:
service.beta.kubernetes.io/aws-load-balancer-security-groups: "${EKS_EXTERNAL_SG}"
spec:
type: LoadBalancer
ports:
- name: web
port: 80
targetPort: 8081
selector:
app: flink
component: jobmanager
EOF
注意这里为了方便测试,使用 LoadBalancer 将服务暴露出来,并且绑定了安全组 ExternalSecurityGroup,请确保:
· 这个安全组允许您的本机 IP 访问80端口;
· 如果您修改了暴露端口80,例如用的8081,请相应在安全组中放开8081端口。
执行部署(请先确认在 arm 目录下):
kgpk apply -f .
检查部署:
kgp
当 Pod 都拉起来以后,检查机器,利用预设好的别名:
kk
如我们的预期,分别拉起了一台 Graviton 的按需和 Spot 实例,实现了非常好的性价比。
检查机器的 taints:
kubectl get nodes -o json | jq '.items[].spec.taints'
获取服务地址:
k get svc flink-jobmanager-web
拿到地址后在浏览器中打开:
一切正常,至此我们很轻松的就将一个 Flink 作业从 x86 迁移到了 arm。
4.3 提交任务(命令行)
您也可以使用命令行提交任务,目前 flink 1.13+以上支持 pod 模板,我们可以自定义 JM 跟 TM 的启动方式。这允许直接支持 Flink Kubernetes 配置选项不支持的高级功能。
定义任务名称:
export kubernetes_cluster_id=your-flink-job-name
使用参数 kubernetes.pod-template-file 指定包含 pod 定义的本地文件。它将用于初始化 JobManager 和 TaskManager。
指定 job manager 运行在按需节点上并且能够容忍 arm64:
cat > arm-jobmanager-pod-template.yaml <<EOF
apiVersion: v1
kind: Pod
metadata:
name: jobmanager-pod-template
spec:
nodeSelector:
'eks.amazonaws.com/capacityType': 'ON_DEMAND'
tolerations:
- key: "cpu-architecture"
operator: "Equal"
value: "arm64"
effect: "NoSchedule"
EOF
配置 task manager 能够容忍 arm64:
cat > arm-taskmanager-pod-template.yaml <<EOF
apiVersion: v1
kind: Pod
metadata:
name: taskmanager-pod-template
spec:
tolerations:
- key: "cpu-architecture"
operator: "Equal"
value: "arm64"
effect: "NoSchedule"
containers:
# Do not change the main container name
- name: flink-main-container
env:
- name: HADOOP_USER_NAME
value: "hdfs"
EOF
使用命令行提交任务,注意指定参数 kubernetes.pod-template-file.jobmanager 和 kubernetes.pod-template-file.taskmanager:
flink run-application -p 2 -t kubernetes-application \ -Dkubernetes.cluster-id=${kubernetes_cluster_id} \ -Dkubernetes.container.image=${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest \ -Dkubernetes.container.image.pull-policy=Always \ -Dkubernetes.jobmanager.service-account=flink-service-account \ -Dkubernetes.pod-template-file.jobmanager=./arm-jobmanager-pod-template.yaml \ -Dkubernetes.rest-service.exposed.type=LoadBalancer \ -Dkubernetes.rest-service.annotations=service.beta.kubernetes.io/aws-load-balancer-security-groups:${EKS_EXTERNAL_SG} \ -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory\ -Dhigh-availability.cluster-id=${kubernetes_cluster_id} \ -Dhigh-availability.storageDir=s3://${FLINK_S3_BUCKET}/recovery \ -Dstate.savepoints.dir=s3://${FLINK_S3_BUCKET}/savepoints/${kubernetes_cluster_id} \ -Dkubernetes.taskmanager.service-account=flink-service-account \ -Dkubernetes.taskmanager.cpu=1 \ -Dtaskmanager.memory.process.size=4096m \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dkubernetes.pod-template-file.taskmanager=./arm-taskmanager-pod-template.yaml \ local:///opt/flink/usrlib/aws-kinesis-analytics-java-apps-1.0.jar \ --inputStreamName ${FLINK_INPUT_STREAM} --region ${AWS_REGION} --s3SinkPath s3://${FLINK_S3_BUCKET}/data --checkpoint-dir s3://${FLINK_S3_BUCKET}/recovery
4.4 配置 HPA
首先安装 metrics-server:
k apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml
检查部署:
k get apiservice v1beta1.metrics.k8s.io -o json | jq '.status'
Autoscaling 基于 Fink 的"Reactive Mode"
(https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode) 。通过设置 Horizontal Pod Autoscaler,监控 CPU 负载并进行相应的缩放:
k autoscale deployment flink-taskmanager --min=1 --max=25 --cpu-percent=35
检查当前 Task Manager Pod 数量:
kgp -l component=taskmanager
目前只有一个:
5集成测试
我们在上一篇中已经设置好输入/输出,接下来我们模拟生成数据,测试整个端到端流程。
5.1 配置数据生成器
这里示例,我们使用Kinesis Data Generator,打开 https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html 页面。
点击通过 CloudFormation 配置测试用户(跳转后注意切换到自己所在区域):
下一步设置用户名和密码,其他参数保持默认,创建即可。
接着从 CloudFormation 输出堆栈里找到 URL,跳转到 Kinesis Data Generator 页面。
输入前面创建堆栈时设置的用户名和密码完成登录:
5.2 准备数据模板
示例模板如下:
{"EVENT_TIME": "{{date.now("dddd, MMMM Do YYYY, h:mm:ss a")}}",
"TICKER": "{{random.arrayElement(
["ABCD", "CDEF", "IJKL", "MNOP", "QRST"]
)}}",
"PRICE": "{{random.number(
{
"min":500,
"max":1000
}
)}}",
"ID": "{{random.uuid}}"
}
5.3 注入测试数据
切到所在区域,然后选择之前准备的输入流,替换模板后,点击发送数据。
注意:请确保 Kinesis Data Generator 仍然保持在登录状态,开始发送后先切到 Kinesis 控制台检查监控指标,确保有数据写入。
观察 HPA 变化:
k get hpa flink-taskmanager -w
观察 Task Manager Pod 数量:
kgp -l component=taskmanager
6混沌测试
您可以借助 Amazon Fault Injection Simulator
(https://aws.amazon.com/cn/fis/) 模拟 Spot 事件,例如提前5分钟发出通知,然后观察节点变化和 Flink 的行为。
6.1 配置 FIS 模板
打开 Fault Injection Simulator 控制台 :
https://us-east-1.console.aws.amazon.com/fis/home?region=us-east-1
创建新实验,参数如下:
· 实验名称,例如"flink-spot-experiment"
· Action 名称:spot-interruptions
· Action 类型:aws:ec2:send-spot-instance-interruptions
· 提前通知时间:时间在2~15分钟之内,例如设置5分钟
· Target标签筛选:Resource tags, filters and parameters
○Key: karpenter.sh/provisioner-name
○Value: arm-spot-provisioner
· Target 资源筛选:Resource filters
○路径:Name
○值:running
· Target选择模式:
○方式:Count
○数量:不超过5
6.2 监控 Job Manager 日志
可以通过 kubectl 命令:
k logs -f <job-manager-pod-name>
或者利用预装的 k9s 工具进行跟踪:
k9s
然后选择 Job Manager,按下"l"键查看日志。
6.3 启动实验
回到 FIS 控制台,启动前面创建的实验。然后详细查看 JobManager 日志,发现 JobManager 恢复作业的过程:
如果出现中断,则将使用检查点数据重新启动 Flink 应用程序。JobManager 将恢复作业。受影响的节点将被自动替换。
7成本可见性
前面我们主要在集群层面进行优化,下面我们将视角切到应用/作业层面,遵循"Reduce"原则,将成本管理进行到底。
从2022年8月25号开始,Amazon EKS 客户可以部署 EKS 优化且免费的 Kubecost 包,以实现集群成本可见性。通过 Kubecost 可以查看按 Kubernetes 资源(包括 pod、节点、命名空间、标签等)细分的成本。Kubernetes 平台管理员和财务负责人可以使用 Kubecost 可视化其 Amazon EKS 费用明细和分配成本等。
Kubecost 还能根据其基础设施环境和集群内的使用模式获得定制的成本优化建议,例如设置合适的节点规模,容器资源申请建议等。
检查可安装版本:
https://gallery.ecr.aws/kubecost/cost-analyzer
准备安装参数:
cat > kubecost-values.yaml <<EOF
service:
type: LoadBalancer
port: 80
targetPort: 9090
# nodePort:
annotations:
service.beta.kubernetes.io/aws-load-balancer-security-groups: "${EKS_EXTERNAL_SG}"
EOF
这里为方便演示,使用 LoadBalancer 将服务暴露出来,并且绑定了安全组 ExternalSecurityGroup,请确保:
· 这个安全组允许您的本机 IP 访问80端口。
· 如果您修改了暴露端口80,例如用的9090,请相应在安全组中放开9090端口。
安装 kubecost(以1.96.0为例):
helm upgrade -i kubecost oci://public.ecr.aws/kubecost/cost-analyzer --version 1.96.0 \
--namespace kubecost --create-namespace \
-f kubecost-values.yaml
获取服务地址:
k get svc kubecost-cost-analyzer -n kubecost
拿到地址后在浏览器中打开,查看节省建议类似如下:
如果提示还在收集数据,可以等待15分钟左右再刷新页面。
Kubecost is collecting data. Data should be ready for viewing within 15 minutes.
总结
在本文中,我们遵循 Amazon Well-Architected Framework的可持续性支柱,从 Rearchitect 的角度,我们介绍了通过 buildx 工具,一条命令同时编译、打包、推送 x86 和 arm 架构镜像,平滑地实现从 x86 实例迁移到 Graviton 实例,同时通过自定义 Flink 镜像让使用者可以自由选择 Flink, Java, Scala 等组件的版本以适应业务需求。从 Reduce 角度,通过部署 Kubecost 实现成本管理和持续的基础设施优化。
参考文档
【1】Optimizing Apache Flink on Amazon EKS using Amazon EC2 Spot Instances:
https://aws.amazon.com/blogs/compute/optimizing-apache-flink-on-amazon-eks-using-amazon-ec2-spot-instances/
【2】https://github.com/frego-dev/flink-docker-image-build
本篇作者
蒋龙
OPPO 实时计算平台高级研发工程师,Apache Flink Contributor,长期专注于大数据领域。曾就职于金山、美团、360等互联网公司,在大数据引擎,调度、架构等方面有丰富的实战经验。
龙斌
亚马逊云科技解决方案架构师,负责协助客户业务系统上云的解决方案架构设计和咨询,现致力于容器和机器学习相关领域的研究。
王子豪
亚马逊云科技弹性计算解决方案架构师,主要负责亚马逊云科技弹性计算相关产品的技术咨询与方案设计
翁建清
亚马逊云科技资深解决方案架构师,具有多年 IT 从业经验,涉及移动互联网、企业、金融、政府等行业,曾任职咨询总监、CIO、企业架构师等岗位,具有多年丰富的各类项目经验,尤其在数据仓库、大数据等方面具有丰富的实战经验,目前专注于企业整体上云的架构规划、设计和实施。