宜人貸蜂巢API網關技術解密之Netty使用實踐
宜人貸蜂巢團隊,由Michael創立于2013年,通過使用互聯網科技手段助力金融生態和諧健康發展。自成立起一直致力于多維度數據閉環平臺建設。目前團隊規模超過百人,涵蓋征信、電商、金融、社交、五險一金和保險等用戶授信數據的抓取解析業務,輔以先進的數據分析、挖掘和機器學習等技術對用戶信用級別、欺詐風險進行預測評定,全面對外輸出金融反欺詐、社交圖譜、自動化模型定制等服務或產品。
目前宜人貸蜂巢基于用戶授權數據實時抓取解析技術,并結合***大數據技術,快速迭代和自主的創新,已形成了強大而領先的聚合和輸出能力。
為了適應完成宜人貸蜂巢強大的服務輸出能力,蜂巢設計開發了自己的API網關系統,集中實現了鑒權、加解密、路由、限流等功能,使各業務抓取團隊關注其核心抓取和分析工作,而API網關系統更專注于安全、流量、路由等問題,從而更好的保障蜂巢服務系統的質量。今天帶著大家解密API網關的Netty線程池技術實踐細節。
API網關作為宜人貸蜂巢數據開放平臺的統一入口,所有的客戶端及消費端通過統一的API來使用各類抓取服務。從面向對象設計的角度看,它與外觀模式類似,包裝各類不同的實現細節,對外表現出統一的調用形式。
本文首先,簡要地介紹API網關的項目框架,其次對比BIO和NIO的特點,再引入Netty作為項目的基礎框架,然后介紹Netty線程池的原理,***深入Netty線程池的初始化、ServerBootstrap的初始化與啟動及channel與線程池的綁定過程,讓讀者了解Netty在承載高并發訪問的設計思路。
一、項目框架
圖1 - API網關項目框架
圖中描繪了API網關系統的處理流程,以及與服務注冊發現、日志分析、報警系統、各類爬蟲的關系。其中API網關系統接收請求,對請求進行編解碼、鑒權、限流、加解密,再基于Eureka服務注冊發現模塊,將請求發送到有效的服務節點上;網關及抓取系統的日志,會被收集到elk平臺中,做業務分析及報警處理。
二、BIO vs NIO
API網關承載數倍于爬蟲的流量,提升服務器的并發處理能力、縮短系統的響應時間,通信模型的選擇是至關重要的,是選擇BIO,還是NIO?
1. Streamvs Buffer & 阻塞 vs 非阻塞
BIO是面向流的,io的讀寫,每次只能處理一個或者多個bytes,如果數據沒有讀寫完成,線程將一直等待于此,而不能暫時跳過io或者等待io讀寫完成異步通知,線程滯留在io讀寫上,不能充分利用機器有限的線程資源,造成server的吞吐量較低,見圖2。而NIO與此不同,面向Buffer,線程不需要滯留在io讀寫上,采用操作系統的epoll模式,在io數據準備好了,才由線程來處理,見圖3。
圖2 – BIO 從流中讀取數據
圖3 – NIO 從Buffer中讀取數據
2. Selectors
NIO的selector使一個線程可以監控多個channel的讀寫,多個channel注冊到一個selector上,這個selector可以監測到各個channel的數據準備情況,從而使用有限的線程資源處理更多的連接,見圖4。所以可以這樣說,NIO極大的提升了服務器接受并發請求的能力,而服務器性能還是要取決于業務處理時間和業務線程池模型。
圖4 – NIO 單一線程管理多個連接
而BIO采用的是request-per-thread模式,用一個線程負責接收TCP連接請求,并建立鏈路,然后將請求dispatch給負責業務邏輯處理的線程,見圖5。一旦訪問量過多,就會造成機器的線程資源緊張,造成請求延遲,甚至服務宕機。
圖5 – BIO 一連接一線程
對比JDK NIO與諸多NIO框架后,鑒于Netty優雅的設計、易用的API、優越的性能、安全性支持、API網關使用Netty作為通信模型,實現了基礎框架的搭建。
三、線程池
考慮到API網關的高并發訪問需求,線程池設計,見圖6。
圖6 – API網關線程池設計
Netty的線程池理念有點像ForkJoinPool,不是一個線程大池子并發等待一條任務隊列,而是每條線程都有一個任務隊列。而且Netty的線程,并不只是簡單的阻塞地拉取任務,而是在每個循環中做三件事情:
- 先SelectKeys()處理NIO的事件
- 然后獲取本線程的定時任務,放到本線程的任務隊列里
- ***執行其他線程提交給本線程的任務
每個循環里處理NIO事件與其他任務的時間消耗比例,還能通過ioRatio變量來控制,默認是各占50%。可見,Netty的線程根本沒有阻塞等待任務的清閑日子,所以也不使用有鎖的BlockingQueue來做任務隊列了,而是使用無鎖的MpscLinkedQueue(Mpsc 是Multiple Producer, Single Consumer的縮寫)。
四、NioEventLoopGroup初始化
下面分析下Netty線程池NioEventLoopGroup的設計與實現細節,NioEventLoopGroup的類層次關系見圖7:
圖7 –NioEvenrLoopGroup類層次關系
其創建過程——方法調用,見下圖:
圖8 –NioEvenrLoopGroup創建調用關系
NioEvenrLoopGroup的創建,具體執行過程是執行類MultithreadEventExecutorGroup的構造方法:
- /**
- * Create a new instance.
- *
- * @param nThreads the number of threads that will be used by this instance.
- * @param executor the Executor to use, or {@code null} if the default should be used.
- * @param chooserFactory the {@link EventExecutorChooserFactory} to use.
- * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
- */
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
- EventExecutorChooserFactory chooserFactory, Object... args) {
- if (nThreads <= 0) {
- throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
- }
- if (executor == null) {
- executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
- }
- children = new EventExecutor[nThreads];
- for (int i = 0; i < nThreads; i ++) {
- boolean success = false;
- try {
- children[i] = newChild(executor, args);
- success = true;
- } catch (Exception e) {
- throw new IllegalStateException("failed to create a child event loop", e);
- } finally {
- if (!success) {
- for (int j = 0; j < i; j ++) {
- children[j].shutdownGracefully();
- }
- for (int j = 0; j < i; j ++) {
- EventExecutor e = children[j];
- try {
- while (!e.isTerminated()) {
- e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
- }
- } catch (InterruptedException interrupted) {
- // Let the caller handle the interruption.
- Thread.currentThread().interrupt();
- break;
- }
- }
- }
- }
- }
- chooser = chooserFactory.newChooser(children);
- final FutureListener<Object> terminationListener = new FutureListener<Object>() {
- @Override
- public void operationComplete(Future<Object> future) throws Exception {
- if (terminatedChildren.incrementAndGet() == children.length) {
- terminationFuture.setSuccess(null);
- }
- }
- };
- for (EventExecutor e: children) {
- e.terminationFuture().addListener(terminationListener);
- }
- Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
- Collections.addAll(childrenSet, children);
- readonlyChildren = Collections.unmodifiableSet(childrenSet);
- }
其中,創建細節見下:
- 線程池中的線程數nThreads必須大于0;
- 如果executor為null,創建默認executor,executor用于創建線程(newChild方法使用executor對象);
- 依次創建線程池中的每一個線程即NioEventLoop,如果其中有一個創建失敗,將關閉之前創建的所有線程;
- chooser為線程池選擇器,用來選擇下一個EventExecutor,可以理解為,用來選擇一個線程來執行task。
chooser的創建細節,見下:
DefaultEventExecutorChooserFactory根據線程數創建具體的EventExecutorChooser,線程數如果等于2^n,可使用按位與替代取模運算,節省cpu的計算資源,見源碼:
- @SuppressWarnings("unchecked")
- @Override
- public EventExecutorChooser newChooser(EventExecutor[] executors) {
- if (isPowerOfTwo(executors.length)) {
- return new PowerOfTowEventExecutorChooser(executors);
- } else {
- return new GenericEventExecutorChooser(executors);
- }
- }
- private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
- private final AtomicInteger idx = new AtomicInteger();
- private final EventExecutor[] executors;
- PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
- this.executors = executors;
- }
- @Override
- public EventExecutor next() {
- return executors[idx.getAndIncrement() & executors.length - 1];
- }
- }
- private static final class GenericEventExecutorChooser implements EventExecutorChooser {
- private final AtomicInteger idx = new AtomicInteger();
- private final EventExecutor[] executors;
- GenericEventExecutorChooser(EventExecutor[] executors) {
- this.executors = executors;
- }
- @Override
- public EventExecutor next() {
- return executors[Math.abs(idx.getAndIncrement() % executors.length)];
- }
- }
newChild(executor, args)的創建細節,見下:
MultithreadEventExecutorGroup的newChild方法是一個抽象方法,故使用NioEventLoopGroup的newChild方法,即調用NioEventLoop的構造函數:
- @Override
- protected EventLoop newChild(Executor executor, Object... args) throws Exception {
- return new NioEventLoop(this, executor, (SelectorProvider) args[0],
- ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
- }
在這里先看下NioEventLoop的類層次關系:
NioEventLoop的繼承關系比較復雜,在AbstractScheduledEventExecutor 中,Netty 實現了 NioEventLoop 的 schedule 功能,即我們可以通過調用一個 NioEventLoop 實例的 schedule 方法來運行一些定時任務。而在 SingleThreadEventLoop 中,又實現了任務隊列的功能,通過它,我們可以調用一個NioEventLoop 實例的 execute 方法來向任務隊列中添加一個 task, 并由 NioEventLoop 進行調度執行。
通常來說,NioEventLoop 肩負著兩種任務,***個是作為 IO 線程,執行與 Channel 相關的 IO 操作,包括調用 select 等待就緒的 IO 事件、讀寫數據與數據的處理等;而第二個任務是作為任務隊列,執行 taskQueue 中的任務,例如用戶調用 eventLoop.schedule 提交的定時任務也是這個線程執行的。
具體的構造過程,見下:
創建任務隊列tailTasks(內部為有界的LinkedBlockingQueue):
創建線程的任務隊列taskQueue(內部為有界的LinkedBlockingQueue),以及任務過多防止系統宕機的拒絕策略rejectedHandler。
其中tailTasks和taskQueue均是任務隊列,而優先級不同,taskQueue的優先級高于tailTasks,定時任務的優先級高于taskQueue。
五、ServerBootstrap初始化及啟動
了解了Netty線程池NioEvenrLoopGroup的創建過程后,下面看下API網關服務ServerBootstrap的是如何使用線程池引入服務中,為高并發訪問服務的。
API網關ServerBootstrap初始化及啟動代碼,見下:
- serverBootstrap = new ServerBootstrap();
- bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());
- workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());
- serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())
- .option(ChannelOption.SO_BACKLOG, config.getBacklogSize())
- .option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())
- // Memory pooled
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .childHandler(channelInitializer);
- ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();
- log.info("API-gateway started on port: {}", config.getPort());
- future.channel().closeFuture().sync();
API網關系統使用netty自帶的線程池,共有三組線程池,分別為bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暫不作介紹)。其中,bossGroup用于接收客戶端的TCP連接,workerGroup用于處理I/O、執行系統task和定時任務,executorGroup用于處理網關業務加解密、限流、路由,及將請求轉發給后端的抓取服務等業務操作。
六、Channel與線程池的綁定
ServerBootstrap初始化后,通過調用bind(port)方法啟動Server,bind的調用鏈如下:
- AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister
其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化過程指定channel為NioServerSocketChannel.class,至此將NioServerSocketChannel與bossGroup綁定到一起,bossGroup負責客戶端連接的建立。那么NioSocketChannel是如何與workerGroup綁定到一起的?
調用鏈AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead:
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final Channel child = (Channel) msg;
- child.pipeline().addLast(childHandler);
- for (Entry<ChannelOption<?>, Object> e: childOptions) {
- try {
- if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
- logger.warn("Unknown channel option: " + e);
- }
- } catch (Throwable t) {
- logger.warn("Failed to set a channel option: " + child, t);
- }
- }
- for (Entry<AttributeKey<?>, Object> e: childAttrs) {
- child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
- }
- try {
- childGroup.register(child).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- forceClose(child, future.cause());
- }
- }
- });
- } catch (Throwable t) {
- forceClose(child, t);
- }
- }
其中,childGroup.register(child)就是將NioSocketChannel與workderGroup綁定到一起,那又是什么觸發了ServerBootstrapAcceptor的channelRead方法?
其實當一個 client 連接到 server 時,Java 底層的 NIO ServerSocketChannel 會有一個SelectionKey.OP_ACCEPT 就緒事件,接著就會調用到 NioServerSocketChannel.doReadMessages方法。
- @Override
- protected int doReadMessages(List<Object> buf) throws Exception {
- SocketChannel ch = javaChannel().accept();
- try {
- if (ch != null) {
- buf.add(new NioSocketChannel(this, ch));
- return 1;
- }
- } catch (Throwable t) { …
- }
- return 0;
- }
javaChannel().accept() 會獲取到客戶端新連接的SocketChannel,實例化為一個 NioSocketChannel, 并且傳入 NioServerSocketChannel 對象(即 this),由此可知, 我們創建的這個NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實例 。
接下來就經由 Netty 的 ChannelPipeline 機制,將讀取事件逐級發送到各個 handler 中,于是就會觸發前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦。
至此,分析了Netty線程池的初始化、ServerBootstrap的啟動及channel與線程池的綁定過程,能夠看出Netty中線程池的優雅設計,使用不同的線程池負責連接的建立、IO讀寫等,為API網關項目的高并發訪問提供了技術基礎。
七、總結
至此,對API網關技術的Netty實踐分享就到這里,各位如果對中間的各個環節有什么疑問和建議,歡迎大家指正,我們一起討論,共同學習提高。
參考:
- http://tutorials.jenkov.com/java-nio/nio-vs-io.html
- http://netty.io/wiki/user-guide-for-4.x.html
- http://netty.io/
- http://www.tuicool.com/articles/mUFnqeM
- https://segmentfault.com/a/1190000007403873
- https://segmentfault.com/a/1190000007283053
【本文是51CTO專欄機構宜信技術學院的原創文章,微信公眾號“宜信技術學院( id: CE_TECH)”】