Google Cloud VertexAI Operators¶
Google Cloud VertexAI brings AutoML and AI Platform together into a unified API, client library, and user interface. AutoML lets you train models on image, tabular, text, and video datasets without writing code, while training in AI Platform lets you run custom training code. With Vertex AI, both AutoML training and custom training are available. Whichever option you choose for training, you can save models, deploy models, and request predictions with Vertex AI.
Creating a Dataset¶
To create a Google VertexAI dataset you can use CreateDatasetOperator. The operator returns the dataset ID in XCom under the dataset_id key.
create_image_dataset_job = CreateDatasetOperator(
    task_id="image_dataset",
    dataset=IMAGE_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_tabular_dataset_job = CreateDatasetOperator(
    task_id="tabular_dataset",
    dataset=TABULAR_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_text_dataset_job = CreateDatasetOperator(
    task_id="text_dataset",
    dataset=TEXT_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_video_dataset_job = CreateDatasetOperator(
    task_id="video_dataset",
    dataset=VIDEO_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_time_series_dataset_job = CreateDatasetOperator(
    task_id="time_series_dataset",
    dataset=TIME_SERIES_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
After creating a dataset you can use ImportDataOperator to import some data into it.
import_data_job = ImportDataOperator(
    task_id="import_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    import_configs=TEST_IMPORT_CONFIG,
)
To export data from a dataset you can use ExportDataOperator.
export_data_job = ExportDataOperator(
    task_id="export_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    export_config=TEST_EXPORT_CONFIG,
)
To delete a dataset you can use DeleteDatasetOperator.
delete_dataset_job = DeleteDatasetOperator(
    task_id="delete_dataset",
    dataset_id=create_text_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
)
To get a dataset you can use GetDatasetOperator.
get_dataset = GetDatasetOperator(
    task_id="get_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_tabular_dataset_job.output["dataset_id"],
)
To get a list of datasets you can use ListDatasetsOperator.
list_dataset_job = ListDatasetsOperator(
    task_id="list_dataset",
    region=REGION,
    project_id=PROJECT_ID,
)
To update a dataset you can use UpdateDatasetOperator.
update_dataset_job = UpdateDatasetOperator(
    task_id="update_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_video_dataset_job.output["dataset_id"],
    dataset=DATASET_TO_UPDATE,
    update_mask=TEST_UPDATE_MASK,
)
Creating Training Jobs¶
To create a Google Vertex AI training job you can use one of three operators: CreateCustomContainerTrainingJobOperator, CreateCustomPythonPackageTrainingJobOperator, and CreateCustomTrainingJobOperator. Each of them waits for the operation to complete, and the result of each operator is the model trained by the user with it.
Preparation step
For each operator you must first prepare and create a dataset, then pass the dataset ID to the operator's dataset_id parameter, as shown in the sketch below.
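For example, a minimal sketch of this wiring (reusing the create_tabular_dataset_job task defined above): the dataset ID is read from the creating task's XCom output and assigned to a variable that the training examples below pass as dataset_id.
# Sketch: pull the dataset ID produced by CreateDatasetOperator via XCom
# and reuse it as the dataset_id argument of a training operator.
tabular_dataset_id = create_tabular_dataset_job.output["dataset_id"]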
How to run a Custom Container Training Job: CreateCustomContainerTrainingJobOperator
Before you start running this job you should build a Docker image containing your training script. Documentation on how to create such an image can be found here: https://cloud.google.com/vertex-ai/docs/training/create-custom-container. After that, put the link to the image in the container_uri parameter. You can also set, in the command parameter, the command to be executed in the container created from this image. Hypothetical example values are sketched below.
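A minimal sketch of what these values might look like, assuming a hypothetical Artifact Registry image and staging bucket (adjust to your own project):
# Hypothetical values -- not part of the original example.
CUSTOM_CONTAINER_GCS_BUCKET_NAME = "my-vertex-ai-staging-bucket"
CUSTOM_CONTAINER_URI = "us-central1-docker.pkg.dev/my-project/my-repo/trainer:latest"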
create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=CONTAINER_DISPLAY_NAME,
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)
CreateCustomContainerTrainingJobOperator also provides deferrable mode:
create_custom_container_training_job_deferrable = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=f"{CONTAINER_DISPLAY_NAME}-def",
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)
How to run a Python Package Training Job: CreateCustomPythonPackageTrainingJobOperator
Before you start running this job you should build a Python package containing your training script. Documentation on how to create one can be found here: https://cloud.google.com/vertex-ai/docs/training/create-python-pre-built-container. Then put the link to the package in the python_package_gcs_uri parameter, and set the python_module_name parameter to the name of the module that runs your training task. Hypothetical example values are sketched below.
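A minimal sketch of what these values might look like, assuming a hypothetical source distribution staged in GCS and a trainer module inside it:
# Hypothetical values -- not part of the original example.
PYTHON_PACKAGE_GCS_URI = "gs://my-bucket/packages/trainer-0.1.tar.gz"
PYTHON_MODULE_NAME = "trainer.task"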
create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=PACKAGE_DISPLAY_NAME,
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)
CreateCustomPythonPackageTrainingJobOperator also provides deferrable mode:
create_custom_python_package_training_job_deferrable = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=f"{PACKAGE_DISPLAY_NAME}-def",
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)
How to run a Custom Training Job: CreateCustomTrainingJobOperator
To create and run a custom training job, put the path to your local training script in the script_path parameter.
create_custom_training_job = CreateCustomTrainingJobOperator(
    task_id="custom_task",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)
model_id_v1 = create_custom_training_job.output["model_id"]
The same operation can be performed in deferrable mode:
create_custom_training_job_deferrable = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)
model_id_deferrable_v1 = create_custom_training_job_deferrable.output["model_id"]
Additionally, you can create a new version of an existing custom training job. It will replace the existing model with the new version, instead of creating a new model in the Model Registry. This is done by specifying the parent_model parameter when running the custom training job.
create_custom_training_job_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)
The same operation can be performed in deferrable mode:
create_custom_training_job_deferrable_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_deferrable_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)
You can get a list of training jobs using ListCustomTrainingJobOperator.
list_custom_training_job = ListCustomTrainingJobOperator(
    task_id="list_custom_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)
If you wish to delete a custom training job you can use DeleteCustomTrainingJobOperator.
delete_custom_training_job = DeleteCustomTrainingJobOperator(
    task_id="delete_custom_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='training_id') }}",
    custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='custom_job_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
Creating AutoML Training Jobs¶
To create a Google Vertex AI AutoML training job you have five operators: CreateAutoMLForecastingTrainingJobOperator, CreateAutoMLImageTrainingJobOperator, CreateAutoMLTabularTrainingJobOperator, SupervisedFineTuningTrainOperator, and CreateAutoMLVideoTrainingJobOperator. Each of them waits for the operation to complete, and the result of each operator is the model trained by the user with it.
How to run an AutoML Forecasting Training Job: CreateAutoMLForecastingTrainingJobOperator
Before you start running this job you must prepare and create a TimeSeries dataset. After that, put the dataset ID in the operator's dataset_id parameter.
create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
    task_id="auto_ml_forecasting_task",
    display_name=FORECASTING_DISPLAY_NAME,
    optimization_objective="minimize-rmse",
    column_specs=COLUMN_SPECS,
    # run params
    dataset_id=forecast_dataset_id,
    target_column=TEST_TARGET_COLUMN,
    time_column=TEST_TIME_COLUMN,
    time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
    available_at_forecast_columns=[TEST_TIME_COLUMN],
    unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
    time_series_attribute_columns=["city", "zip_code", "county"],
    forecast_horizon=30,
    context_window=30,
    data_granularity_unit="day",
    data_granularity_count=1,
    weight_column=None,
    budget_milli_node_hours=1000,
    model_display_name=MODEL_DISPLAY_NAME,
    predefined_split_column_name=None,
    region=REGION,
    project_id=PROJECT_ID,
)
How to run an AutoML Image Training Job: CreateAutoMLImageTrainingJobOperator
Before you start running this job you must prepare and create an Image dataset. After that, put the dataset ID in the operator's dataset_id parameter.
create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
    task_id="auto_ml_image_task",
    display_name=IMAGE_DISPLAY_NAME,
    dataset_id=image_dataset_id,
    prediction_type="classification",
    multi_label=False,
    model_type="CLOUD",
    training_fraction_split=0.6,
    validation_fraction_split=0.2,
    test_fraction_split=0.2,
    budget_milli_node_hours=8000,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)
How to run an AutoML Tabular Training Job: CreateAutoMLTabularTrainingJobOperator
Before you start running this job you must prepare and create a Tabular dataset. After that, put the dataset ID in the operator's dataset_id parameter.
create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
    task_id="auto_ml_tabular_task",
    display_name=TABULAR_DISPLAY_NAME,
    optimization_prediction_type="classification",
    column_transformations=COLUMN_TRANSFORMATIONS,
    dataset_id=tabular_dataset_id,
    target_column="Adopted",
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)
How to run an AutoML Video Training Job: CreateAutoMLVideoTrainingJobOperator
Before you start running this job you must prepare and create a Video dataset. After that, put the dataset ID in the operator's dataset_id parameter.
create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)
model_id_v1 = create_auto_ml_video_training_job.output["model_id"]
Additionally, you can create a new version of an existing AutoML Video training job. In this case the result will be a new version of the existing model, rather than a new model created in the Model Registry. This is done by specifying the parent_model parameter when running the AutoML Video training job.
create_auto_ml_video_training_job_v2 = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_v2_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    parent_model=model_id_v1,
    region=REGION,
    project_id=PROJECT_ID,
)
You can get a list of AutoML training jobs using ListAutoMLTrainingJobOperator.
list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
    task_id="list_auto_ml_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)
If you wish to delete an AutoML training job you can use DeleteAutoMLTrainingJobOperator.
delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
    task_id="delete_auto_ml_forecasting_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
    "key='training_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
)
Creating Batch Prediction Jobs¶
To create a Google VertexAI batch prediction job you can use CreateBatchPredictionJobOperator. The operator returns the batch prediction job ID in XCom under the batch_prediction_job_id key.
create_batch_prediction_job = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
)
CreateBatchPredictionJobOperator also provides deferrable mode:
create_batch_prediction_job_def = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job_def",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)
To delete a batch prediction job you can use DeleteBatchPredictionJobOperator.
delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
    task_id="delete_batch_prediction_job",
    batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
To get a list of batch prediction jobs you can use ListBatchPredictionJobsOperator.
list_batch_prediction_job = ListBatchPredictionJobsOperator(
    task_id="list_batch_prediction_jobs",
    region=REGION,
    project_id=PROJECT_ID,
)
Creating an Endpoint Service¶
To create a Google VertexAI endpoint you can use CreateEndpointOperator. The operator returns the endpoint ID in XCom under the endpoint_id key.
create_endpoint = CreateEndpointOperator(
    task_id="create_endpoint",
    endpoint=ENDPOINT_CONF,
    region=REGION,
    project_id=PROJECT_ID,
)
After creating an endpoint you can use it to deploy a model with DeployModelOperator.
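The DEPLOYED_MODEL value used below is a DeployedModel configuration; a minimal sketch, assuming a hypothetical display name and machine resources (the model resource name here is pulled from the earlier auto_ml_image_task purely as an illustration):
# Hypothetical DeployedModel config -- not part of the original example.
DEPLOYED_MODEL = {
    # Full resource name of the model to deploy, e.g. produced by an earlier training task.
    "model": "{{ ti.xcom_pull('auto_ml_image_task')['name'] }}",
    "display_name": "example-deployed-model",
    "dedicated_resources": {
        "machine_spec": {"machine_type": "n1-standard-2"},
        "min_replica_count": 1,
        "max_replica_count": 1,
    },
}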
deploy_model = DeployModelOperator(
    task_id="deploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model=DEPLOYED_MODEL,
    traffic_split={"0": 100},
    region=REGION,
    project_id=PROJECT_ID,
)
To un-deploy a model you can use UndeployModelOperator.
undeploy_model = UndeployModelOperator(
    task_id="undeploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model_id=deploy_model.output["deployed_model_id"],
    region=REGION,
    project_id=PROJECT_ID,
)
To delete an endpoint you can use DeleteEndpointOperator.
delete_endpoint = DeleteEndpointOperator(
    task_id="delete_endpoint",
    endpoint_id=create_endpoint.output["endpoint_id"],
    region=REGION,
    project_id=PROJECT_ID,
)
To get a list of endpoints you can use ListEndpointsOperator.
list_endpoints = ListEndpointsOperator(
    task_id="list_endpoints",
    region=REGION,
    project_id=PROJECT_ID,
)
Creating Hyperparameter Tuning Jobs¶
To create a Google VertexAI hyperparameter tuning job you can use CreateHyperparameterTuningJobOperator. The operator returns the hyperparameter tuning job ID in XCom under the hyperparameter_tuning_job_id key.
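The worker pool, parameter, and metric specs referenced in the example below could be defined along these lines; a minimal sketch with hypothetical values, using the parameter-spec helpers from the google-cloud-aiplatform client:
from google.cloud.aiplatform import hyperparameter_tuning as hpt

# Hypothetical specs -- not part of the original example.
WORKER_POOL_SPECS = [
    {
        "machine_spec": {"machine_type": "n1-standard-4"},
        "replica_count": 1,
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/my-project/my-repo/trainer:latest"},
    }
]
PARAM_SPECS = {
    "learning_rate": hpt.DoubleParameterSpec(min=0.01, max=1, scale="log"),
    "batch_size": hpt.DiscreteParameterSpec(values=[16, 32, 64], scale="linear"),
}
METRIC_SPEC = {"loss": "minimize"}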
create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
)
CreateHyperparameterTuningJobOperator also supports deferrable mode:
create_hyperparameter_tuning_job_def = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job_def",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
    deferrable=True,
)
To delete a hyperparameter tuning job you can use DeleteHyperparameterTuningJobOperator.
delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
    task_id="delete_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)
To get a hyperparameter tuning job you can use GetHyperparameterTuningJobOperator.
get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
    task_id="get_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
)
To get a list of hyperparameter tuning jobs you can use ListHyperparameterTuningJobOperator.
list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
    task_id="list_hyperparameter_tuning_job",
    region=REGION,
    project_id=PROJECT_ID,
)
Creating a Model Service¶
To upload a Google VertexAI model you can use UploadModelOperator. The operator returns the model ID in XCom under the model_id key.
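The MODEL_OBJ used below follows the Vertex AI Model resource format; a minimal sketch, assuming a hypothetical artifact location and serving container image:
# Hypothetical Model resource -- not part of the original example.
MODEL_OBJ = {
    "display_name": "example-uploaded-model",
    # GCS directory that contains the exported model artifacts.
    "artifact_uri": "gs://my-bucket/model-artifacts/",
    # Serving container used for predictions (prebuilt or custom image).
    "container_spec": {"image_uri": "us-central1-docker.pkg.dev/my-project/my-repo/serving:latest"},
}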
upload_model = UploadModelOperator(
    task_id="upload_model",
    region=REGION,
    project_id=PROJECT_ID,
    model=MODEL_OBJ,
)
upload_model_v1 = upload_model.output["model_id"]
To export a model you can use ExportModelOperator.
export_model = ExportModelOperator(
    task_id="export_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    output_config=MODEL_OUTPUT_CONFIG,
)
To delete a model you can use DeleteModelOperator.
delete_model = DeleteModelOperator(
    task_id="delete_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    trigger_rule=TriggerRule.ALL_DONE,
)
To get a list of models you can use ListModelsOperator.
list_models = ListModelsOperator(
    task_id="list_models",
    region=REGION,
    project_id=PROJECT_ID,
)
To retrieve a model by its ID you can use GetModelOperator.
get_model = GetModelOperator(
    task_id="get_model", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)
To list all versions of a model you can use ListModelVersionsOperator.
list_model_versions = ListModelVersionsOperator(
    task_id="list_model_versions", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)
To set a specific version of a model as the default one you can use SetDefaultVersionOnModelOperator.
set_default_version = SetDefaultVersionOnModelOperator(
    task_id="set_default_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v2,
)
To add version aliases to a specific version of a model you can use AddVersionAliasesOnModelOperator.
add_version_alias = AddVersionAliasesOnModelOperator(
    task_id="add_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version", "beta"],
    model_id=model_id_v2,
)
To delete version aliases from a specific version of a model you can use DeleteVersionAliasesOnModelOperator.
delete_version_alias = DeleteVersionAliasesOnModelOperator(
    task_id="delete_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version"],
    model_id=model_id_v2,
)
To delete a specific version of a model you can use DeleteModelVersionOperator.
delete_model_version = DeleteModelVersionOperator(
    task_id="delete_model_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v1,
    trigger_rule=TriggerRule.ALL_DONE,
)
Running Pipeline Jobs¶
To run a Google VertexAI pipeline job you can use RunPipelineJobOperator. The operator returns the pipeline job ID in XCom under the pipeline_job_id key.
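The template and parameters referenced below could look like this; a minimal sketch, assuming a hypothetical compiled pipeline definition stored in GCS and its runtime parameters:
# Hypothetical values -- not part of the original example.
TEMPLATE_PATH = "gs://my-bucket/pipelines/my_pipeline.yaml"
PARAMETER_VALUES = {"greeting": "Hello from Airflow"}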
run_pipeline_job = RunPipelineJobOperator(
    task_id="run_pipeline_job",
    display_name=DISPLAY_NAME,
    template_path=TEMPLATE_PATH,
    parameter_values=PARAMETER_VALUES,
    region=REGION,
    project_id=PROJECT_ID,
)
To delete a pipeline job you can use DeletePipelineJobOperator.
delete_pipeline_job = DeletePipelineJobOperator(
    task_id="delete_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)
To get a pipeline job you can use GetPipelineJobOperator.
get_pipeline_job = GetPipelineJobOperator(
    task_id="get_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
)
To get a list of pipeline jobs you can use ListPipelineJobOperator.
list_pipeline_job = ListPipelineJobOperator(
    task_id="list_pipeline_job",
    region=REGION,
    project_id=PROJECT_ID,
)
Interacting with Generative AI¶
To generate text embeddings you can use TextEmbeddingModelGetEmbeddingsOperator. The operator returns the model's response in XCom under the model_response key.
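The prompt and model referenced below could be defined along these lines; a minimal sketch with hypothetical values (any supported Vertex AI text-embedding model name can be used):
# Hypothetical values -- not part of the original example.
PROMPT = "In a few words, what is Apache Airflow?"
TEXT_EMBEDDING_MODEL = "textembedding-gecko"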
generate_embeddings_task = TextEmbeddingModelGetEmbeddingsOperator(
    task_id="generate_embeddings_task",
    project_id=PROJECT_ID,
    location=REGION,
    prompt=PROMPT,
    pretrained_model=TEXT_EMBEDDING_MODEL,
)
To generate content with a generative model you can use GenerativeModelGenerateContentOperator. The operator returns the model's response in XCom under the model_response key.
generate_content_task = GenerativeModelGenerateContentOperator(
    task_id="generate_content_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    tools=TOOLS,
    location=REGION,
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
    pretrained_model=MULTIMODAL_MODEL,
)
To run a supervised fine-tuning job you can use SupervisedFineTuningTrainOperator. The operator returns the tuned model's endpoint name in XCom under the tuned_model_endpoint_name key.
sft_train_task = SupervisedFineTuningTrainOperator(
    task_id="sft_train_task",
    project_id=PROJECT_ID,
    location=REGION,
    source_model=SOURCE_MODEL,
    train_dataset=TRAIN_DATASET,
    tuned_model_display_name=TUNED_MODEL_DISPLAY_NAME,
)
To count the number of input tokens before sending a request to the Gemini API you can use CountTokensOperator. The operator returns the total token count in XCom under the total_tokens key.
count_tokens_task = CountTokensOperator(
    task_id="count_tokens_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    location=REGION,
    pretrained_model=MULTIMODAL_MODEL,
)
To evaluate a model you can use RunEvaluationOperator. The operator returns the evaluation's summary metrics in XCom under the summary_metrics key.
run_evaluation_task = RunEvaluationOperator(
    task_id="run_evaluation_task",
    project_id=PROJECT_ID,
    location=REGION,
    pretrained_model=MULTIMODAL_MODEL,
    eval_dataset=EVAL_DATASET,
    metrics=METRICS,
    experiment_name=EXPERIMENT_NAME,
    experiment_run_name=EXPERIMENT_RUN_NAME,
    prompt_template=PROMPT_TEMPLATE,
)
To create cached content you can use CreateCachedContentOperator. The operator returns the cached content resource name in XCom under the return_value key.
create_cached_content_task = CreateCachedContentOperator(
    task_id="create_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    model_name=CACHED_MODEL,
    system_instruction=CACHED_SYSTEM_INSTRUCTION,
    contents=CACHED_CONTENTS,
    ttl_hours=1,
    display_name="example-cache",
)
To generate a response from cached content you can use GenerateFromCachedContentOperator. The operator returns the response generated from the cached content in XCom under the return_value key.
generate_from_cached_content_task = GenerateFromCachedContentOperator(
    task_id="generate_from_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    cached_content_name="{{ task_instance.xcom_pull(task_ids='create_cached_content_task', key='return_value') }}",
    contents=["What are the papers about?"],
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
)
Interacting with Vertex AI Feature Store¶
To get a feature view sync job you can use GetFeatureViewSyncOperator. The operator returns the sync job result in XCom under the return_value key.
get_task = GetFeatureViewSyncOperator(
    task_id="get_task",
    location=REGION,
    feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
)
To sync a feature view you can use SyncFeatureViewOperator. The operator returns the sync job name in XCom under the return_value key.
sync_task = SyncFeatureViewOperator(
    task_id="sync_task",
    project_id=PROJECT_ID,
    location=REGION,
    feature_online_store_id=FEATURE_ONLINE_STORE_ID,
    feature_view_id=FEATURE_VIEW_ID,
)
To check whether a feature view sync has succeeded you can use FeatureViewSyncSensor.
wait_for_sync = FeatureViewSyncSensor(
    task_id="wait_for_sync",
    location=REGION,
    feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
    poke_interval=60,  # Check every minute
    timeout=600,  # Timeout after 10 minutes
    mode="reschedule",
)