Qubole¶
Qubole is an open, simple, and secure data lake platform for machine learning, streaming, and ad-hoc analytics. It provides a self-service big data analytics platform built on Amazon Web Services, Microsoft, and Google clouds.
Airflow provides operators to execute tasks (commands) on QDS and to perform checks against Qubole commands. In addition, sensors are provided that wait for a file, folder, or partition to appear in cloud storage and check for its presence via the QDS API.
Execute tasks¶
To run the following commands, use QuboleOperator.
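All of the snippets on this page assume the operator and sensors are imported from the apache-airflow-providers-qubole package and that each task is defined inside a DAG with a valid Qubole connection configured. A minimal sketch of that scaffolding is shown below; the dag_id and schedule values are illustrative, and the import paths should be checked against your installed provider version.
from datetime import datetime

from airflow import DAG
from airflow.providers.qubole.operators.qubole import QuboleOperator
from airflow.providers.qubole.sensors.qubole import QuboleFileSensor, QubolePartitionSensor

with DAG(
    dag_id="example_qubole_commands",  # illustrative name
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # trigger manually for this example
    catchup=False,
) as dag:
    # QuboleOperator and sensor tasks from the sections below are defined here;
    # they authenticate through the configured Qubole connection ("qubole_default" by default).
    ...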
Run Hive command¶
To run a query that shows all tables, you can use
hive_show_table = QuboleOperator(
    task_id="hive_show_table",
    command_type="hivecmd",
    query="show tables",
    cluster_label="{{ params.cluster_label }}",
    fetch_logs=True,
    # If `fetch_logs`=true, will fetch qubole command logs and concatenate
    # them into corresponding airflow task logs
    tags="airflow_example_run",
    # To attach tags to qubole command, auto attach 3 tags - dag_id, task_id, run_id
    params={
        "cluster_label": "default",
    },
)
You can also run a script located in a bucket by passing the path to the query file
hive_s3_location = QuboleOperator(
    task_id="hive_s3_location",
    command_type="hivecmd",
    script_location="s3n://public-qubole/qbol-library/scripts/show_table.hql",
    notify=True,
    tags=["tag1", "tag2"],
    # If the script at s3 location has any qubole specific macros to be replaced
    # macros='[{"date": "{{ ds }}"}, {"name" : "abc"}]',
)
Run Hadoop command¶
To run a jar file on a Hadoop cluster, use
hadoop_jar_cmd = QuboleOperator(
    task_id="hadoop_jar_cmd",
    command_type="hadoopcmd",
    sub_command="jar s3://paid-qubole/HadoopAPIExamples/"
    "jars/hadoop-0.20.1-dev-streaming.jar "
    "-mapper wc "
    "-numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/"
    "data/3.tsv -output "
    "s3://paid-qubole/HadoopAPITests/data/3_wc",
    cluster_label="{{ params.cluster_label }}",
    fetch_logs=True,
    params={
        "cluster_label": "default",
    },
)
Run Pig command¶
To run a Pig Latin script on a Hadoop cluster, use
pig_cmd = QuboleOperator(
    task_id="pig_cmd",
    command_type="pigcmd",
    script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
    parameters="key1=value1 key2=value2",
)
Run Shell command¶
To run a Shell script, use
shell_cmd = QuboleOperator(
    task_id="shell_cmd",
    command_type="shellcmd",
    script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
    parameters="param1 param2",
)
Run Presto command¶
To run a query using Presto, use
presto_cmd = QuboleOperator(task_id="presto_cmd", command_type="prestocmd", query="show tables")
Run DB commands¶
To run a query as a DbTap, use
db_query = QuboleOperator(
    task_id="db_query", command_type="dbtapquerycmd", query="show tables", db_tap_id=2064
)
To run a DB export command, use
db_export = QuboleOperator(
    task_id="db_export",
    command_type="dbexportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    partition_spec="dt=20110104-02",
    dbtap_id=2064,
)
To run a DB import command, use
db_import = QuboleOperator(
    task_id="db_import",
    command_type="dbimportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    where_clause="id < 10",
    parallelism=2,
    dbtap_id=2064,
)
Run Spark commands¶
To run a Scala script as a Spark job, use
prog = """
import scala.math.random
import org.apache.spark._
/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
val spark = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
"""
spark_cmd = QuboleOperator(
task_id="spark_cmd",
command_type="sparkcmd",
program=prog,
language="scala",
arguments="--class SparkPi",
tags="airflow_example_run",
)
File sensor¶
Usage examples of QuboleFileSensor.
File or directory existence¶
To wait for a file or directory to exist in a cluster, use
check_s3_file = QuboleFileSensor(
    task_id="check_s3_file",
    poke_interval=60,
    timeout=600,
    data={
        "files": [
            "s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar",
            "s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] }}.tsv",
        ]  # will check for availability of all the files in array
    },
)
Partition sensor¶
Usage examples of QubolePartitionSensor.
Partition existence¶
To wait for a table partition to exist in a cluster, use
check_hive_partition = QubolePartitionSensor(
    task_id="check_hive_partition",
    poke_interval=10,
    timeout=60,
    data={
        "schema": "default",
        "table": "my_partitioned_table",
        "columns": [
            {"column": "month", "values": ["{{ ds.split('-')[1] }}"]},
            {"column": "day", "values": ["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]},
        ],  # will check for partitions like [month=12/day=12,month=12/day=13]
    },
)
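Inside a DAG, these sensors would typically gate the commands shown above, so a QuboleOperator task only starts once the expected file or partition exists. A possible wiring, reusing the task names from this page, is sketched below.
# Hypothetical dependencies: run the Hadoop jar only after the S3 files are present,
# and the Hive query only after the Hive partition shows up.
check_s3_file >> hadoop_jar_cmd
check_hive_partition >> hive_show_table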