Google Cloud VertexAI 算子

Google Cloud VertexAI 将 AutoML 和 AI Platform 统一到一个 API、客户端库和用户界面中。AutoML 允许您在图像、表格、文本和视频数据集上训练模型,而无需编写代码,而 AI Platform 中的训练允许您运行自定义训练代码。使用 Vertex AI,AutoML 训练和自定义训练都是可用的选项。无论您选择哪种训练选项,您都可以使用 Vertex AI 保存模型、部署模型和请求预测。

创建数据集

要创建 Google VertexAI 数据集,您可以使用 CreateDatasetOperator。该算子在 XCom 中返回数据集 ID,键为 dataset_id

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py[源代码]

create_image_dataset_job = CreateDatasetOperator(
    task_id="image_dataset",
    dataset=IMAGE_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_tabular_dataset_job = CreateDatasetOperator(
    task_id="tabular_dataset",
    dataset=TABULAR_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_text_dataset_job = CreateDatasetOperator(
    task_id="text_dataset",
    dataset=TEXT_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_video_dataset_job = CreateDatasetOperator(
    task_id="video_dataset",
    dataset=VIDEO_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_time_series_dataset_job = CreateDatasetOperator(
    task_id="time_series_dataset",
    dataset=TIME_SERIES_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)

创建数据集后,可以使用 ImportDataOperator 使用该数据集导入一些数据。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py[源代码]

import_data_job = ImportDataOperator(
    task_id="import_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    import_configs=TEST_IMPORT_CONFIG,
)

要导出数据集,可以使用 ExportDataOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py[源代码]

export_data_job = ExportDataOperator(
    task_id="export_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    export_config=TEST_EXPORT_CONFIG,
)

要删除数据集,可以使用 DeleteDatasetOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py[源代码]

delete_dataset_job = DeleteDatasetOperator(
    task_id="delete_dataset",
    dataset_id=create_text_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

要获取数据集,可以使用 GetDatasetOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py[源代码]

get_dataset = GetDatasetOperator(
    task_id="get_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_tabular_dataset_job.output["dataset_id"],
)

要获取数据集列表,可以使用 ListDatasetsOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py[源代码]

list_dataset_job = ListDatasetsOperator(
    task_id="list_dataset",
    region=REGION,
    project_id=PROJECT_ID,
)

要更新数据集,可以使用 UpdateDatasetOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py[源代码]

update_dataset_job = UpdateDatasetOperator(
    task_id="update_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_video_dataset_job.output["dataset_id"],
    dataset=DATASET_TO_UPDATE,
    update_mask=TEST_UPDATE_MASK,
)

创建训练作业

要创建 Google Vertex AI 训练作业,你有三个操作符 CreateCustomContainerTrainingJobOperatorCreateCustomPythonPackageTrainingJobOperatorCreateCustomTrainingJobOperator。它们每个都将等待操作完成。每个操作符的结果都将是一个模型,该模型是由用户使用这些操作符训练的。

准备步骤

对于每个操作符,你必须准备并创建数据集。然后将数据集 ID 放入操作符中的 dataset_id 参数。

如何运行自定义容器训练作业 CreateCustomContainerTrainingJobOperator

在开始运行此作业之前,你应该创建一个包含训练脚本的 docker 镜像。你可以通过此链接找到有关如何创建镜像的文档:https://cloud.google.com/vertex-ai/docs/training/create-custom-container 之后,你应该将指向镜像的链接放入 container_uri 参数中。你还可以键入将在 command 参数中从该镜像创建的容器的执行命令。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py[source]

create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=CONTAINER_DISPLAY_NAME,
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)

CreateCustomContainerTrainingJobOperator 也提供了可延迟模式

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py[source]

create_custom_container_training_job_deferrable = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=f"{CONTAINER_DISPLAY_NAME}_DEF",
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=f"{MODEL_DISPLAY_NAME}_DEF",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

如何运行 Python 包训练作业 CreateCustomPythonPackageTrainingJobOperator

在开始运行此作业之前,您应该创建一个包含训练脚本的 Python 包。有关如何创建的文档,您可通过此链接找到: https://cloud.google.com/vertex-ai/docs/training/create-python-pre-built-container 接下来,您应该将链接放入 python_package_gcs_uri 参数中的包中,同时 python_module_name 参数应具有将运行您的训练任务的脚本的名称。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py[source]

create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=PACKAGE_DISPLAY_NAME,
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)

CreateCustomPythonPackageTrainingJobOperator 也提供了可延迟模式

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py[source]

create_custom_python_package_training_job_deferrable = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=f"{PACKAGE_DISPLAY_NAME}_DEF",
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=f"{MODEL_DISPLAY_NAME}_DEF",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

如何运行自定义训练作业 CreateCustomTrainingJobOperator

要创建和运行自定义训练作业,您应将本地训练脚本的路径放入 script_path 参数中。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job = CreateCustomTrainingJobOperator(
    task_id="custom_task",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)

model_id_v1 = create_custom_training_job.output["model_id"]

可以在可延迟模式下执行相同操作

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job_deferrable = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)
model_id_deferrable_v1 = create_custom_training_job_deferrable.output["model_id"]

此外,您可以创建现有自定义训练作业的新版本。它将用另一个版本替换现有模型,而不是在模型注册表中创建新模型。这可以通过在运行自定义训练作业时指定 parent_model 参数来完成。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
)

可以在可延迟模式下执行相同操作

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job_deferrable_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_deferrable_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

您可以使用 ListCustomTrainingJobOperator 获取训练作业列表。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py[源代码]

list_custom_training_job = ListCustomTrainingJobOperator(
    task_id="list_custom_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

如果您希望删除自定义训练作业,可以使用 DeleteCustomTrainingJobOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

delete_custom_training_job = DeleteCustomTrainingJobOperator(
    task_id="delete_custom_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='training_id') }}",
    custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='custom_job_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

创建 AutoML 训练作业

要创建 Google Vertex AI Auto ML 训练作业,您有五个操作符 CreateAutoMLForecastingTrainingJobOperator CreateAutoMLImageTrainingJobOperator CreateAutoMLTabularTrainingJobOperator CreateAutoMLTextTrainingJobOperator CreateAutoMLVideoTrainingJobOperator 它们每个都会等待操作完成。每个操作符的结果将是一个模型,该模型由用户使用这些操作符进行训练。

如何运行 AutoML 预测训练作业 CreateAutoMLForecastingTrainingJobOperator

在开始运行此作业之前,您必须准备并创建 TimeSeries 数据集。之后,您应将数据集 ID 放入操作符中的 dataset_id 参数。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py[source]

create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
    task_id="auto_ml_forecasting_task",
    display_name=FORECASTING_DISPLAY_NAME,
    optimization_objective="minimize-rmse",
    column_specs=COLUMN_SPECS,
    # run params
    dataset_id=forecast_dataset_id,
    target_column=TEST_TARGET_COLUMN,
    time_column=TEST_TIME_COLUMN,
    time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
    available_at_forecast_columns=[TEST_TIME_COLUMN],
    unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
    time_series_attribute_columns=["city", "zip_code", "county"],
    forecast_horizon=30,
    context_window=30,
    data_granularity_unit="day",
    data_granularity_count=1,
    weight_column=None,
    budget_milli_node_hours=1000,
    model_display_name=MODEL_DISPLAY_NAME,
    predefined_split_column_name=None,
    region=REGION,
    project_id=PROJECT_ID,
)

如何运行 AutoML 图像训练作业 CreateAutoMLImageTrainingJobOperator

在开始运行此作业之前,必须准备并创建 图像 数据集。之后,应将数据集 ID 放入运算符中的 dataset_id 参数。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py[源代码]

create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
    task_id="auto_ml_image_task",
    display_name=IMAGE_DISPLAY_NAME,
    dataset_id=image_dataset_id,
    prediction_type="classification",
    multi_label=False,
    model_type="CLOUD",
    training_fraction_split=0.6,
    validation_fraction_split=0.2,
    test_fraction_split=0.2,
    budget_milli_node_hours=8000,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

如何运行 AutoML 表格训练作业 CreateAutoMLTabularTrainingJobOperator

在开始运行此作业之前,必须准备并创建 表格 数据集。之后,应将数据集 ID 放入运算符中的 dataset_id 参数。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py[源代码]

create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
    task_id="auto_ml_tabular_task",
    display_name=TABULAR_DISPLAY_NAME,
    optimization_prediction_type="classification",
    column_transformations=COLUMN_TRANSFORMATIONS,
    dataset_id=tabular_dataset_id,
    target_column="Adopted",
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

如何运行 AutoML 文本训练作业 CreateAutoMLTextTrainingJobOperator

在开始运行此作业之前,必须准备并创建 文本 数据集。之后,应将数据集 ID 放入运算符中的 dataset_id 参数。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py[源代码]

create_auto_ml_text_training_job = CreateAutoMLTextTrainingJobOperator(
    task_id="auto_ml_text_task",
    display_name=TEXT_DISPLAY_NAME,
    prediction_type="classification",
    multi_label=False,
    dataset_id=text_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    training_fraction_split=0.7,
    validation_fraction_split=0.2,
    test_fraction_split=0.1,
    sync=True,
    region=REGION,
    project_id=PROJECT_ID,
)

如何运行 AutoML 视频训练作业 CreateAutoMLVideoTrainingJobOperator

在开始运行此作业之前,必须准备并创建 视频 数据集。之后,应将数据集 ID 放入运算符中的 dataset_id 参数。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py[源代码]

create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)
model_id_v1 = create_auto_ml_video_training_job.output["model_id"]

此外,您还可以创建现有 AutoML 视频训练作业的新版本。在这种情况下,结果将是现有模型的新版本,而不是在模型注册表中创建的新模型。可以通过在运行 AutoML 视频训练作业时指定 parent_model 参数来完成此操作。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py[源代码]

create_auto_ml_video_training_job_v2 = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_v2_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    parent_model=model_id_v1,
    region=REGION,
    project_id=PROJECT_ID,
)

您可以使用 ListAutoMLTrainingJobOperator 获取 AutoML 训练作业的列表。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py[源代码]

list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
    task_id="list_auto_ml_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

如果您希望删除 Auto ML 训练作业,可以使用 DeleteAutoMLTrainingJobOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py[source]

delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
    task_id="delete_auto_ml_forecasting_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
    "key='training_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
)

创建批处理预测作业

要创建 Google VertexAI 批处理预测作业,可以使用 CreateBatchPredictionJobOperator。该操作员在 XCom 中返回批处理预测作业 ID,键为 batch_prediction_job_id

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[源代码]

create_batch_prediction_job = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
)

CreateBatchPredictionJobOperator 还提供可延迟模式

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[源代码]

create_batch_prediction_job_def = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job_def",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

要删除批处理预测作业,可以使用 DeleteBatchPredictionJobOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[源代码]

delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
    task_id="delete_batch_prediction_job",
    batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

要获取批处理预测作业列表,可以使用 ListBatchPredictionJobsOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[源代码]

list_batch_prediction_job = ListBatchPredictionJobsOperator(
    task_id="list_batch_prediction_jobs",
    region=REGION,
    project_id=PROJECT_ID,
)

创建端点服务

要创建 Google VertexAI 端点,可以使用 CreateEndpointOperator。该操作符在 XCom 中返回端点 ID,键为 endpoint_id

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[源代码]

create_endpoint = CreateEndpointOperator(
    task_id="create_endpoint",
    endpoint=ENDPOINT_CONF,
    region=REGION,
    project_id=PROJECT_ID,
)

创建端点后,可以使用 DeployModelOperator 使用该端点部署某些模型。

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[源代码]

deploy_model = DeployModelOperator(
    task_id="deploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model=DEPLOYED_MODEL,
    traffic_split={"0": 100},
    region=REGION,
    project_id=PROJECT_ID,
)

要取消部署模型,可以使用 UndeployModelOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[源代码]

undeploy_model = UndeployModelOperator(
    task_id="undeploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model_id=deploy_model.output["deployed_model_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

要删除端点,可以使用 DeleteEndpointOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[源代码]

delete_endpoint = DeleteEndpointOperator(
    task_id="delete_endpoint",
    endpoint_id=create_endpoint.output["endpoint_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

要获取端点列表,可以使用 ListEndpointsOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[源代码]

list_endpoints = ListEndpointsOperator(
    task_id="list_endpoints",
    region=REGION,
    project_id=PROJECT_ID,
)

创建超参数调优作业

要创建 Google VertexAI 超参数调优作业,可以使用 CreateHyperparameterTuningJobOperator。该运算符在 XCom 中返回超参数调优作业 ID,其键为 hyperparameter_tuning_job_id

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[源代码]

create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
)

CreateHyperparameterTuningJobOperator 还支持可延迟模式

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[源代码]

create_hyperparameter_tuning_job_def = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job_def",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
    deferrable=True,
)

要删除超参数调优作业,可以使用 DeleteHyperparameterTuningJobOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[源代码]

delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
    task_id="delete_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

要获取超参数调优作业,可以使用 GetHyperparameterTuningJobOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[源代码]

get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
    task_id="get_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
)

要获取超参数调优作业列表,可以使用 ListHyperparameterTuningJobOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[源代码]

list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
    task_id="list_hyperparameter_tuning_job",
    region=REGION,
    project_id=PROJECT_ID,
)

创建模型服务

要上传 Google VertexAI 模型,可以使用 UploadModelOperator。该运算符在 XCom 中返回模型 ID,其键为 model_id

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

upload_model = UploadModelOperator(
    task_id="upload_model",
    region=REGION,
    project_id=PROJECT_ID,
    model=MODEL_OBJ,
)

要导出模型,可以使用 ExportModelOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

export_model = ExportModelOperator(
    task_id="export_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    output_config=MODEL_OUTPUT_CONFIG,
)

要删除模型,可以使用 DeleteModelOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

delete_model = DeleteModelOperator(
    task_id="delete_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    trigger_rule=TriggerRule.ALL_DONE,
)

要获取模型列表,可以使用 ListModelsOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

list_models = ListModelsOperator(
    task_id="list_models",
    region=REGION,
    project_id=PROJECT_ID,
)

要按 ID 检索模型,可以使用 GetModelOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

get_model = GetModelOperator(
    task_id="get_model", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)

要列出所有模型版本,可以使用 ListModelVersionsOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

list_model_versions = ListModelVersionsOperator(
    task_id="list_model_versions", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)

要将模型的特定版本设为默认版本,可以使用 SetDefaultVersionOnModelOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

set_default_version = SetDefaultVersionOnModelOperator(
    task_id="set_default_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v2,
)

要向模型的特定版本添加别名,可以使用 AddVersionAliasesOnModelOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

add_version_alias = AddVersionAliasesOnModelOperator(
    task_id="add_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version", "beta"],
    model_id=model_id_v2,
)

要从特定版本的模型中删除别名,可以使用 DeleteVersionAliasesOnModelOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

delete_version_alias = DeleteVersionAliasesOnModelOperator(
    task_id="delete_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version"],
    model_id=model_id_v2,
)

要删除特定版本的模型,可以使用 DeleteModelVersionOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py[源代码]

delete_model_version = DeleteModelVersionOperator(
    task_id="delete_model_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v1,
    trigger_rule=TriggerRule.ALL_DONE,
)

运行管道作业

要运行 Google VertexAI 管道作业,可以使用 RunPipelineJobOperator。该操作符在 XCom 中返回管道作业 ID,其键为 pipeline_job_id

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[源代码]

run_pipeline_job = RunPipelineJobOperator(
    task_id="run_pipeline_job",
    display_name=DISPLAY_NAME,
    template_path=TEMPLATE_PATH,
    parameter_values=PARAMETER_VALUES,
    region=REGION,
    project_id=PROJECT_ID,
)

要删除管道作业,可以使用 DeletePipelineJobOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[源代码]

delete_pipeline_job = DeletePipelineJobOperator(
    task_id="delete_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

要获取管道作业,可以使用 GetPipelineJobOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[源代码]

get_pipeline_job = GetPipelineJobOperator(
    task_id="get_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
)

要获取管道作业列表,可以使用 ListPipelineJobOperator

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[源代码]

list_pipeline_job = ListPipelineJobOperator(
    task_id="list_pipeline_job",
    region=REGION,
    project_id=PROJECT_ID,
)

提示生成模型

要提示语言模型,可以使用 PromptLanguageModelOperator。该运算符在 XCom 中返回模型的响应,键为 prompt_response

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[源代码]

prompt_language_model_task = PromptLanguageModelOperator(
    task_id="prompt_language_model_task",
    project_id=PROJECT_ID,
    location=REGION,
    prompt=PROMPT,
    pretrained_model=LANGUAGE_MODEL,
)

要生成文本嵌入,可以使用 GenerateTextEmbeddingsOperator。该运算符在 XCom 中返回模型的响应,键为 prompt_response

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[源代码]

generate_text_embeddings_task = GenerateTextEmbeddingsOperator(
    task_id="generate_text_embeddings_task",
    project_id=PROJECT_ID,
    location=REGION,
    prompt=PROMPT,
    pretrained_model=TEXT_EMBEDDING_MODEL,
)

要提示多模态模型,可以使用 PromptMultimodalModelOperator。该运算符在 XCom 中返回模型的响应,键为 prompt_response

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[源代码]

prompt_multimodal_model_task = PromptMultimodalModelOperator(
    task_id="prompt_multimodal_model_task",
    project_id=PROJECT_ID,
    location=REGION,
    prompt=PROMPT,
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
    pretrained_model=MULTIMODAL_MODEL,
)

要提示带有媒体的多模态模型,可以使用 PromptMultimodalModelWithMediaOperator。该运算符在 XCom 中返回模型的响应,键为 prompt_response

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[源代码]

prompt_multimodal_model_with_media_task = PromptMultimodalModelWithMediaOperator(
    task_id="prompt_multimodal_model_with_media_task",
    project_id=PROJECT_ID,
    location=REGION,
    prompt=VISION_PROMPT,
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
    pretrained_model=MULTIMODAL_VISION_MODEL,
    media_gcs_path=MEDIA_GCS_PATH,
    mime_type=MIME_TYPE,
)

参考

有关更多信息,请参阅

此条目是否有用?