Microsoft Graph API 操作符
先决条件任务
要使用这些操作符，您必须先完成以下几项操作：
使用 Azure Portal 或 Azure CLI 创建必要的资源。
通过 pip 安装 API 库。
pip install 'apache-airflow[azure]'
详细信息请参考 Airflow® 的安装文档。
设置连接。
MSGraphAsyncOperator
使用 MSGraphAsyncOperator 调用 Microsoft Graph API。
以下是使用此操作符获取 SharePoint 站点的示例。
# Look up the "news" SharePoint site through the "msgraph_api" connection.
# The result processor splits the response's "id" on commas and keeps only
# the second field (the site id, per the original example's comment).
site_task = MSGraphAsyncOperator(
    task_id="news_site",
    url="sites/850v1v.sharepoint.com:/sites/news",
    conn_id="msgraph_api",
    result_processor=lambda context, response: response["id"].split(",")[1],
)
以下是使用此操作符获取 SharePoint 站点页面的示例。
# List the pages of the site resolved by the "news_site" task, using the
# Graph "beta" API version.
# The URL is written as a single literal containing the Jinja expression; it is
# byte-identical to the previous `"sites/%s/pages" % "{{ ... }}"` form, without
# the pointless %-formatting of two constants or its `noqa: UP031` suppression.
# The `{{ ti.xcom_pull(...) }}` part is left for Airflow's templating to render.
site_pages_task = MSGraphAsyncOperator(
    task_id="news_pages",
    conn_id="msgraph_api",
    api_version="beta",
    url="sites/{{ ti.xcom_pull(task_ids='news_site') }}/pages",
)
以下是使用此操作符获取 Power BI 工作区的示例。
# Query the modified Power BI workspaces through the "powerbi" connection.
# The result processor turns the response (iterated as a sequence of workspace
# mappings) into a list of their "id" values. A list comprehension replaces the
# equivalent but less readable `list(map(lambda ...))`.
workspaces_task = MSGraphAsyncOperator(
    task_id="workspaces",
    conn_id="powerbi",
    url="myorg/admin/workspaces/modified",
    result_processor=lambda context, response: [workspace["id"] for workspace in response],
)
以下是使用此操作符获取 Power BI 工作区信息的示例。
# POST the workspace ids produced by `workspaces_task` to the admin
# "getInfo" endpoint with all five detail flags set to True.
# The response's "id" is handed back wrapped as {"scanId": ...}.
workspaces_info_task = MSGraphAsyncOperator(
    task_id="get_workspace_info",
    conn_id="powerbi",
    method="POST",
    url="myorg/admin/workspaces/getInfo",
    # Same keys, order and values as a hand-written {"lineage": True, ...} dict.
    query_parameters=dict.fromkeys(
        (
            "lineage",
            "datasourceDetails",
            "datasetSchema",
            "datasetExpressions",
            "getArtifactUsers",
        ),
        True,
    ),
    data={"workspaces": workspaces_task.output},
    result_processor=lambda context, response: {"scanId": response["id"]},
)
以下是使用此操作符刷新 Power BI 数据集，并使用 MSGraphSensor 等待刷新完成的示例。
# Start a "full" refresh of one Power BI dataset.
# `{workspaceId}` and `{datasetId}` in the URL are filled from `path_parameters`.
# The result processor returns the response's "requestid" entry, which is
# pulled by `refresh_dataset_history_task` as its `refreshId`.
refresh_dataset_task = MSGraphAsyncOperator(
    task_id="refresh_dataset",
    conn_id="powerbi_api",
    method="POST",
    url="myorg/groups/{workspaceId}/datasets/{datasetId}/refreshes",
    path_parameters={
        "workspaceId": "9a7e14c6-9a7d-4b4c-b0f2-799a85e60a51",
        "datasetId": "ffb6096e-d409-4826-aaeb-b5d4b165dc4d",
    },
    # The request body below is required for an enhanced refresh.
    data={"type": "full"},
    result_processor=lambda context, response: response["requestid"],
)
# Wait for the refresh started by `refresh_dataset_task` by polling that
# refresh's entry. `refreshId` is taken from the upstream task's output.
refresh_dataset_history_task = MSGraphSensor(
    task_id="refresh_dataset_history",
    conn_id="powerbi_api",
    url="myorg/groups/{workspaceId}/datasets/{datasetId}/refreshes/{refreshId}",
    path_parameters={
        "workspaceId": "9a7e14c6-9a7d-4b4c-b0f2-799a85e60a51",
        "datasetId": "ffb6096e-d409-4826-aaeb-b5d4b165dc4d",
        "refreshId": refresh_dataset_task.output,
    },
    timeout=350.0,
    # The sensor's condition is met only when the event's "status" equals "Completed".
    # NOTE(review): any other status (e.g. a failed refresh) presumably keeps the
    # sensor waiting until `timeout` — confirm that this is the intended behavior.
    event_processor=lambda context, event: event["status"] == "Completed",
)
以下是使用此操作符在 Fabric 中按需运行项目作业（此处为数据管道）的示例。
# Run a Fabric pipeline item as an on-demand job.
# POST workspaces/{workspaceId}/items/{itemId}/jobs/instances?jobType=... is the
# Fabric "Run On Demand Item Job" endpoint. Creating an item *schedule* uses
# .../jobs/{jobType}/schedules instead, so the previous link pointed at the wrong API:
# https://learn.microsoft.com/en-us/rest/api/fabric/core/job-scheduler/run-on-demand-item-job?tabs=HTTP
# NOTE(review): this rebinds `workspaces_task`, the name also used for the Power BI
# workspace listing whose `.output` feeds `workspaces_info_task`. Give it a distinct
# name if both examples are copied into one DAG file.
workspaces_task = MSGraphAsyncOperator(
    task_id="schedule_datapipeline",
    conn_id="powerbi",
    method="POST",
    url="workspaces/{workspaceId}/items/{itemId}/jobs/instances",
    path_parameters={
        "workspaceId": "e90b2873-4812-4dfb-9246-593638165644",
        "itemId": "65448530-e5ec-4aeb-a97e-7cebf5d67c18",
    },
    query_parameters={"jobType": "Pipeline"},
    dag=dag,
    # The Asset URI mirrors the resolved job endpoint above (path + jobType query).
    outlets=[
        Asset(
            "workspaces/e90b2873-4812-4dfb-9246-593638165644/items/65448530-e5ec-4aeb-a97e-7cebf5d67c18/jobs/instances?jobType=Pipeline"
        )
    ],
)