Accessing Hive Data with PySpark in Practice
Our data analysis has so far been driven entirely by Hive scripts. As the volume of app user-behavior and log data keeps growing, the daily scripts take longer and longer to run; even after SQL tuning, moving to Spark is now firmly on the agenda.
Developing directly on Spark would require learning Scala. To keep the learning cost low for our data analysts, we decided to start with Spark SQL, which lets us switch the compute engine from MapReduce to Spark almost seamlessly. For now we mainly use PySpark to access Hive data.
The detailed installation and configuration steps are as follows:
1. Installing Spark
JDK and Scala must be installed first (not covered here). Since the existing Hadoop cluster runs version 2.6.3, we downloaded the matching stable Spark release spark-1.4.0-bin-hadoop2.6.tgz.
I first deployed Spark on a single machine, with the Master and Slave on the same host. Note that passwordless SSH login must be configured.
1.1 Environment variables
- export JAVA_HOME=/usr/jdk1.8.0_73
- export HADOOP_HOME=/usr/hadoop
- export HADOOP_CONF_DIR=/usr/hadoop/etc/hadoop
- export SCALA_HOME=/usr/local/scala-2.11.7
- export SPARK_HOME=/home/hadoop/spark_folder/spark-1.4.0-bin-hadoop2.6
- export SPARK_MASTER_IP=127.0.0.1
- export SPARK_MASTER_PORT=7077
- export SPARK_MASTER_WEBUI_PORT=8099
- export SPARK_WORKER_CORES=3  # CPU cores used by each Worker
- export SPARK_WORKER_INSTANCES=1  # number of Worker instances started on each Slave
- export SPARK_WORKER_MEMORY=10G  # memory available to each Worker
- export SPARK_WORKER_WEBUI_PORT=8081  # Worker WebUI port
- export SPARK_EXECUTOR_CORES=1  # cores used by each Executor
- export SPARK_EXECUTOR_MEMORY=1G  # memory used by each Executor
- export HIVE_HOME=/home/hadoop/hive
- export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar:$SPARK_CLASSPATH
- export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$HADOOP_HOME/lib/native
1.2 Configuring slaves
- cp slaves.template slaves
- vi slaves and add the following line: localhost
1.3 Starting the master and slave
- cd $SPARK_HOME/sbin/
- ./start-master.sh
- Startup logs are written to $SPARK_HOME/logs/. Open http://localhost:8099 to see the Spark WebUI.
- Run ./bin/spark-shell to open a Scala shell connected to Spark.
2. Integrating Spark SQL with Hive
- Copy $HIVE_HOME/conf/hive-site.xml and hive-log4j.properties into $SPARK_HOME/conf/
- In $SPARK_HOME/conf/, edit spark-env.sh and add:
- export HIVE_HOME=/home/hadoop/hive
- export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar:$SPARK_CLASSPATH
- You can also adjust Spark's log4j configuration so the console is not flooded with INFO messages (raise the level further if you want even less output):
- log4j.rootCategory=WARN, console
- Go to $SPARK_HOME/bin and run ./spark-sql --master spark://127.0.0.1:7077 to enter the spark-sql CLI:
- [hadoop@hadoop spark]$ bin/spark-sql --help
- Usage: ./bin/spark-sql [options] [cli option]
- CLI options:
- -d,--define <key=value> Variable subsitution to apply to hive
- commands. e.g. -d A=B or --define A=B
- --database <databasename> Specify the database to use
- -e <quoted-query-string> SQL from command line
- -f <filename> SQL from files
- -h <hostname> connecting to Hive Server on remote host
- --hiveconf <property=value> Use value for given property
- --hivevar <key=value> Variable subsitution to apply to hive
- commands. e.g. --hivevar A=B
- -i <filename> Initialization SQL file
- -p <port> connecting to Hive Server on port number
- -S,--silent Silent mode in interactive shell
- -v,--verbose Verbose mode (echo executed SQL to the
- console)
Note that the CLI does not use a JDBC connection, so it cannot connect to a ThriftServer; however, with conf/hive-site.xml in place it connects to the Hive metastore and can query Hive tables directly. Next we look at how to query Hive tables from Python.
3. Configuring pyspark, with example code
3.1 Configuring pyspark
- Open /etc/profile and add:
- # PythonPath: add Spark's pyspark module to the Python environment
- export PYTHONPATH=$SPARK_HOME/python
- source /etc/profile
- Note: to import pyspark from a standalone Python process you typically also need Spark's bundled py4j archive (under $SPARK_HOME/python/lib) on PYTHONPATH.
Run ./bin/pyspark to open a Python shell connected to Spark and confirm it starts without errors.
Then open a regular terminal and start python (our version is 2.7.6; note that Spark did not yet support Python 3 at this point). If import pyspark succeeds without errors, the environment is ready for development; a small smoke test is sketched below.
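As a quick smoke test (a minimal sketch, not from the original setup; it assumes the standalone master from section 1.1 is running at spark://127.0.0.1:7077 and that PYTHONPATH is configured as above), you can run a trivial job from a plain Python shell:
- from pyspark import SparkContext
- # Connect to the standalone master and run a trivial distributed job.
- sc = SparkContext("spark://127.0.0.1:7077", "pyspark-smoke-test")
- print(sc.parallelize(range(100)).count())  # expected output: 100
- sc.stop()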
3.2 Starting the ThriftServer
Start the ThriftServer so that it runs on the Spark cluster:
sbin/start-thriftserver.sh --master spark://localhost:7077 --executor-memory 5g
The ThriftServer can serve multiple JDBC/ODBC clients at the same time, and those clients can share data with each other.
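Any client that speaks the HiveServer2 protocol can connect to it. As an illustration only (the original setup does not use it), the sketch below uses the third-party PyHive package and assumes the ThriftServer is listening on the default port 10000 on localhost:
- from pyhive import hive
- # Open a HiveServer2-protocol connection to the Spark ThriftServer.
- conn = hive.connect(host='localhost', port=10000)
- cursor = conn.cursor()
- cursor.execute('SELECT count(1) FROM fmnews_user_log2')
- print(cursor.fetchall())
- conn.close()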
3.3 Query example
According to the official Spark documentation, the Spark SQL API for querying Hive data did not change much between Spark 1.4 and 2.0; in both cases you start from a SparkContext (a Spark 2.x equivalent is sketched after the example below).
- from pyspark import SparkConf, SparkContext
- from pyspark.sql import HiveContext
- # Build the Spark configuration: standalone master, app name, executor memory.
- conf = (SparkConf()
-          .setMaster("spark://127.0.0.1:7077")
-          .setAppName("My app")
-          .set("spark.executor.memory", "1g"))
- sc = SparkContext(conf=conf)
- # HiveContext reads hive-site.xml and talks to the Hive metastore.
- sqlContext = HiveContext(sc)
- my_dataframe = sqlContext.sql("SELECT count(1) FROM logs.fmnews_dim_where")
- my_dataframe.show()
The query result is printed to the console, and after the run you can see the job's execution details in the Spark WebUI.
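For reference, under Spark 2.x the same query would typically go through a SparkSession with Hive support instead of an explicit HiveContext (a sketch only; it reuses the master URL and table name from the example above):
- from pyspark.sql import SparkSession
- # SparkSession is the Spark 2.x entry point; enableHiveSupport() wires in the Hive metastore.
- spark = (SparkSession.builder
-          .master("spark://127.0.0.1:7077")
-          .appName("My app")
-          .config("spark.executor.memory", "1g")
-          .enableHiveSupport()
-          .getOrCreate())
- spark.sql("SELECT count(1) FROM logs.fmnews_dim_where").show()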
4. Performance comparison
We took roughly one month of user behavior data, about 2 GB in size and close to 16 million records in total.
To compare results across different kinds of SQL workloads, we picked three queries that we run routinely (a sketch for timing them from pyspark follows the third query below):
1. Counting rows:
- select count(1) from fmnews_user_log2;
2. User behavior statistics:
- SELECT device_id, min_time FROM
- (SELECT device_id,min(import_time) min_time FROM fmnews_user_log2
- GROUP BY device_id)a
- WHERE from_unixtime(int(substr(min_time,0,10)),'yyyy-MM-dd') = '2017-03-02';
3. User behavior analysis:
- select case when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '06:00' and '07:59' then 1
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '08:00' and '09:59' then 2
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '10:00' and '11:59' then 3
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '12:00' and '13:59' then 4
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '14:00' and '15:59' then 5
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '16:00' and '17:59' then 6
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '18:00' and '19:59' then 7
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '20:00' and '21:59' then 8
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '22:00' and '23:59' then 9
- else 0 end fmnews_time_type, count(distinct device_id) device_count,count(1) click_count
- from fmcm.fmnews_user_log2
- where from_unixtime(int(substr(import_time,0,10)),'yyyy-MM-dd') = '2017-03-02'
- group by case when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '06:00' and '07:59' then 1
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '08:00' and '09:59' then 2
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '10:00' and '11:59' then 3
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '12:00' and '13:59' then 4
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '14:00' and '15:59' then 5
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '16:00' and '17:59' then 6
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '18:00' and '19:59' then 7
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '20:00' and '21:59' then 8
- when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '22:00' and '23:59' then 9
- else 0 end;
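The article does not say exactly how the timings were taken; one simple way to reproduce the Spark SQL side is to wrap each statement in a wall-clock timer around the HiveContext from section 3.3 (a sketch under that assumption):
- import time
- from pyspark import SparkConf, SparkContext
- from pyspark.sql import HiveContext
- sc = SparkContext(conf=SparkConf().setMaster("spark://127.0.0.1:7077").setAppName("sql-timing"))
- sqlContext = HiveContext(sc)
- def time_query(sql_text):
-     # Run the query to completion and return elapsed wall-clock seconds.
-     start = time.time()
-     sqlContext.sql(sql_text).collect()  # collect() forces full execution
-     return time.time() - start
- print(time_query("select count(1) from fmnews_user_log2"))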
Results for the first query: Hive 35.013 seconds vs. Spark SQL 1.218 seconds.
Results for the second query: Hive 78.101 seconds vs. Spark SQL 8.669 seconds.
Results for the third query: Hive 101.228 seconds vs. Spark SQL 14.221 seconds.
So while we did not see the 100x speedup touted on the official site, a 10x to 30x improvement, depending on query complexity, is clearly achievable.
Two factors should be kept in mind, however:
1. We did not use the full data set; at TB scale the gap between the two should narrow somewhat. The SQL was also not specifically optimized for Hive.
2. Spark was running on a single machine (with ample memory) rather than a cluster, while Hive ran on a Hadoop cluster with 4 datanodes.