Flume架構與源碼分析-核心組件分析-1
首先所有核心組件都會實現org.apache.flume.lifecycle.LifecycleAware接口:
Java代碼
- public interface LifecycleAware {
- public void start();
- public void stop();
- public LifecycleState getLifecycleState();
- }
start方法在整個Flume啟動時或者初始化組件時都會調用start方法進行組件初始化,Flume組件出現異常停止時會調用stop,getLifecycleState返回組件的生命周期狀態,有IDLE, START, STOP, ERROR四個狀態。
如果開發的組件需要配置,如設置一些屬性;可以實現org.apache.flume.conf.Configurable接口:
Java代碼
- public interface Configurable {
- public void configure(Context context);
- }
Flume在啟動組件之前會調用configure來初始化組件一些配置。
1、Source
Source用于采集日志數據,有兩種實現方式:輪訓拉取和事件驅動機制;Source接口如下:
Java代碼
- public interface Source extends LifecycleAware, NamedComponent {
- public void setChannelProcessor(ChannelProcessor channelProcessor);
- public ChannelProcessor getChannelProcessor();
- }
Source接口首先繼承了LifecycleAware接口,然后只提供了ChannelProcessor的setter和getter接口,也就是說它的的所有邏輯的實現應該在LifecycleAware接口的start和stop中實現;ChannelProcessor之前介紹過用來進行日志流的過濾和Channel的選擇及調度。
而Source是通過SourceFactory工廠創建,默認提供了DefaultSourceFactory,其首先通過Enum類型org.apache.flume.conf.source.SourceType查找默認實現,如exec,則找到org.apache.flume.source.ExecSource實現,如果找不到直接Class.forName(className)創建。
Source提供了兩種機制: PollableSource(輪訓拉取)和EventDrivenSource(事件驅動):
PollableSource默認提供了如下實現:
比如JMSSource實現使用javax.jms.MessageConsumer.receive(pollTimeout)主動去拉取消息。
EventDrivenSource默認提供了如下實現:
比如NetcatSource、HttpSource就是事件驅動,即被動等待;比如HttpSource就是內部啟動了一個內嵌的Jetty啟動了一個Servlet容器,通過FlumeHTTPServlet去接收消息。
Flume提供了SourceRunner用來啟動Source的流轉:
Java代碼
- public class EventDrivenSourceRunner extends SourceRunner {
- private LifecycleState lifecycleState;
- public EventDrivenSourceRunner() {
- lifecycleState = LifecycleState.IDLE; //啟動之前是空閑狀態
- }
- @Override
- public void start() {
- Source source = getSource(); //獲取Source
- ChannelProcessor cp = source.getChannelProcessor(); //Channel處理器
- cp.initialize(); //初始化Channel處理器
- source.start(); //啟動Source
- lifecycleState = LifecycleState.START; //本組件狀態改成啟動狀態
- }
- @Override
- public void stop() {
- Source source = getSource(); //先停Source
- source.stop();
- ChannelProcessor cp = source.getChannelProcessor();
- cp.close();//再停Channel處理器
- lifecycleState = LifecycleState.STOP; //本組件狀態改成停止狀態
- }
- }
從本組件也可以看出:1、首先要初始化ChannelProcessor,其實現時初始化過濾器鏈;2、接著啟動Source并更改本組件的狀態。
Java代碼
- public class PollableSourceRunner extends SourceRunner {
- @Override
- public void start() {
- PollableSource source = (PollableSource) getSource();
- ChannelProcessor cp = source.getChannelProcessor();
- cp.initialize();
- source.start();
- runner = new PollingRunner();
- runner.source = source;
- runner.counterGroup = counterGroup;
- runner.shouldStop = shouldStop;
- runnerThread = new Thread(runner);
- runnerThread.setName(getClass().getSimpleName() + "-" +
- source.getClass().getSimpleName() + "-" + source.getName());
- runnerThread.start();
- lifecycleState = LifecycleState.START;
- }
- }
而PollingRunner首先初始化組件,但是又啟動了一個線程PollingRunner,其作用就是輪訓拉取數據:
Java代碼
- @Override
- public void run() {
- while (!shouldStop.get()) { //如果沒有停止,則一直在死循環運行
- counterGroup.incrementAndGet("runner.polls");
- try {
- //調用PollableSource的process方法進行輪訓拉取,然后判斷是否遇到了失敗補償
- if (source.process().equals(PollableSource.Status.BACKOFF)) {/
- counterGroup.incrementAndGet("runner.backoffs");
- //失敗補償時暫停線程處理,等待超時時間之后重試
- Thread.sleep(Math.min(
- counterGroup.incrementAndGet("runner.backoffs.consecutive")
- * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
- } else {
- counterGroup.set("runner.backoffs.consecutive", 0L);
- }
- } catch (InterruptedException e) {
- }
- }
- }
- }
- }
Flume在啟動時會判斷Source是PollableSource還是EventDrivenSource來選擇使用PollableSourceRunner還是EventDrivenSourceRunner。
比如HttpSource實現,其通過FlumeHTTPServlet接收消息然后:
Java代碼
- List<Event> events = Collections.emptyList(); //create empty list
- //首先從請求中獲取Event
- events = handler.getEvents(request);
- //然后交給ChannelProcessor進行處理
- getChannelProcessor().processEventBatch(events);
到此基本的Source流程就介紹完了,其作用就是監聽日志,采集,然后交給ChannelProcessor進行處理。
2、Channel
Channel用于連接Source和Sink,Source生產日志發送到Channel,Sink從Channel消費日志;也就是說通過Channel實現了Source和Sink的解耦,可以實現多對多的關聯,和Source、Sink的異步化。
之前Source采集到日志后會交給ChannelProcessor處理,那么接下來我們先從ChannelProcessor入手,其依賴三個組件:
Java代碼
- private final ChannelSelector selector; //Channel選擇器
- private final InterceptorChain interceptorChain; //攔截器鏈
- private ExecutorService execService; //用于實現可選Channel的ExecutorService,默認是單線程實現
接下來看下其是如何處理Event的:
Java代碼
- public void processEvent(Event event) {
- event = interceptorChain.intercept(event); //首先進行攔截器鏈過濾
- if (event == null) {
- return;
- }
- List<Event> events = new ArrayList<Event>(1);
- events.add(event);
- //通過Channel選擇器獲取必須成功處理的Channel,然后事務中執行
- List<Channel> requiredChannels = selector.getRequiredChannels(event);
- for (Channel reqChannel : requiredChannels) {
- executeChannelTransaction(reqChannel, events, false);
- }
- //通過Channel選擇器獲取可選的Channel,這些Channel失敗是可以忽略,不影響其他Channel的處理
- List<Channel> optionalChannels = selector.getOptionalChannels(event);
- for (Channel optChannel : optionalChannels) {
- execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
- }
- }
另外內部還提供了批處理實現方法processEventBatch;對于內部事務實現的話可以參考executeChannelTransaction方法,整體事務機制類似于JDBC:
Java代碼
- private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
- //1、獲取Channel上的事務
- Transaction tx = channel.getTransaction();
- Preconditions.checkNotNull(tx, "Transaction object must not be null");
- try {
- //2、開啟事務
- tx.begin();
- //3、在Channel上執行批量put操作
- for (Event event : batch) {
- channel.put(event);
- }
- //4、成功后提交事務
- tx.commit();
- } catch (Throwable t) {
- //5、異常后回滾事務
- tx.rollback();
- if (t instanceof Error) {
- LOG.error("Error while writing to channel: " +
- channel, t);
- throw (Error) t;
- } else if(!isOptional) {//如果是可選的Channel,異常忽略
- throw new ChannelException("Unable to put batch on required " +
- "channel: " + channel, t);
- }
- } finally {
- //***關閉事務
- tx.close();
- }
- }
Interceptor用于過濾Event,即傳入一個Event然后進行過濾加工,然后返回一個新的Event,接口如下:
Java代碼
- public interface Interceptor {
- public void initialize();
- public Event intercept(Event event);
- public List<Event> intercept(List<Event> events);
- public void close();
- }
可以看到其提供了initialize和close方法用于啟動和關閉;intercept方法用于過濾或加工Event。比如HostInterceptor攔截器用于獲取本機IP然后默認添加到Event的字段為host的Header中。
接下來就是ChannelSelector選擇器了,其通過如下方式創建:
Java代碼
- //獲取ChannelSelector配置,比如agent.sources.s1.selector.type = replicating
- ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
- //使用Source關聯的Channel創建,比如agent.sources.s1.channels = c1 c2
- ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);
ChannelSelector默認提供了兩種實現:復制和多路復用:
默認實現是復制選擇器ReplicatingChannelSelector,即把接收到的消息復制到每一個Channel;多路復用選擇器MultiplexingChannelSelector會根據Event Header中的參數進行選擇,以此來選擇使用哪個Channel。
而Channel是Event中轉的地方,Source發布Event到Channel,Sink消費Channel的Event;Channel接口提供了如下接口用來實現Event流轉:
Java代碼
- public interface Channel extends LifecycleAware, NamedComponent {
- public void put(Event event) throws ChannelException;
- public Event take() throws ChannelException;
- public Transaction getTransaction();
- }
put用于發布Event,take用于消費Event,getTransaction用于事務支持。默認提供了如下Channel的實現:
對于Channel的實現我們后續單獨章節介紹。
3、Sink
Sink從Channel消費Event,然后進行轉移到收集/聚合層或存儲層。Sink接口如下所示:
Java代碼
- public interface Sink extends LifecycleAware, NamedComponent {
- public void setChannel(Channel channel);
- public Channel getChannel();
- public Status process() throws EventDeliveryException;
- public static enum Status {
- READY, BACKOFF
- }
- }
類似于Source,其首先繼承了LifecycleAware,然后提供了Channel的getter/setter方法,并提供了process方法進行消費,此方法會返回消費的狀態,READY或BACKOFF。
Sink也是通過SinkFactory工廠來創建,其也提供了DefaultSinkFactory默認工廠,比如傳入hdfs,會先查找Enum org.apache.flume.conf.sink.SinkType,然后找到相應的默認處理類org.apache.flume.sink.hdfs.HDFSEventSink,如果沒找到默認處理類,直接通過Class.forName(className)進行反射創建。
我們知道Sink還提供了分組功能,用于把多個Sink聚合為一組進行使用,內部提供了SinkGroup用來完成這個事情。此時問題來了,如何去調度多個Sink,其內部使用了SinkProcessor來完成這個事情,默認提供了故障轉移和負載均衡兩個策略。
首先SinkGroup就是聚合多個Sink為一組,然后將多個Sink傳給SinkProcessorFactory進行創建SinkProcessor,而策略是根據配置文件中配置的如agent.sinkgroups.g1.processor.type = load_balance來選擇的。
SinkProcessor提供了如下實現:
DefaultSinkProcessor:默認實現,用于單個Sink的場景使用。
FailoverSinkProcessor:故障轉移實現:
Java代碼
- public Status process() throws EventDeliveryException {
- Long now = System.currentTimeMillis();
- //1、首先檢查失敗隊列的頭部的Sink是否已經過了失敗補償等待時間了
- while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
- //2、如果可以使用了,則從失敗Sink隊列獲取隊列***個Sink
- FailedSink cur = failedSinks.poll();
- Status s;
- try {
- s = cur.getSink().process(); //3、使用此Sink進行處理
- if (s == Status.READY) { //4、如果處理成功
- liveSinks.put(cur.getPriority(), cur.getSink()); //4.1、放回存活Sink隊列
- activeSink = liveSinks.get(liveSinks.lastKey());
- } else {
- failedSinks.add(cur); //4.2、如果此時不是READY,即BACKOFF期間,再次放回失敗隊列
- }
- return s;
- } catch (Exception e) {
- cur.incFails(); //5、如果遇到異常了,則增加失敗次數,并放回失敗隊列
- failedSinks.add(cur);
- }
- }
- Status ret = null;
- while(activeSink != null) { //6、此時失敗隊列中沒有Sink能處理了,那么需要使用存活Sink隊列進行處理
- try {
- ret = activeSink.process();
- return ret;
- } catch (Exception e) { //7、處理失敗進行轉移到失敗隊列
- activeSink = moveActiveToDeadAndGetNext();
- }
- }
- throw new EventDeliveryException("All sinks failed to process, " +
- "nothing left to failover to");
- }
失敗隊列是一個優先級隊列,使用refresh屬性排序,而refresh是通過如下機制計算的:
Java代碼
- refresh = System.currentTimeMillis()
- + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
其中maxPenalty是***等待時間,默認30s,而(1 << sequentialFailures) * FAILURE_PENALTY)用于實現指數級等待時間遞增, FAILURE_PENALTY是1s。
LoadBalanceSinkProcessor:用于實現Sink的負載均衡,其通過SinkSelector進行實現,類似于ChannelSelector。LoadBalanceSinkProcessor在啟動時會根據配置,如agent.sinkgroups.g1.processor.selector = random進行選擇,默認提供了兩種選擇器:
LoadBalanceSinkProcessor使用如下機制進行負載均衡:
Java代碼
- public Status process() throws EventDeliveryException {
- Status status = null;
- //1、使用選擇器創建相應的迭代器,也就是用來選擇Sink的迭代器
- Iterator<Sink> sinkIterator = selector.createSinkIterator();
- while (sinkIterator.hasNext()) {
- Sink sink = sinkIterator.next();
- try {
- //2、選擇器迭代Sink進行處理,如果成功直接break掉這次處理,此次負載均衡就算完成了
- status = sink.process();
- break;
- } catch (Exception ex) {
- //3、失敗后會通知選擇器,采取相應的失敗退避補償算法進行處理
- selector.informSinkFailed(sink);
- LOGGER.warn("Sink failed to consume event. "
- + "Attempting next sink if available.", ex);
- }
- }
- if (status == null) {
- throw new EventDeliveryException("All configured sinks have failed");
- }
- return status;
- }
如上的核心就是怎么創建迭代器,如何進行失敗退避補償處理,首先我們看下RoundRobinSinkSelector實現,其內部是通過通用的RoundRobinOrderSelector選擇器實現:
Java代碼
- public Iterator<T> createIterator() {
- //1、獲取存活的Sink索引,
- List<Integer> activeIndices = getIndexList();
- int size = activeIndices.size();
- //2、如果上次記錄的下一個存活Sink的位置超過了size,那么從隊列頭重新開始計數
- if (nextHead >= size) {
- nextHead = 0;
- }
- //3、獲取本次使用的起始位置
- int begin = nextHead++;
- if (nextHead == activeIndices.size()) {
- nextHead = 0;
- }
- //4、從該位置開始迭代,其實現類似于環形隊列,比如整個隊列是5,起始位置是3,則按照 3、4、0、1、2的順序進行輪訓,實現了輪訓算法
- int[] indexOrder = new int[size];
- for (int i = 0; i < size; i++) {
- indexOrder[i] = activeIndices.get((begin + i) % size);
- }
- //indexOrder是迭代順序,getObjects返回相關的Sinks;
- return new SpecificOrderIterator<T>(indexOrder, getObjects());
- }
getIndexList實現如下:
Java代碼
- protected List<Integer> getIndexList() {
- long now = System.currentTimeMillis();
- List<Integer> indexList = new ArrayList<Integer>();
- int i = 0;
- for (T obj : stateMap.keySet()) {
- if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
- indexList.add(i);
- }
- i++;
- }
- return indexList;
- }
isShouldBackOff()表示是否開啟退避算法支持,如果不開啟,則認為每個Sink都是存活的,每次都會重試,通過agent.sinkgroups.g1.processor.backoff = true配置開啟,默認false;restoreTime和之前介紹的refresh一樣,是退避補償等待時間,算法類似,就不多介紹了。
那么什么時候調用Sink進行消費呢?其類似于SourceRunner,Sink提供了SinkRunner進行輪訓拉取處理,SinkRunner會輪訓調度SinkProcessor消費Channel的消息,然后調用Sink進行轉移。SinkProcessor之前介紹過,其負責消息復制/路由。
SinkRunner實現如下:
Java代碼
- public void start() {
- SinkProcessor policy = getPolicy();
- policy.start();
- runner = new PollingRunner();
- runner.policy = policy;
- runner.counterGroup = counterGroup;
- runner.shouldStop = new AtomicBoolean();
- runnerThread = new Thread(runner);
- runnerThread.setName("SinkRunner-PollingRunner-" +
- policy.getClass().getSimpleName());
- runnerThread.start();
- lifecycleState = LifecycleState.START;
- }
即獲取SinkProcessor然后啟動它,接著啟動輪訓線程去處理。PollingRunner線程負責輪訓消息,核心實現如下:
Java代碼
- public void run() {
- while (!shouldStop.get()) { //如果沒有停止
- try {
- if (policy.process().equals(Sink.Status.BACKOFF)) {//如果處理失敗了,進行退避補償處理
- counterGroup.incrementAndGet("runner.backoffs");
- Thread.sleep(Math.min(
- counterGroup.incrementAndGet("runner.backoffs.consecutive")
- * backoffSleepIncrement, maxBackoffSleep)); //暫停退避補償設定的超時時間
- } else {
- counterGroup.set("runner.backoffs.consecutive", 0L);
- }
- } catch (Exception e) {
- try {
- Thread.sleep(maxBackoffSleep); //如果遇到異常則等待***退避時間
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
整體實現類似于PollableSourceRunner實現,整體處理都是交給SinkProcessor完成的。SinkProcessor會輪訓Sink的process方法進行處理;此處以LoggerSink為例:
Java代碼
- @Override
- public Status process() throws EventDeliveryException {
- Status result = Status.READY;
- Channel channel = getChannel();
- //1、獲取事務
- Transaction transaction = channel.getTransaction();
- Event event = null;
- try {
- //2、開啟事務
- transaction.begin();
- //3、從Channel獲取Event
- event = channel.take();
- if (event != null) {
- if (logger.isInfoEnabled()) {
- logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
- }
- } else {//4、如果Channel中沒有Event,則默認進入故障補償機制,即防止死循環造成CPU負載高
- result = Status.BACKOFF;
- }
- //5、成功后提交事務
- transaction.commit();
- } catch (Exception ex) {
- //6、失敗后回滾事務
- transaction.rollback();
- throw new EventDeliveryException("Failed to log event: " + event, ex);
- } finally {
- //7、關閉事務
- transaction.close();
- }
- return result;
- }
Sink中一些實現是支持批處理的,比如RollingFileSink:
Java代碼
- //1、開啟事務
- //2、批處理
- for (int i = 0; i < batchSize; i++) {
- event = channel.take();
- if (event != null) {
- sinkCounter.incrementEventDrainAttemptCount();
- eventAttemptCounter++;
- serializer.write(event);
- }
- }
- //3、提交/回滾事務、關閉事務
定義一個批處理大小然后在事務中執行批處理。
【本文是51CTO專欄作者張開濤的原創文章,作者微信公眾號:開濤的博客,id:kaitao-1234567】