讀懂HikariCP一百行代碼,多線程就是個(gè)孫子!
總結(jié):Java屆很難得有讀百十行代碼就能增加修煉的機(jī)會(huì),這里有一個(gè)。
通常,我在看書的時(shí)候一般不寫代碼,因?yàn)槲业哪X袋被設(shè)定成單線程的,一旦同時(shí)喂給它不同的信息,它就無法處理。
但多線程對電腦來說就是小菜一碟,它可以同時(shí)做很多事,看起來匪夷所思。好希望把自己的大腦皮層移植到這些牛x的設(shè)備上。
用人腦思考電腦正在思考的問題,這本身就是一種折磨。但平常的工作和面試中,又不得不面對這樣的場景,所以多線程就成了編程路上一塊難啃的骨頭。
HikariCP是SpringBoot默認(rèn)的數(shù)據(jù)庫連接池,它毫不謙虛的的起了一個(gè)叫做光的名字,這讓國產(chǎn)Druid很沒面子。
還是言歸正傳,看一下Hikari中的ConcurrentBag吧。
核心數(shù)據(jù)結(jié)構(gòu)
多線程代碼一個(gè)讓人比較頭疼的問題,就是每個(gè)API我都懂,但就是不會(huì)用。很多對concurrent包倒背如流的同學(xué),在面對現(xiàn)實(shí)的問題時(shí),到最后依然不得不被迫加上Lock或者synchronized。
ConcurrentBag是一個(gè)Lock free的數(shù)據(jù)結(jié)構(gòu),主要用作數(shù)據(jù)庫連接的存儲(chǔ),可以說整個(gè)HikariCP的核心就是它。刪掉亂七八糟的注釋和異常處理,可以說關(guān)鍵的代碼也就百十來行,但里面的道道卻非常的多。
ConcurrentBag速度很快,要達(dá)到這個(gè)目標(biāo),就需要一定的核心數(shù)據(jù)結(jié)構(gòu)支持。
private final CopyOnWriteArrayList<T> sharedList;
private final ThreadLocal<List<Object>> threadList;
private final AtomicInteger waiters;
private final SynchronousQueue<T> handoffQueue;
- sharedList 用來緩存所有的連接,是一個(gè)CopyOnWriteArrayList結(jié)構(gòu)。
- threadList 用來緩存某個(gè)線程所使用的所有連接,相當(dāng)于快速引用,是一個(gè)ThreadLocal類型的ArrayList。
- waiters 當(dāng)前正在獲取連接的等待者數(shù)量。AtomicInteger,就是一個(gè)自增對象。當(dāng)waiters的數(shù)量大于0時(shí)候,意味著有線程正在獲取資源。
- handoffQueue 0容量的快速傳遞隊(duì)列,SynchronousQueue類型的隊(duì)列,非常有用。
ConcurrentBag里面的元素,為了能夠無鎖化操作,需要使用一些變量來標(biāo)識(shí)現(xiàn)在處于的狀態(tài)。抽象的接口如下:
public interface IConcurrentBagEntry{
int STATE_NOT_IN_USE = 0;
int STATE_IN_USE = 1;
int STATE_REMOVED = -1;
int STATE_RESERVED = -2;
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
int getState();
}
有了這些數(shù)據(jù)結(jié)構(gòu)的支持,我們的ConcurrentBag就可以實(shí)現(xiàn)它光的宣稱了。
獲取連接
連接的獲取是borrow方法,還可以傳入一個(gè)timeout作為超時(shí)控制。
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
首先,如果某個(gè)線程執(zhí)行非常快,使用了比較多的連接,就可以使用ThreadLocal的方式快速獲取連接對象,而不用跑到大池子里面去獲取。代碼如下。
// Try the thread-local list first
final var list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final var entry = list.remove(i);
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
我們都知道,包括ArrayList和HashMap一些基礎(chǔ)的結(jié)構(gòu),都是Fail Fast的,如果你在遍歷的時(shí)候,刪掉一些數(shù)據(jù),有可能會(huì)引起問題。幸運(yùn)的是,由于我們的List是從ThreadLocal獲取的,它首先就避免了線程安全的問題。
接下來就是遍歷。這段代碼采用的是尾遍歷(頭遍歷會(huì)出現(xiàn)錯(cuò)誤),用于快速的從列表中找到一個(gè)可以復(fù)用的對象,然后使用CAS來把狀態(tài)置為使用中。但如果對象正在被使用,則直接刪除它。
在ConcurrentBag里,每個(gè)ThreadLocal最多緩存50個(gè)連接對象引用。
當(dāng)ThreadLocal里找不到可復(fù)用的對象,它就會(huì)到大池子里去拿。也就是下面這段代碼。
// Otherwise, scan the shared list ... then poll the handoff queue
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
listener.addBagItem(waiting);
// 還拿不到,就需要等待別人釋放了
timeout = timeUnit.toNanos(timeout);
do {
final var start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
waiters.decrementAndGet();
}
首先要注意,這段代碼可能是由不同的線程執(zhí)行的,所以必須要考慮線程安全問題。由于shardList是線程安全的CopyOnWriteArrayList,適合讀多寫少的場景,我們可以直接進(jìn)行遍歷。
這段代碼的目的是一樣的,需要從sharedList找到一個(gè)空閑的連接對象。這里把自增的waiting變量傳遞到外面的代碼進(jìn)行處理,主要是由于想要根據(jù)waiting的大小來確定是否創(chuàng)建新的對象。
如果無法從池子里獲取連接,則需要等待別的線程釋放一些資源。
創(chuàng)建對象的過程是異步的,要想獲取它,還需要依賴一段循環(huán)代碼。while循環(huán)代碼是納秒精度,會(huì)嘗試從handoffQueue里獲取。最終會(huì)調(diào)用SynchronousQueue的transfer方法。
歸還連接
有借就有還,當(dāng)某個(gè)連接使用完畢,它將被歸還到池子中。
public void requite(final T bagEntry)
{
bagEntry.setState(STATE_NOT_IN_USE);
for (var i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
Thread.yield();
}
}
final var threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
首先,把這個(gè)對象置為可用狀態(tài)。然后,代碼會(huì)進(jìn)入一個(gè)循環(huán),等待使用方把這個(gè)連接接手過去。當(dāng)連接處于STATE_NOT_IN_USE狀態(tài),或者隊(duì)列中的數(shù)據(jù)被取走了,那么就可以直接返回了。
由于waiters.get()是實(shí)時(shí)獲取的,有可能長時(shí)間一直大于0,這樣代碼就會(huì)變成死循環(huán),浪費(fèi)CPU。代碼會(huì)嘗試不同層次的睡眠,一個(gè)是每隔255個(gè)waiter睡10ns,一個(gè)是使用yield讓出cpu時(shí)間片。
如果歸還連接的時(shí)候并沒有被其他線程獲取到,那么最后我們會(huì)把歸還的連接放入到相對應(yīng)的ThreadLocal里,因?yàn)閷σ粋€(gè)連接來說,借和還,通常是一個(gè)線程。
知識(shí)點(diǎn)
看起來平平無奇的幾行代碼,為什么搞懂了就能Hold住大部分的并發(fā)編程場景呢?主要還是這里面的知識(shí)點(diǎn)太多。下面我簡單羅列一下,你可以逐個(gè)攻破。
- 使用ThreadLocal來緩存本地資源引用,使用線程封閉的資源來減少鎖的沖突。
- 采用讀多寫少的線程安全的CopyOnWriteArrayList來緩存所有對象,幾乎不影響讀取效率。
- 使用基于CAS的AtomicInteger來計(jì)算等待者的數(shù)量,無鎖操作使得計(jì)算更加快速。
- 0容量的交換隊(duì)列SynchronousQueue,使得對象傳遞更加迅速。
- 采用compareAndSet的CAS原語來控制狀態(tài)的變更,安全且效率高。很多核心代碼都是這么設(shè)計(jì)的。
- 在循環(huán)中使用park、yield等方法,避免死循環(huán)占用大量CPU。
- 需要了解并發(fā)數(shù)據(jù)結(jié)構(gòu)中的offer、poll、peek、put、take、add、remove方法的區(qū)別,并靈活應(yīng)用。
- CAS在設(shè)置狀態(tài)時(shí),采用了volatile關(guān)鍵字修飾,對于volatile的使用也是一個(gè)常見的優(yōu)化點(diǎn)。
- 需要了解WeakReference弱引用在垃圾回收時(shí)候的表現(xiàn)。
麻雀雖小,五臟俱全。如果你想要你的多線程編程能力更上一層樓,讀一讀這個(gè)短小精悍的ConcurrentBag吧。當(dāng)你掌握了它,多線程的那些東西,不過是小菜一碟。
作者簡介:小姐姐味道 (xjjdog),一個(gè)不允許程序員走彎路的公眾號。聚焦基礎(chǔ)架構(gòu)和Linux。十年架構(gòu),日百億流量,與你探討高并發(fā)世界,給你不一樣的味道。