我们使用机器学习技术将英文博客翻译为简体中文。您可以点击导航栏中的“中文(简体)”切换到英文版本。
在亚马逊 SageMaker Pipelines 中使用 PySpark 运行安全处理作业
在这篇文章中,我们解释了如何在管道中运行 PySpark 处理作业。这使任何想要使用 Pipelines 训练模型的人也可以使用 PySpark 预处理训练数据、后处理推理数据或评估模型。当您需要处理大规模数据时,此功能尤其重要。此外,我们还展示了如何使用配置和 Spark 用户界面日志来优化您的 PySpark 步骤。
管道是一款
在处理大规模数据时,数据科学家和机器学习工程师经常使用
在我们的示例中,我们创建了一个运行单个处理步骤的 SageMaker 流水线。有关可以向管道添加哪些其他步骤的更多信息,请参阅工作
SageMaker 处理库
SageMaker Processing 可以在特定的
- 步骤名称 -用于 SageMaker 工作流步骤的名称
- 步骤参数 -
处理
步骤的参数
此外,您可以提供以下信息:
- 为您的步骤缓存配置以避免在 SageMaker 管道中不必要地运行您的步骤
- Proc
essingStep 所依赖的步骤名称、步骤实例或步骤
收集实例的列表 -
处理
步骤的显示名称 - 对
处理
步骤的描述 - 属性文件
- 重试策略
参数已移交给 p rocessing
Step。你可以使用 Sagemaker.spark.py
每个处理器都有自己的需求,具体取决于框架。最好使用 PySpark
Processor 来说明这一点 ,在其中您可以传递其他信息以 进一步优化 处理步骤
,例如在运行作业时通过 配置
参数。
在安全的环境中运行 SageMaker 处理作业
SageMaker 处理任务允许您指定您的 VPC 中的私有子网和安全组,并使用 createProcessingJob API 的 netw
在下一节中,我们将使用
orkConfig.vpcConfig 请求参数启用网络
隔离和容器间流量加密。
SageMaker 管道中的 pySpark 处理步骤
在此示例中,我们假设您将 Studio 部署在已经可用的安全环境中,包括 VPC、VPC 终端节点、安全组、
例如,我们建立了一个包含单个 P rocessingSt
ep 的管道,在该管道 中,我们只需使用 Spark 读取和写入
rocessing
Step。
我们为管道定义参数(名称、角色、存储桶等)和特定步骤的设置(实例类型和数量、框架版本等)。在此示例中,我们使用安全设置,还定义了子网、安全组和容器间流量加密。在此示例中,您需要一个具有 SageMaker 完全访问权限的管道执行角色和一个 VPC。参见以下代码:
{
"pipeline_name": "ProcessingPipeline",
"trial": "test-blog-post",
"pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>",
"network_subnet_ids": [
"subnet-<SUBNET_ID>",
"subnet-<SUBNET_ID>"
],
"network_security_group_ids": [
"sg-<SG_ID>"
],
"pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>",
"pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>",
"pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py",
"spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json",
"pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py",
"process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}",
"pyspark_framework_version": "2.4",
"pyspark_process_name": "pyspark-processing",
"pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv",
"pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output",
"pyspark_process_instance_type": "ml.m5.4xlarge",
"pyspark_process_instance_count": 6,
"tags": {
"Project": "tag-for-project",
"Owner": "tag-for-owner"
}
}
为了演示,以下代码示例使用 PySparkProcessor 在管道内运行 SageMaker Procession 上的 PySpark 脚本:
# import code requirements
# standard libraries import
import logging
import json
# sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor
from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config
def create_pipeline(pipeline_params, logger):
"""
Args:
pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters
logger (logger): logger
Returns:
()
"""
# Create SageMaker Session
sagemaker_session = PipelineSession()
# Get Tags
tags_input = get_tags_input(pipeline_params["tags"])
# get network configuration
network_config = get_network_configuration(
subnets=pipeline_params["network_subnet_ids"],
security_group_ids=pipeline_params["network_security_group_ids"]
)
# Get Pipeline Configurations
pipeline_config = get_pipeline_config(pipeline_params)
# setting processing cache obj
logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days")
cache_config = CacheConfig(enable_caching=True, expire_after="p30d")
# Create PySpark Processing Step
logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor")
# setting up spark processor
processing_pyspark_processor = PySparkProcessor(
base_job_name=pipeline_params["pyspark_process_name"],
framework_version=pipeline_params["pyspark_framework_version"],
role=pipeline_params["pipeline_role"],
instance_count=pipeline_params["pyspark_process_instance_count"],
instance_type=pipeline_params["pyspark_process_instance_type"],
volume_kms_key=pipeline_params["pyspark_process_volume_kms"],
output_kms_key=pipeline_params["pyspark_process_output_kms"],
network_config=network_config,
tags=tags_input,
sagemaker_session=sagemaker_session
)
# setting up arguments
run_ags = processing_pyspark_processor.run(
submit_app=pipeline_params["pyspark_process_code"],
submit_py_files=[pipeline_params["pyspark_helper_code"]],
arguments=[
# processing input arguments. To add new arguments to this list you need to provide two entrances:
# 1st is the argument name preceded by "--" and the 2nd is the argument value
# setting up processing arguments
"--input_table", pipeline_params["pyspark_process_data_input"],
"--output_table", pipeline_params["pyspark_process_data_output"]
],
spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]),
inputs = [
ProcessingInput(
source=pipeline_params["spark_config_file"],
destination="/opt/ml/processing/input/conf",
s3_data_type="S3Prefix",
s3_input_mode="File",
s3_data_distribution_type="FullyReplicated",
s3_compression_type="None"
)
],
)
# create step
pyspark_processing_step = ProcessingStep(
name=pipeline_params["pyspark_process_name"],
step_args=run_ags,
cache_config=cache_config,
)
# Create Pipeline
pipeline = Pipeline(
name=pipeline_params["pipeline_name"],
steps=[
pyspark_processing_step
],
pipeline_experiment_config=PipelineExperimentConfig(
pipeline_params["pipeline_name"],
pipeline_config["trial"]
),
sagemaker_session=sagemaker_session
)
pipeline.upsert(
role_arn=pipeline_params["pipeline_role"],
description="Example pipeline",
tags=tags_input
)
return pipeline
def main():
# set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.info("Get Pipeline Parameter")
with open("ml_pipeline/params/pipeline_params.json", "r") as f:
pipeline_params = json.load(f)
print(pipeline_params)
logger.info("Create Pipeline")
pipeline = create_pipeline(pipeline_params, logger=logger)
logger.info("Execute Pipeline")
execution = pipeline.start()
return execution
if __name__ == "__main__":
main()
如前面的代码所示,我们通过提供配置
我们使用保存在亚马逊 S
.json
作为 processingInput 来覆盖默认 Spark 配置。 配置.js
on 文件,其设置如下:
[
{
"Classification":"spark-defaults",
"Properties":{
"spark.executor.memory":"10g",
"spark.executor.memoryOverhead":"5g",
"spark.driver.memory":"10g",
"spark.driver.memoryOverhead":"10g",
"spark.driver.maxResultSize":"10g",
"spark.executor.cores":5,
"spark.executor.instances":5,
"spark.yarn.maxAppAttempts":1
"spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com",
"spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true
}
}
]
我们可以通过将文件作为 p rocessingInput
传递 或在运行 run ()
函数时使用配置参数来更新默认 Spark 配置。
Spark 配置取决于其他选项,例如为处理作业选择的实例类型和实例数。第一个考虑因素是实例的数量、每个实例拥有的 vCPU 内核以及实例内存。您可以使用
此外,可以进一步优化执行器和驱动程序的设置。有关如何计算这些内存的示例,请参阅在 A
接下来,对于驱动程序和执行器设置,我们建议调查提交者设置以提高写入 Amazon S3 时的性能。在我们的例子中,我们正在向亚马逊 S3 写入 Parquet 文件并将 “sp ark.sql.parquet.fs.optimized.comitter.optimization-enabled” 设置为真
。
如果需要连接到 Amazon S3,可以在配置文件中指定区域终端节点 “sp ark.hadoop.fs.s3a.endpoint
”。
在此示例管道中,PySpark 脚本 spark_process.py
(如以下代码所示)将来自亚马逊 S3 的 CSV 文件加载到 Spark 数据框中,并将数据作为 Parquet 保存回亚马逊 S3。
请注意,我们的示例配置与工作负载不成比例,因为读取和写入鲍鱼数据集可以在一个实例上的默认设置下完成。我们提到的配置应根据您的特定需求进行定义。
# import requirements
import argparse
import logging
import sys
import os
import pandas as pd
# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType
from data_utils import(
spark_read_parquet,
Unbuffered
)
sys.stdout = Unbuffered(sys.stdout)
# Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def main(data_path):
spark = SparkSession.builder.appName("PySparkJob").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
schema = StructType(
[
StructField("sex", StringType(), True),
StructField("length", FloatType(), True),
StructField("diameter", FloatType(), True),
StructField("height", FloatType(), True),
StructField("whole_weight", FloatType(), True),
StructField("shucked_weight", FloatType(), True),
StructField("viscera_weight", FloatType(), True),
StructField("rings", FloatType(), True),
]
)
df = spark.read.csv(data_path, header=False, schema=schema)
return df.select("sex", "length", "diameter", "rings")
if __name__ == "__main__":
logger.info(f"===============================================================")
logger.info(f"================= Starting pyspark-processing =================")
parser = argparse.ArgumentParser(description="app inputs")
parser.add_argument("--input_table", type=str, help="path to the channel data")
parser.add_argument("--output_table", type=str, help="path to the output data")
args = parser.parse_args()
df = main(args.input_table)
logger.info("Writing transformed data")
df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite")
# save data
df.coalesce(10).write.mode("overwrite").parquet(args.output_table)
logger.info(f"================== Ending pyspark-processing ==================")
logger.info(f"===============================================================")
要深入优化 Spark 处理任务,你可以使用 CloudWatch 日志和 Spark 用户界面。你可以通过在 SageMaker 笔记本实例上运行处理作业来创建 Spark 用户界面。如果
清理
如果您按照教程进行操作,则最好删除不再用于停止产生费用的资源。确保
结论
在这篇文章中,我们展示了如何在 SageMaker Pipelines 中使用 PySpark 运行安全的 SageMaker 处理作业。我们还演示了如何使用 Spark 配置优化 PySpark,以及如何将处理作业设置为在安全的网络配置中运行。
下一步,探索如何实现整个模型生命周期的自动化,以及
作者简介
玛伦·苏尔曼 是 A
Maira Ladeira Tanke 是 亚马逊云科技 的机器学习专家。她拥有数据科学背景,拥有 9 年与各行各业客户一起架构和构建 ML 应用程序的经验。作为技术主管,她通过新兴技术和创新解决方案帮助客户加速实现商业价值。在空闲时间,Maira喜欢旅行和在温暖的地方与家人共度时光。
Pauline Ting 是
Donald Fossouo 是 A
*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。