Google Cloud Stackdriver 运算符

先决任务

要使用这些运算符,您必须执行以下几项操作

StackdriverListAlertPoliciesOperator

使用StackdriverListAlertPoliciesOperator获取由给定筛选器标识的所有警报策略。

使用运算符

您可以使用或不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

list_alert_policies = StackdriverListAlertPoliciesOperator(
    task_id="list-alert-policies",
)

StackdriverEnableAlertPoliciesOperator

使用 StackdriverEnableAlertPoliciesOperator 启用由给定筛选器标识的警报策略。

使用运算符

您可以使用或不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

enable_alert_policy = StackdriverEnableAlertPoliciesOperator(
    task_id="enable-alert-policies",
    filter_=f'(displayName="{ALERT_1_NAME}" OR displayName="{ALERT_2_NAME}")',
)

StackdriverDisableAlertPoliciesOperator

使用 StackdriverDisableAlertPoliciesOperator 禁用由给定筛选器标识的警报策略。

使用运算符

您可以使用或不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

disable_alert_policy = StackdriverDisableAlertPoliciesOperator(
    task_id="disable-alert-policies",
    filter_=f'displayName="{ALERT_1_NAME}"',
)

StackdriverUpsertAlertOperator

使用 StackdriverUpsertAlertOperator 更新或插入由给定筛选器 JSON 字符串标识的警报策略。如果具有给定名称的警报已存在,则运算符更新现有策略,否则创建一个新策略。

使用运算符

您可以使用或不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

create_alert_policy = StackdriverUpsertAlertOperator(
    task_id="create-alert-policies",
    alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
)

StackdriverDeleteAlertOperator

使用 StackdriverDeleteAlertOperator 删除由给定名称标识的警报策略。

使用运算符

要删除的警报的名称应采用以下格式给出:projects/<PROJECT_NAME>/alertPolicies/<ALERT_NAME>

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

delete_alert_policy = StackdriverDeleteAlertOperator(
    task_id="delete-alert-policy",
    name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
)

StackdriverListNotificationChannelsOperator

使用 StackdriverListNotificationChannelsOperator 提取由给定过滤器标识的所有通知渠道。

使用操作符

您可以使用或不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

list_notification_channel = StackdriverListNotificationChannelsOperator(
    task_id="list-notification-channel", filter_='type="pubsub"'
)

StackdriverEnableNotificationChannelsOperator

使用 StackdriverEnableNotificationChannelsOperator 启用由给定过滤器标识的通知渠道。

使用操作符

您可以使用或不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

enable_notification_channel = StackdriverEnableNotificationChannelsOperator(
    task_id="enable-notification-channel", filter_='type="pubsub"'
)

StackdriverDisableNotificationChannelsOperator

使用 StackdriverDisableNotificationChannelsOperator 禁用由给定过滤器标识的通知渠道。

使用操作符

您可以使用或不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverUpsertNotificationChannelOperator

使用 StackdriverUpsertNotificationChannelOperator upsert 由给定的 channel JSON 字符串标识的通知渠道。如果具有给定名称的渠道已存在,则操作符更新现有渠道,否则创建一个新渠道。

使用操作符

您可以使用或不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverDeleteNotificationChannelOperator

要删除的警报的名称应采用以下格式给出:projects/<PROJECT_NAME>/notificationChannels/<CHANNEL_NAME>

使用操作符

您可以使用或不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。

tests/system/providers/google/cloud/stackdriver/example_stackdriver.py[源代码]

delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
    task_id="delete-notification-channel",
    name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
)

此条目是否有用?