KeyAffinityExecutor 線程池
線上案例
有一批量的數據,可以按照一個固定的 key 分組并發,但是要保證組內并行的處理。 比如:商城中,不同的用戶可以并發下單,但是一個用戶只能進行順序的下單。在全局并發的場景下保證局部有序,保證最小事務單元操作的原子性。
針對上面的場景我們可以通過 KeyAffinityExecutor (KeyAffinityExecutor 是一個可以按照指定的Key親和順序消費的執行器) 來解決這個問題,我們下面一起來了解下 KeyAffinityExecutor 。
基本使用
導入依賴
<dependency>
<groupId>com.github.phantomthief</groupId>
<artifactId>more-lambdas</artifactId>
<version>0.1.55</version>
</dependency>
創建線程池
public class KeyAffinityExecutorTest {
@Test
public void submitTaskKeyAffinityExecutor() {
//線程池
KeyAffinityExecutor keyAffinityExecutor = KeyAffinityExecutor
.newSerializingExecutor(2, 200, "測試-%d");
//需要下單的信息
List<Order> orders = new ArrayList<>();
orders.add(new Order(1, "iPhone 16 Max"));
orders.add(new Order(1, "Thinking In Java"));
orders.add(new Order(1, "MengNiu Milk"));
orders.add(new Order(2, "Thinking In Java"));
orders.add(new Order(3, "HUAWEI 100P"));
orders.add(new Order(4, "XIAOMI 20"));
orders.add(new Order(5, "OPPO 98"));
orders.add(new Order(6, "HP EC80"));
orders.add(new Order(7, "BBK 100P"));
orders.add(new Order(8, "TCL 1380"));
orders.add(new Order(9, "CHANGHONG 32"));
orders.forEach(order -> keyAffinityExecutor.submit(order.getAccountId(), () -> {
System.out.println(Thread.currentThread() + " accountId:" + order.getAccountId() +
", skuNo:" + order.getSkuNo() + " checkout success!");
return null;
}));
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Assert.assertTrue(true);
}
@Data
@AllArgsConstructor
public static class Order {
long accountId;
String skuNo;
}
}
輸出結果如下:
Thread[測試-0,5,main] accountId:1, skuNo:iPhone 16 Max checkout success!
Thread[測試-1,5,main] accountId:2, skuNo:Thinking In Java checkout success!
Thread[測試-1,5,main] accountId:3, skuNo:HUAWEI 100P checkout success!
Thread[測試-1,5,main] accountId:4, skuNo:XIAOMI 20 checkout success!
Thread[測試-0,5,main] accountId:1, skuNo:Thinking In Java checkout success!
Thread[測試-1,5,main] accountId:6, skuNo:HP EC80 checkout success!
Thread[測試-0,5,main] accountId:1, skuNo:MengNiu Milk checkout success!
Thread[測試-1,5,main] accountId:8, skuNo:TCL 1380 checkout success!
Thread[測試-0,5,main] accountId:5, skuNo:OPPO 98 checkout success!
Thread[測試-0,5,main] accountId:7, skuNo:BBK 100P checkout success!
Thread[測試-0,5,main] accountId:9, skuNo:CHANGHONG 32 checkout success!
結論:對于 acccountId = 1 有三條數據都是在同一個線程下面執行,線程ID:測試-0 所以可以保證局部有序。
實現原理
- 選擇執行的線程池, 這里我們可以看到,如果當前 key 存在線程池就直接返回,如果不存在就創建,或者選擇一個任務比較少的線程池,這里可以保證任務分發的均勻性。
//通過 key 選出一個執行線程
@Nonnull
public V select(K key) {
int thisCount = count.getAsInt();
tryCheckCount(thisCount);
KeyRef keyRef = mapping.compute(key, (k, v) -> {
// 如果不存在就創建一個
if (v == null) {
if (usingRandom.test(thisCount)) {
do {
try {
v = new KeyRef(all.get(ThreadLocalRandom.current().nextInt(all.size())));
} catch (IndexOutOfBoundsException e) {
// ignore
}
} while (v == null);
} else {
v = all.stream()
.min(comparingInt(ValueRef::concurrency))
.map(KeyRef::new)
.orElseThrow(IllegalStateException::new);
}
}
v.incrConcurrency();
return v;
});
return keyRef.ref();
}
- 執行線程池的初始化, 這里的本質是創建只有一個線程的線程池。這樣就可以保證,任務被路由到同一個 key 下面,那么就可以保證順序執行。
static Supplier<ExecutorService> executor(String threadName, int queueBufferSize) {
return new Supplier<ExecutorService>() {
// ThreadFactory
private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadName)
.build();
@Override
public ExecutorService get() {
LinkedBlockingQueue<Runnable> queue;
if (queueBufferSize > 0) {
// blockingQueue
queue = new LinkedBlockingQueue<Runnable>(queueBufferSize) {
@Override
public boolean offer(Runnable e) {
try {
//讓 offer 方法阻塞,
//為什么這么做可以看 ThreadPoolExecutor 1347 行
put(e);
return true;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
};
} else {
queue = new LinkedBlockingQueue<>();
}
//創建一個線程的線程池
return new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, queue, threadFactory);
}
};
}
- 最后任務執行完畢,回收線程。
//當每一個key執行完之后回收處理這個key的線程池.
public void finishCall(K key) {
//如果執行完畢后返回 null
mapping.computeIfPresent(key, (k, v) -> {
if (v.decrConcurrency()) {
return null;
} else {
return v;
}
});
}
總結,這里其實我們也可以通過只有一個線程的線程數組實現,來實現按照唯一key,進行 hash 路由。