成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

使用 AirFlow 調度 MaxCompute

開發 后端
airflow是Airbnb開源的一個用python編寫的調度工具,基于有向無環圖(DAG),airflow可以定義一組有依賴的任務,按照依賴依次執行,通過python代碼定義子任務,并支持各種Operate操作器,靈活性大,能滿足用戶的各種需求。本文主要介紹使用Airflow的python Operator調度MaxCompute 任務

背景

airflow是Airbnb開源的一個用python編寫的調度工具,基于有向無環圖(DAG),airflow可以定義一組有依賴的任務,按照依賴依次執行,通過python代碼定義子任務,并支持各種Operate操作器,靈活性大,能滿足用戶的各種需求。本文主要介紹使用Airflow的python Operator調度MaxCompute 任務

一、環境準備

Python 2.7.5 PyODPS支持Python2.6以上版本
Airflow apache-airflow-1.10.7

1.安裝MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10 # 可選,安裝后能加速Tunnel上傳。

pip install cython>=0.19.0 # 可選,不建議Windows用戶安裝。

pip install pyodps

注意:如果requests包沖突,先卸載再安裝對應的版本

2.執行如下命令檢查安裝是否成功

python -c "from odps import ODPS"

二、開發步驟

1.在Airflow家目錄編寫python調度腳本Airiflow_MC.py

 

  1. # -*- coding: UTF-8 -*- 
  2.  
  3. import sys 
  4.  
  5. import os 
  6.  
  7. from odps import ODPS 
  8.  
  9. from odps import options 
  10.  
  11. from airflow import DAG 
  12.  
  13. from airflow.operators.python_operator import PythonOperator 
  14.  
  15. from datetime import datetime, timedelta 
  16.  
  17. from configparser import ConfigParser 
  18.  
  19. import time 
  20.  
  21. reload(sys) 
  22.  
  23. sys.setdefaultencoding('utf8'
  24.  
  25. #修改系統默認編碼。 
  26.  
  27. # MaxCompute參數設置 
  28.  
  29. options.sql.settings = {'options.tunnel.limit_instance_tunnel'False'odps.sql.allow.fullscan'True
  30.  
  31. cfg = ConfigParser() 
  32.  
  33. cfg.read("odps.ini"
  34.  
  35. print(cfg.items()) 
  36.  
  37. odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint")) 

  1. default_args = { 
  2.  
  3. 'owner''airflow'
  4.  
  5. 'depends_on_past'False
  6.  
  7. 'retry_delay': timedelta(minutes=5), 
  8.  
  9. 'start_date':datetime(2020,1,15) 
  10.  
  11. 'email': ['airflow@example.com'], 
  12.  
  13. 'email_on_failure'False
  14.  
  15. 'email_on_retry'False
  16.  
  17. 'retries': 1, 
  18.  
  19. 'queue''bash_queue'
  20.  
  21. 'pool''backfill'
  22.  
  23. 'priority_weight': 10, 
  24.  
  25. 'end_date': datetime(2016, 1, 1), 
  26.  
  27.  
  28. dag = DAG( 
  29.  
  30. 'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30)) 
  31.  
  32. def read_sql(sqlfile): 
  33.  
  34. with io.open(sqlfile, encoding='utf-8', mode='r'as f: 
  35.  
  36. sql=f.read() 
  37.  
  38. f.closed 
  39.  
  40. return sql 
  41.  
  42. def get_time(): 
  43.  
  44. print '當前時間是{}'.format(time.time()) 
  45.  
  46. return time.time() 
  47.  
  48. def mc_job (): 
  49.  
  50. project = odps.get_project() # 取到默認項目。 
  51.  
  52. instance=odps.run_sql("select * from long_chinese;"
  53.  
  54. print(instance.get_logview_address()) 
  55.  
  56. instance.wait_for_success() 
  57.  
  58. with instance.open_reader() as reader: 
  59.  
  60. count = reader.count 
  61.  
  62. print("查詢表數據條數:{}".format(count)) 
  63.  
  64. for record in reader: 
  65.  
  66. print record 
  67.  
  68. return count 
  69.  
  70. t1 = PythonOperator ( 
  71.  
  72. task_id = 'get_time' , 
  73.  
  74. provide_context = False , 
  75.  
  76. python_callable = get_time, 
  77.  
  78. dag = dag ) 
  79.  
  80. t2 = PythonOperator ( 
  81.  
  82. task_id = 'mc_job' , 
  83.  
  84. provide_context = False , 
  85.  
  86. python_callable = mc_job , 
  87.  
  88. dag = dag ) 
  89.  
  90. t2.set_upstream(t1) 

2.提交

  1. python Airiflow_MC.py 

3.進行測試

  1. # print the list of active DAGs 
  2.  
  3. airflow list_dags 
  4.  
  5. # prints the list of tasks the "tutorial" dag_id 
  6.  
  7. airflow list_tasks Airiflow_MC 
  8.  
  9. # prints the hierarchy of tasks in the tutorial DAG 
  10.  
  11. airflow list_tasks Airiflow_MC --tree 
  12.  
  13. #測試task 
  14.  
  15. airflow test Airiflow_MC get_time 2010-01-16 
  16.  
  17. airflow test Airiflow_MC mc_job 2010-01-16 

4.運行調度任務

登錄到web界面點擊按鈕運行

5.查看任務運行結果

1.點擊view log

2.查看結果

責任編輯:梁菲 來源: 阿里云云棲號
相關推薦

2017-07-04 13:37:57

調度工具Airflow開源

2021-11-29 08:48:00

K8S KubernetesAirflow

2021-07-27 15:56:28

MaxCompute 資源優化

2021-07-08 09:51:18

MaxCompute SQL數據處理

2022-09-16 11:23:59

Python框架Celery

2021-05-13 12:00:51

cron調度任務系統運維

2022-01-05 19:34:18

AirflowCeleryMYSQL

2022-01-03 23:59:15

任務調度框架

2021-07-29 11:30:41

SaaS云數據倉庫MaxCompute

2022-01-05 00:03:32

場景容器Airflow

2025-04-07 04:00:00

教學型任務調度系統

2022-01-25 18:24:20

KubernetesDeschedule

2021-07-15 17:35:28

MaxCompute logview 阿里云

2009-08-05 10:08:55

MySQL查詢優化調度鎖定

2024-10-21 09:18:47

2024-04-17 07:21:52

物化視圖查詢加速器數據倉庫

2022-12-30 12:02:59

數據

2021-06-21 17:00:05

云計算Hologres云原生

2023-08-30 07:14:27

MaxCompute湖倉一體

2021-09-02 10:15:50

計算平臺MaxCompute 阿里云
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美激情a∨在线视频播放 成人免费共享视频 | 天天干天天操天天看 | 91日日| 麻豆精品一区二区三区在线观看 | 亚洲狠狠爱一区二区三区 | 色久在线 | 成人在线小视频 | 综合国产第二页 | 看亚洲a级一级毛片 | www.中文字幕| 亚洲欧美一区二区三区国产精品 | 精品国产乱码久久久久久88av | 国产精品免费一区二区 | 日韩一二三区视频 | 欧美特级黄色 | 日韩二三区| 一区二区三区 在线 | 国产精品成人一区二区三区 | 自拍视频国产 | 国产一区二区在线免费 | 九九久久精品 | 日本亚洲欧美 | 国产欧美精品在线观看 | 亚洲网站在线观看 | 日韩精品久久一区二区三区 | 91亚洲精品国偷拍自产在线观看 | 亚洲a视频 | 亚洲精品成人 | 欧美成年人视频在线观看 | 日韩不卡视频在线 | 五月婷婷丁香 | 欧美片网站免费 | 特级毛片爽www免费版 | yeyeav| 欧美一区 | 日韩中文欧美 | 夜夜夜夜夜夜曰天天天 | 91视频国产一区 | 中文在线a在线 | 色婷婷久久综合 | 国产91 在线播放 |