在亚马逊 SageMaker Pipelines 中使用 PySpark 运行安全处理作业

亚马逊 SageMaker Studio 可以帮助您构建、训练、调试、部署和监控模型,并管理您的机器学习 (ML) 工作流程。 亚马逊 SageMaker Pip elin es 使您能够在 Stud io 中构建安全、可扩展和灵活的 mLoPs 平台。

在这篇文章中,我们解释了如何在管道中运行 PySpark 处理作业。这使任何想要使用 Pipelines 训练模型的人也可以使用 PySpark 预处理训练数据、后处理推理数据或评估模型。当您需要处理大规模数据时,此功能尤其重要。此外,我们还展示了如何使用配置和 Spark 用户界面日志来优化您的 PySpark 步骤。

管道是一款 亚马逊 S ageMaker 工具,用于构建和管理端到端机器学习管道。它是一项完全托管的按需服务,与 SageMaker 和其他 亚马逊云科技 服务集成,因此可以为您创建和管理资源。这可确保仅在运行管道时配置和使用实例。此外,流水线由 SageMaker Python SDK 支持 ,允许您跟踪 数据沿袭 并 通过缓存步骤来 重复使用这些步骤 ,从而缩短开发时间和降低开发成本。SageMaker 流水线可以使用 处理步骤 来处理数据或执行模型评估。

在处理大规模数据时,数据科学家和机器学习工程师经常使用 PySpark ,这是 Python Apache Spark 的接口。SageMaker 提供预构建的 Docker 镜像,其中包含 PySpark 和运行分布式数据处理作业(包括使用 Spark 框架进行数据转换和功能工程)所需的其他依赖关系。尽管这些映像允许您快速开始在处理作业中使用 PySpark,但大规模数据处理通常需要特定的 Spark 配置,以优化 SageMaker 创建的集群的分布式计算。

在我们的示例中,我们创建了一个运行单个处理步骤的 SageMaker 流水线。有关可以向管道添加哪些其他步骤的更多信息,请参阅工作 流步骤

SageMaker 处理库

SageMaker Processing 可以在特定的 框架 (例如 sklearnProcessor、pySparkProcessor 或 Hugging Face)上运行。无论使用的框架如何,每个 处理步骤都 需要以下内容:

  • 步骤名称 -用于 SageMaker 工作流步骤的名称
  • 步骤参数 - 处理 步骤的参数

此外,您可以提供以下信息:

  • 为您的步骤缓存配置以避免在 SageMaker 管道中不必要地运行您的步骤
  • Proc essingStep 所依赖的步骤名称、步骤实例或步骤 收集实例的列表
  • 处理 步骤的显示名称
  • 处理 步骤的描述
  • 属性文件
  • 重试策略

参数已移交给 p rocessing Step。你可以使用 Sagemaker.spark.py sparkProcessor 或 sagemaker.sparkjarProces sor 类在处理作业中运行你的 Spar k 应用程序

每个处理器都有自己的需求,具体取决于框架。最好使用 PySpark Processor 来说明这一点 ,在其中您可以传递其他信息以 进一步优化 处理步骤 ,例如在运行作业时通过 配置 参数。

在安全的环境中运行 SageMaker 处理作业

最佳做法 是创建私有 Amazon VPC 并对其进行配置,使您的任务无法通过公共互联网访问。 SageMaker 处理任务允许您指定您的 VPC 中的私有子网和安全组,并使用 createProcessingJob API 的 netw orkConfig.vpcConfig 请求参数启用网络 隔离和容器间流量加密。 在下一节中,我们将使用 SageMaker SDK 提供此配置 的示例。

SageMaker 管道中的 pySpark 处理步骤

在此示例中,我们假设您将 Studio 部署在已经可用的安全环境中,包括 VPC、VPC 终端节点、安全组、 亚马逊云科技 身份和访问管理 (IAM) 角色以及 亚马逊云科技 密钥管理服务 (亚马逊云科技 KMS) 密钥。我们还假设你有两个存储桶:一个用于存放代码和日志等工件,另一个用于存放数据。 basic_infra.yaml 文件提供了用于配置必要的必备基础设施 的 亚马逊云科技 CloudForm at ion 代码 示例。示例代码和部署指南也可在 GitHub 上找到

例如,我们建立了一个包含单个 P rocessingSt ep 的管道,在该管道 中,我们只需使用 Spark 读取和写入 鲍鱼数据集 即可。代码示例向您展示了如何设置和配置 P rocessing Step。

我们为管道定义参数(名称、角色、存储桶等)和特定步骤的设置(实例类型和数量、框架版本等)。在此示例中,我们使用安全设置,还定义了子网、安全组和容器间流量加密。在此示例中,您需要一个具有 SageMaker 完全访问权限的管道执行角色和一个 VPC。参见以下代码:

{
	"pipeline_name": "ProcessingPipeline",
	"trial": "test-blog-post",
	"pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>",
	"network_subnet_ids": [
		"subnet-<SUBNET_ID>",
		"subnet-<SUBNET_ID>"
	],
	"network_security_group_ids": [
		"sg-<SG_ID>"
	],
	"pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>",
	"pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>",
	"pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py",
	"spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json",
	"pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py",
	"process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}",
	"pyspark_framework_version": "2.4",
	"pyspark_process_name": "pyspark-processing",
	"pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv",
	"pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output",
	"pyspark_process_instance_type": "ml.m5.4xlarge",
	"pyspark_process_instance_count": 6,
	"tags": {
		"Project": "tag-for-project",
		"Owner": "tag-for-owner"
	}
}

为了演示,以下代码示例使用 PySparkProcessor 在管道内运行 SageMaker Procession 上的 PySpark 脚本:

# import code requirements
# standard libraries import
import logging
import json

# sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor

from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config

def create_pipeline(pipeline_params, logger):
    """
    Args:
        pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters
        logger (logger): logger
    Returns:
        ()
    """
    # Create SageMaker Session
    sagemaker_session = PipelineSession()

    # Get Tags
    tags_input = get_tags_input(pipeline_params["tags"])

    # get network configuration
    network_config = get_network_configuration(
        subnets=pipeline_params["network_subnet_ids"],
        security_group_ids=pipeline_params["network_security_group_ids"]
    )

    # Get Pipeline Configurations
    pipeline_config = get_pipeline_config(pipeline_params)

    # setting processing cache obj
    logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days")
    cache_config = CacheConfig(enable_caching=True, expire_after="p30d")

    # Create PySpark Processing Step
    logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor")

    # setting up spark processor
    processing_pyspark_processor = PySparkProcessor(
        base_job_name=pipeline_params["pyspark_process_name"],
        framework_version=pipeline_params["pyspark_framework_version"],
        role=pipeline_params["pipeline_role"],
        instance_count=pipeline_params["pyspark_process_instance_count"],
        instance_type=pipeline_params["pyspark_process_instance_type"],
        volume_kms_key=pipeline_params["pyspark_process_volume_kms"],
        output_kms_key=pipeline_params["pyspark_process_output_kms"],
        network_config=network_config,
        tags=tags_input,
        sagemaker_session=sagemaker_session
    )
    
    # setting up arguments
    run_ags = processing_pyspark_processor.run(
        submit_app=pipeline_params["pyspark_process_code"],
        submit_py_files=[pipeline_params["pyspark_helper_code"]],
        arguments=[
        # processing input arguments. To add new arguments to this list you need to provide two entrances:
        # 1st is the argument name preceded by "--" and the 2nd is the argument value
        # setting up processing arguments
            "--input_table", pipeline_params["pyspark_process_data_input"],
            "--output_table", pipeline_params["pyspark_process_data_output"]
        ],
        spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]),
        inputs = [
            ProcessingInput(
                source=pipeline_params["spark_config_file"],
                destination="/opt/ml/processing/input/conf",
                s3_data_type="S3Prefix",
                s3_input_mode="File",
                s3_data_distribution_type="FullyReplicated",
                s3_compression_type="None"
            )
        ],
    )

    # create step
    pyspark_processing_step = ProcessingStep(
        name=pipeline_params["pyspark_process_name"],
        step_args=run_ags,
        cache_config=cache_config,
    )

    # Create Pipeline
    pipeline = Pipeline(
        name=pipeline_params["pipeline_name"],
        steps=[
            pyspark_processing_step
        ],
        pipeline_experiment_config=PipelineExperimentConfig(
            pipeline_params["pipeline_name"],
            pipeline_config["trial"]
        ),
        sagemaker_session=sagemaker_session
    )
    pipeline.upsert(
        role_arn=pipeline_params["pipeline_role"],
        description="Example pipeline",
        tags=tags_input
    )
    return pipeline


def main():
    # set up logging
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    logger.info("Get Pipeline Parameter")

    with open("ml_pipeline/params/pipeline_params.json", "r") as f:
        pipeline_params = json.load(f)
    print(pipeline_params)

    logger.info("Create Pipeline")
    pipeline = create_pipeline(pipeline_params, logger=logger)
    logger.info("Execute Pipeline")
    execution = pipeline.start()
    return execution


if __name__ == "__main__":
    main()

如前面的代码所示,我们通过提供配置 .json 作为 processingInput 来覆盖默认 Spark 配置。 我们使用保存在亚马逊 S imple Storage Service ( Amazon S3) 中的 配置.js on 文件,其设置如下:

[
    {
        "Classification":"spark-defaults",
        "Properties":{
            "spark.executor.memory":"10g",
            "spark.executor.memoryOverhead":"5g",
            "spark.driver.memory":"10g",
            "spark.driver.memoryOverhead":"10g",
            "spark.driver.maxResultSize":"10g",
            "spark.executor.cores":5,
            "spark.executor.instances":5,
            "spark.yarn.maxAppAttempts":1
            "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com",
            "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true
        }
    }
]

我们可以通过将文件作为 p rocessingInput 传递 或在运行 run () 函数时使用配置参数来更新默认 Spark 配置。

Spark 配置取决于其他选项,例如为处理作业选择的实例类型和实例数。第一个考虑因素是实例的数量、每个实例拥有的 vCPU 内核以及实例内存。您可以使用 Spark UI CloudWatch 实例指标 和日志在多次运行迭代中校准这些值。

此外,可以进一步优化执行器和驱动程序的设置。有关如何计算这些内存的示例,请参阅在 A mazon EM R 上成功管理 Apache Spark 应用程序内存 的最佳实践

接下来,对于驱动程序和执行器设置,我们建议调查提交者设置以提高写入 Amazon S3 时的性能。在我们的例子中,我们正在向亚马逊 S3 写入 Parquet 文件并将 “sp ark.sql.parquet.fs.optimized.comitter.optimization-enabled” 设置为真

如果需要连接到 Amazon S3,可以在配置文件中指定区域终端节点 “sp ark.hadoop.fs.s3a.endpoint ”。

在此示例管道中,PySpark 脚本 spark_process.py (如以下代码所示)将来自亚马逊 S3 的 CSV 文件加载到 Spark 数据框中,并将数据作为 Parquet 保存回亚马逊 S3。

请注意,我们的示例配置与工作负载不成比例,因为读取和写入鲍鱼数据集可以在一个实例上的默认设置下完成。我们提到的配置应根据您的特定需求进行定义。

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType

from data_utils import(
    spark_read_parquet,
    Unbuffered
)

sys.stdout = Unbuffered(sys.stdout)

# Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def main(data_path):

    spark = SparkSession.builder.appName("PySparkJob").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", FloatType(), True),
            StructField("diameter", FloatType(), True),
            StructField("height", FloatType(), True),
            StructField("whole_weight", FloatType(), True),
            StructField("shucked_weight", FloatType(), True),
            StructField("viscera_weight", FloatType(), True),
            StructField("rings", FloatType(), True),
        ]
    )

    df = spark.read.csv(data_path, header=False, schema=schema)
    return df.select("sex", "length", "diameter", "rings")

if __name__ == "__main__":
    logger.info(f"===============================================================")
    logger.info(f"================= Starting pyspark-processing =================")
    parser = argparse.ArgumentParser(description="app inputs")
    parser.add_argument("--input_table", type=str, help="path to the channel data")
    parser.add_argument("--output_table", type=str, help="path to the output data")
    args = parser.parse_args()
    
    df = main(args.input_table)

    logger.info("Writing transformed data")
    df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite")

    # save data
    df.coalesce(10).write.mode("overwrite").parquet(args.output_table)

    logger.info(f"================== Ending pyspark-processing ==================")
    logger.info(f"===============================================================")

要深入优化 Spark 处理任务,你可以使用 CloudWatch 日志和 Spark 用户界面。你可以通过在 SageMaker 笔记本实例上运行处理作业来创建 Spark 用户界面。如果 Spark UI 日志保存在同一 Amazon S3 位置,则可以通过 在 SageMaker 笔记本实例中 运行历史服务器 来查看管道中运行的处理作业 的 Spark 用户界面。

清理

如果您按照教程进行操作,则最好删除不再用于停止产生费用的资源。确保 删除您用于创建资源的 CloudFormation 堆栈 。这将删除创建的堆栈及其创建的资源。

结论

在这篇文章中,我们展示了如何在 SageMaker Pipelines 中使用 PySpark 运行安全的 SageMaker 处理作业。我们还演示了如何使用 Spark 配置优化 PySpark,以及如何将处理作业设置为在安全的网络配置中运行。

下一步,探索如何实现整个模型生命周期的自动化,以及 客户如何 使用 SageMaker 服务构建安全且可扩展的 mLoPs 平台


作者简介

玛伦·苏尔曼 是 A WS 专业服务的数据科学家。 她与各行各业的客户合作,揭示人工智能/机器学习在实现业务成果方面的力量。自 2019 年 11 月以来,Maren 一直在 亚马逊云科技 工作。在业余时间,她喜欢跆拳道、徒步欣赏美景和玩棋盘游戏之夜。


Maira Ladeira Tanke
是 亚马逊云科技 的机器学习专家。她拥有数据科学背景,拥有 9 年与各行各业客户一起架构和构建 ML 应用程序的经验。作为技术主管,她通过新兴技术和创新解决方案帮助客户加速实现商业价值。在空闲时间,Maira喜欢旅行和在温暖的地方与家人共度时光。


Pauline Ting
亚马逊云科技 专业服务 团队的数据科学家。她通过开发 AI/ML 解决方案来支持客户实现和加速其业务成果。在业余时间,Pauline 喜欢旅行、冲浪和尝试新的甜点店。


Donald Fossouo
是 A WS 专业服务 团队的高级数据架构师,主要在全球金融服务部门工作。他与客户合作创建创新的解决方案,以解决客户的业务问题并加速 亚马逊云科技 服务的采用。在业余时间,唐纳德喜欢阅读、跑步和旅行。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。