如何設(shè)計一個實時流計算系統(tǒng)?
實時流計算的場景歸納起來多半是:業(yè)務(wù)系統(tǒng)根據(jù)實時的操作,不斷生成事件(消息/調(diào)用),然后引起一系列的處理分析,這個過程是分散在多臺計算機上并行完成的,看上去就像事件連續(xù)不斷的流經(jīng)多個計算節(jié)點處理,形成一個實時流計算系統(tǒng)。
市場上流計算產(chǎn)品有很多,主要是通過消息中樞結(jié)合工人模式實現(xiàn),大致過程如下:
1、開發(fā)者實現(xiàn)好流程輸入輸出節(jié)點邏輯,上傳job到任務(wù)生產(chǎn)者
2、任務(wù)生產(chǎn)者將任務(wù)發(fā)送到zookeeper,然后監(jiān)控任務(wù)狀態(tài)
3、任務(wù)消費者從zookeeper上獲取任務(wù)
4、任務(wù)消費者啟動多個工人進程,每個進程又啟動多個線程執(zhí)行任務(wù)
5、工人之間通過zeroMQ交互
我們看看如何做一個簡單的流計算系統(tǒng),做法跟上面有些不同:
1、首先不過多依賴zookeerper,任務(wù)的分配最好直接給到工人,并能直接監(jiān)控工人完成狀態(tài),這樣效率會更高。
2、工人之間直接通訊,不依賴zeroMQ轉(zhuǎn)發(fā)。
3、并行管理扁平化,多進程下再分多線程意義不大,增加管理成本,實際上一臺機器8個進程,每個進程再開8個線程,總體跟8-10個進程或者線程的效果差不多(數(shù)量視機器性能不同)。
4、做成一個流計算系統(tǒng),而不是平臺。
這里我們借助fourinone提供的api和框架去實現(xiàn),第一次使用可以參考分布式計算上手demo指南,開發(fā)包下載地址 http://code.google.com/p/fourinone/
大致思路:用工頭去做任務(wù)生產(chǎn)和分配,用工人去做任務(wù)執(zhí)行,為了達到流的效果,需要在工人里面調(diào)用工頭的方式,將多個工人節(jié)點串起來。
下面程序演示了連續(xù)多個消息先發(fā)到一個工人節(jié)點A處理,然后再發(fā)到兩個工人節(jié)點B并行處理的流計算過程,并且獲取到最后處理結(jié)果打印輸出(如果不需要獲取結(jié)果可以直接返回)。
- StreamCtorA:工頭A實現(xiàn),它獲取到線上工人A,然后將消息發(fā)給它處理,并輪循等待結(jié)果。工頭A的main函數(shù)模擬了多個消息的連續(xù)調(diào)用。
- StreamWorkerA:工人A實現(xiàn),它接收到工頭A的消息進行處理,然后創(chuàng)建一個工頭B,通過工頭B將結(jié)果同時發(fā)給兩個工人B處理,然后將結(jié)果返回工頭A。
- StreamCtorB:工頭B實現(xiàn),它獲取到線上兩個工人B,調(diào)用doTaskBatch等待兩個工人處理完成,然后返回結(jié)果給工人A。
- StreamWorkerB:工人B實現(xiàn),它接收到任務(wù)消息后模擬處理后返回結(jié)果。
運行步驟(在本地模擬):
1、啟動ParkServerDemo(它的IP端口已經(jīng)在配置文件指定)
- java -cp fourinone.jar; ParkServerDemo
2、啟動工人A
- java -cp fourinone.jar; StreamWorkerA localhost 2008
3、啟動兩個工人B
- java -cp fourinone.jar; StreamWorkerB localhost 2009
- java -cp fourinone.jar; StreamWorkerB localhost 2010
4、啟動工頭A
- java -cp fourinone.jar; StreamCtorA
多機部署說明:StreamCtorA可以單獨部署一臺機器,StreamWorkerA和StreamCtorB部署一臺機器,兩個StreamWorkerB可以部署兩臺機器。
總結(jié):計算平臺和計算系統(tǒng)的區(qū)別
如果我們只有幾臺機器,但是每天有人開發(fā)不同的流處理應(yīng)用要在這幾臺機器上運行,我們需要一個計算平臺來管理好job,讓開發(fā)者按照規(guī)范配置好流程和運行時節(jié)點申請,打包成job上傳,然后平臺根據(jù)每個job配置動態(tài)分配資源依次執(zhí)行每個job內(nèi)容。
如果我們的幾臺機器只為一個流處理業(yè)務(wù)服務(wù),比如實時營銷,我們需要一個流計算系統(tǒng),按照業(yè)務(wù)流程部署好計算節(jié)點即可,不需要運行多個job和動態(tài)分配資源,按照計算平臺的方式做只會增加復(fù)雜性,開發(fā)者也不清楚每臺機器上到底運行了什么邏輯。
如果你想實現(xiàn)一個計算平臺,可以參考動態(tài)部署和進程管理功能(開發(fā)包內(nèi)有指南)
//完整源碼
// ParkServerDemo
- import com.fourinone.BeanContext;
- public class ParkServerDemo
- {
- public static void main(String[] args)
- {
- BeanContext.startPark();
- }
- }
//StreamCtorA
- import com.fourinone.Contractor;
- import com.fourinone.WareHouse;
- import com.fourinone.WorkerLocal;
- import java.util.ArrayList;
- public class StreamCtorA extends Contractor
- {
- public WareHouse giveTask(WareHouse inhouse)
- {
- WorkerLocal[] wks = getWaitingWorkers("StreamWorkerA");
- System.out.println("wks.length:"+wks.length);
- WareHouse result = wks[0].doTask(inhouse);
- while(true){
- if(result.getStatus()!=WareHouse.NOTREADY)
- {
- break;
- }
- }
- return result;
- }
- public static void main(String[] args)
- {
- StreamCtorA sc = new StreamCtorA();
- for(int i=0;i<10;i++){
- WareHouse msg = new WareHouse();
- msg.put("msg","hello"+i);
- WareHouse wh = sc.giveTask(msg);
- System.out.println(wh);
- }
- sc.exit();
- }
- }
//StreamWorkerA
- import com.fourinone.MigrantWorker;
- import com.fourinone.WareHouse;
- public class StreamWorkerA extends MigrantWorker
- {
- public WareHouse doTask(WareHouse inhouse)
- {
- System.out.println(inhouse);
//do something
- StreamCtorB sc = new StreamCtorB();
- WareHouse msg = new WareHouse();
- msg.put("msg",inhouse.getString("msg")+",from StreamWorkerA");
- WareHouse wh = sc.giveTask(msg);
- sc.exit();
- return wh;
- }
- public static void main(String[] args)
- {
- StreamWorkerA wd = new StreamWorkerA();
- wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerA");
- }
- }
//StreamCtorB
- import com.fourinone.Contractor;
- import com.fourinone.WareHouse;
- import com.fourinone.WorkerLocal;
- import java.util.ArrayList;
- public class StreamCtorB extends Contractor
- {
- public WareHouse giveTask(WareHouse inhouse)
- {
- WorkerLocal[] wks = getWaitingWorkers("StreamWorkerB");
- System.out.println("wks.length:"+wks.length);
- WareHouse[] hmarr = doTaskBatch(wks, inhouse);
- WareHouse result = new WareHouse();
- result.put("B1",hmarr[0]);
- result.put("B2",hmarr[1]);
- return result;
- }
- }
//StreamWorkerB
- view sourceprint?
- import com.fourinone.MigrantWorker;
- import com.fourinone.WareHouse;
- public class StreamWorkerB extends MigrantWorker
- {
- public WareHouse doTask(WareHouse inhouse)
- {
- System.out.println(inhouse);
//do something
- inhouse.put("msg",inhouse.getString("msg")+",from StreamWorkerB");
- return inhouse;
- }
- public static void main(String[] args)
- {
- StreamWorkerB wd = new StreamWorkerB();
- wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerB");
- }
- }