成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

這樣實現(xiàn)異步線程間 數(shù)據(jù)傳遞,太優(yōu)雅了!

開發(fā) 前端
Spring Boot 自定義線程池實現(xiàn)異步開發(fā)相信看過陳某的文章都了解,但是在實際開發(fā)中需要在父子線程之間傳遞一些數(shù)據(jù),比如用戶信息,鏈路信息等等

Spring Boot 自定義線程池實現(xiàn)異步開發(fā)相信看過陳某的文章都了解,但是在實際開發(fā)中需要在父子線程之間傳遞一些數(shù)據(jù),比如用戶信息,鏈路信息等等

比如用戶登錄信息使用ThreadLocal存放保證線程隔離,代碼如下:

/**
 * @author 公眾號:碼猿技術(shù)專欄
 * @description 用戶上下文信息
 */
public class OauthContext {
    private static  final  ThreadLocal<LoginVal> loginValThreadLocal=new ThreadLocal<>();

    public static  LoginVal get(){
        return loginValThreadLocal.get();
    }
    public static void set(LoginVal loginVal){
        loginValThreadLocal.set(loginVal);
    }
    public static void clear(){
        loginValThreadLocal.remove();
    }
}

那么子線程想要獲取這個LoginVal如何做呢?

今天就來介紹幾種優(yōu)雅的方式實現(xiàn)Spring Boot 內(nèi)部的父子線程的數(shù)據(jù)傳遞。

圖片

1. 手動設(shè)置

每執(zhí)行一次異步線程都要分為兩步:

  1. 獲取父線程的LoginVal
  2. 將LoginVal設(shè)置到子線程,達(dá)到復(fù)用

代碼如下:

public void handlerAsync() {
        //1. 獲取父線程的loginVal
        LoginVal loginVal = OauthContext.get();
        log.info("父線程的值:{}",OauthContext.get());
        CompletableFuture.runAsync(()->{
            //2. 設(shè)置子線程的值,復(fù)用
           OauthContext.set(loginVal);
           log.info("子線程的值:{}",OauthContext.get());
        });
    }

雖然能夠?qū)崿F(xiàn)目的,但是每次開異步線程都需要手動設(shè)置,重復(fù)代碼太多,看了頭疼,你認(rèn)為優(yōu)雅嗎?

2. 線程池設(shè)置TaskDecorator

TaskDecorator是什么?官方api的大致意思:這是一個執(zhí)行回調(diào)方法的裝飾器,主要應(yīng)用于傳遞上下文,或者提供任務(wù)的監(jiān)控/統(tǒng)計信息。

知道有這么一個東西,如何去使用?

TaskDecorator是一個接口,首先需要去實現(xiàn)它,代碼如下:

/**
 * @author 公眾號:碼猿技術(shù)專欄
 * @description 上下文裝飾器
 */
public class ContextTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        //獲取父線程的loginVal
        LoginVal loginVal = OauthContext.get();
        return () -> {
            try {
                // 將主線程的請求信息,設(shè)置到子線程中
                OauthContext.set(loginVal);
                // 執(zhí)行子線程,這一步不要忘了
                runnable.run();
            } finally {
                // 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏
                OauthContext.clear();
            }
        };
    }
}

這里我只是設(shè)置了LoginVal,實際開發(fā)中其他的共享數(shù)據(jù),比如SecurityContext,RequestAttributes....

TaskDecorator需要結(jié)合線程池使用,實際開發(fā)中異步線程建議使用線程池,只需要在對應(yīng)的線程池配置一下,代碼如下:

@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
        poolTaskExecutor.setCorePoolSize(xx);
        poolTaskExecutor.setMaxPoolSize(xx);
        // 設(shè)置線程活躍時間(秒)
        poolTaskExecutor.setKeepAliveSeconds(xx);
        // 設(shè)置隊列容量
        poolTaskExecutor.setQueueCapacity(xx);
        //設(shè)置TaskDecorator,用于解決父子線程間的數(shù)據(jù)復(fù)用
        poolTaskExecutor.setTaskDecorator(new ContextTaskDecorator());
        poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池
        poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return poolTaskExecutor;
    }

此時業(yè)務(wù)代碼就不需要去設(shè)置子線程的值,直接使用即可,代碼如下:

public void handlerAsync() {
        log.info("父線程的用戶信息:{}", OauthContext.get());
        //執(zhí)行異步任務(wù),需要指定的線程池
        CompletableFuture.runAsync(()-> log.info("子線程的用戶信息:{}", OauthContext.get()),taskExecutor);
    }

來看一下結(jié)果,如下圖:

圖片

這里使用的是CompletableFuture執(zhí)行異步任務(wù),使用@Async這個注解同樣是可行的。

注意:無論使用何種方式,都需要指定線程池

3. InheritableThreadLocal

這種方案不建議使用,InheritableThreadLocal雖然能夠?qū)崿F(xiàn)父子線程間的復(fù)用,但是在線程池中使用會存在復(fù)用的問題,具體的可以看陳某之前的文章:微服務(wù)中使用阿里開源的TTL,優(yōu)雅的實現(xiàn)身份信息的線程間復(fù)用

這種方案使用也是非常簡單,直接用InheritableThreadLocal替換ThreadLocal即可,代碼如下:

/**
 * @author 公眾號:碼猿技術(shù)專欄
 * @description 用戶上下文信息
 */
public class OauthContext {
    private static  final  InheritableThreadLocal<LoginVal> loginValThreadLocal=new InheritableThreadLocal<>();

    public static  LoginVal get(){
        return loginValThreadLocal.get();
    }
    public static void set(LoginVal loginVal){
        loginValThreadLocal.set(loginVal);
    }
    public static void clear(){
        loginValThreadLocal.remove();
    }
}

4. TransmittableThreadLocal

TransmittableThreadLocal是阿里開源的工具,彌補(bǔ)了InheritableThreadLocal的缺陷,在使用線程池等會池化復(fù)用線程的執(zhí)行組件情況下,提供ThreadLocal值的傳遞功能,解決異步執(zhí)行時上下文傳遞的問題。

使用起來也是非常簡單,添加依賴如下:

<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>transmittable-thread-local</artifactId>
 <version>2.14.2</version>
</dependency>

OauthContext改造代碼如下:

/**
 * @author 公眾號:碼猿技術(shù)專欄
 * @description 用戶上下文信息
 */
public class OauthContext {
    private static  final TransmittableThreadLocal<LoginVal> loginValThreadLocal=new TransmittableThreadLocal<>();

    public static  LoginVal get(){
        return loginValThreadLocal.get();
    }
    public static void set(LoginVal loginVal){
        loginValThreadLocal.set(loginVal);
    }
    public static void clear(){
        loginValThreadLocal.remove();
    }
}

關(guān)于TransmittableThreadLocal想深入了解其原理可以看陳某之前的文章:微服務(wù)中使用阿里開源的TTL,優(yōu)雅的實現(xiàn)身份信息的線程間復(fù)用,應(yīng)用還是非常廣泛的

TransmittableThreadLocal原理

從定義來看,TransimittableThreadLocal繼承于InheritableThreadLocal,并實現(xiàn)TtlCopier接口,它里面只有一個copy方法。所以主要是對InheritableThreadLocal的擴(kuò)展。

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T>

在TransimittableThreadLocal中添加holder屬性。這個屬性的作用就是被標(biāo)記為具備線程傳遞資格的對象都會被添加到這個對象中。

要標(biāo)記一個類,比較容易想到的方式,就是給這個類新增一個Type字段,還有一個方法就是將具備這種類型的的對象都添加到一個靜態(tài)全局集合中。之后使用時,這個集合里的所有值都具備這個標(biāo)記。

// 1. holder本身是一個InheritableThreadLocal對象
// 2. 這個holder對象的value是WeakHashMap<TransmittableThreadLocal<Object>, ?>
//   2.1 WeekHashMap的value總是null,且不可能被使用。
//    2.2 WeekHasshMap支持value=null
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
  @Override
  protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  }
 
  /**
   * 重寫了childValue方法,實現(xiàn)上直接將父線程的屬性作為子線程的本地變量對象。
   */
  @Override
  protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
  }
};

應(yīng)用代碼是通過TtlExecutors工具類對線程池對象進(jìn)行包裝。工具類只是簡單的判斷,輸入的線程池是否已經(jīng)被包裝過、非空校驗等,然后返回包裝類ExecutorServiceTtlWrapper。根據(jù)不同的線程池類型,有不同和的包裝類。

@Nullable
public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) {
  if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced) {
    return executorService;
  }
  return new ExecutorServiceTtlWrapper(executorService);
}

進(jìn)入包裝類ExecutorServiceTtlWrapper。可以注意到不論是通過ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法,都會將線程對象包裝成TtlCallable或者TtlRunnable,用于在真正執(zhí)行run方法前做一些業(yè)務(wù)邏輯。

/**
 * 在ExecutorServiceTtlWrapper實現(xiàn)submit方法
 */
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
  return executorService.submit(TtlCallable.get(task));
}

/**
 * 在ExecutorTtlWrapper實現(xiàn)execute方法
 */
@Override
public void execute(@NonNull Runnable command) {
  executor.execute(TtlRunnable.get(command));
}

所以,重點的核心邏輯應(yīng)該是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable為例,TtlRunnable同理類似。在分析call()方法之前,先看一個類Transmitter

public static class Transmitter {
  /**
    * 捕獲當(dāng)前線程中的是所有TransimittableThreadLocal和注冊ThreadLocal的值。
    */
  @NonNull
  public static Object capture() {
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
  }
 
    /**
    * 捕獲TransimittableThreadLocal的值,將holder中的所有值都添加到HashMap后返回。
    */
  private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
      ttl2Value.put(threadLocal, threadLocal.copyValue());
    }
    return ttl2Value;
  }

  /**
    * 捕獲注冊的ThreadLocal的值,也就是原本線程中的ThreadLocal,可以注冊到TTL中,在
    * 進(jìn)行線程池本地變量傳遞時也會被傳遞。
    */
  private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = 
      new HashMap<ThreadLocal<Object>, Object>();
    for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){
      final ThreadLocal<Object> threadLocal = entry.getKey();
      final TtlCopier<Object> copier = entry.getValue();
      threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
    }
    return threadLocal2Value;
  }

  /**
    * 將捕獲到的本地變量進(jìn)行替換子線程的本地變量,并且返回子線程現(xiàn)有的本地變量副本backup。
    * 用于在執(zhí)行run/call方法之后,將本地變量副本恢復(fù)。
    */
  @NonNull
  public static Object replay(@NonNull Object captured) {
    final Snapshot capturedSnapshot = (Snapshot) captured;
    return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), 
                        replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
  }
 
  /**
    * 替換TransmittableThreadLocal
    */
  @NonNull
  private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
    // 創(chuàng)建副本backup
    HashMap<TransmittableThreadLocal<Object>, Object> backup = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();
      // 對當(dāng)前線程的本地變量進(jìn)行副本拷貝
      backup.put(threadLocal, threadLocal.get());

      // 若出現(xiàn)調(diào)用線程中不存在某個線程變量,而線程池中線程有,則刪除線程池中對應(yīng)的本地變量
      if (!captured.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }
    // 將捕獲的TTL值打入線程池獲取到的線程TTL中。
    setTtlValuesTo(captured);
    // 是一個擴(kuò)展點,調(diào)用TTL的beforeExecute方法。默認(rèn)實現(xiàn)為空
    doExecuteCallback(true);
    return backup;
  }

  private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) {
    final HashMap<ThreadLocal<Object>, Object> backup = 
      new HashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
      final ThreadLocal<Object> threadLocal = entry.getKey();
      backup.put(threadLocal, threadLocal.get());
      final Object value = entry.getValue();
      if (value == threadLocalClearMark) threadLocal.remove();
      else threadLocal.set(value);
    }
    return backup;
  }

  /**
    * 清除單線線程的所有TTL和TL,并返回清除之氣的backup
    */
  @NonNull
  public static Object clear() {
    final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();

    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = 
      new HashMap<ThreadLocal<Object>, Object>();
    for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){
      final ThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal2Value.put(threadLocal, threadLocalClearMark);
    }
    return replay(new Snapshot(ttl2Value, threadLocal2Value));
  }

  /**
    * 還原
    */
  public static void restore(@NonNull Object backup) {
    final Snapshot backupSnapshot = (Snapshot) backup;
    restoreTtlValues(backupSnapshot.ttl2Value);
    restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
  }

  private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
    // 擴(kuò)展點,調(diào)用TTL的afterExecute
    doExecuteCallback(false);

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();

      if (!backup.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }

    // 將本地變量恢復(fù)成備份版本
    setTtlValuesTo(backup);
  }

  private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
    for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
      TransmittableThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal.set(entry.getValue());
    }
  }

  private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {
    for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
      final ThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal.set(entry.getValue());
    }
  }

  /**
   * 快照類,保存TTL和TL
   */
  private static class Snapshot {
    final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;

    private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value,
                     HashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
      this.ttl2Value = ttl2Value;
      this.threadLocal2Value = threadLocal2Value;
    }
  }

進(jìn)入TtlCallable#call()方法。

@Override
public V call() throws Exception {
  Object captured = capturedRef.get();
  if (captured == null || releaseTtlValueReferenceAfterCall && 
      !capturedRef.compareAndSet(captured, null)) {
    throw new IllegalStateException("TTL value reference is released after call!");
  }
  // 調(diào)用replay方法將捕獲到的當(dāng)前線程的本地變量,傳遞給線程池線程的本地變量,
  // 并且獲取到線程池線程覆蓋之前的本地變量副本。
  Object backup = replay(captured);
  try {
    // 線程方法調(diào)用
    return callable.call();
  } finally {
    // 使用副本進(jìn)行恢復(fù)。
    restore(backup);
  }
}

到這基本上線程池方式傳遞本地變量的核心代碼已經(jīng)大概看完了??偟膩碚f在創(chuàng)建TtlCallable對象是,調(diào)用capture()方法捕獲調(diào)用方的本地線程變量,在call()執(zhí)行時,將捕獲到的線程變量,替換到線程池所對應(yīng)獲取到的線程的本地變量中,并且在執(zhí)行完成之后,將其本地變量恢復(fù)到調(diào)用之前。

總結(jié)

上述列舉了4種方案,陳某這里推薦方案2和方案4,其中兩種方案的缺點非常明顯,實際開發(fā)中也是采用的方案2或者方案4

責(zé)任編輯:武曉燕 來源: 碼猿技術(shù)專欄
相關(guān)推薦

2025-02-05 14:28:19

2023-01-04 08:38:43

Spring異步線程

2024-04-30 08:05:15

Rust代碼計算

2021-02-09 09:51:58

異步傳遞數(shù)據(jù)

2024-08-06 09:43:54

Java 8工具編程

2023-12-20 10:04:45

線程池Java

2023-11-01 08:20:51

Intent數(shù)據(jù)傳遞對象

2023-11-09 08:01:41

Spring緩存注解

2025-03-03 08:49:59

2024-10-28 08:32:22

統(tǒng)一接口響應(yīng)SpringBoot響應(yīng)框架

2019-09-26 15:06:29

數(shù)據(jù)平臺架構(gòu)

2023-04-12 16:20:00

同步數(shù)據(jù)異步數(shù)據(jù)傳輸

2010-01-28 16:30:16

Android數(shù)據(jù)傳遞

2020-12-14 08:43:56

線程進(jìn)程資源

2025-01-09 11:24:59

線程池美團(tuán)動態(tài)配置中心

2024-09-03 10:44:32

2021-04-07 15:11:26

鴻蒙HarmonyOS應(yīng)用

2023-08-01 08:54:02

接口冪等網(wǎng)絡(luò)

2024-12-31 08:54:38

2025-04-09 02:02:00

Spring框架開發(fā)
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 日韩小视频 | 欧美精品久久久久久久久久 | 午夜成人免费视频 | 亚洲色欲色欲www | 亚洲综合在线播放 | 野狼在线社区2017入口 | 亚洲视频精品在线 | 久草在线 | 四虎成人av| 四季久久免费一区二区三区四区 | 亚洲欧美日韩久久 | 国产精品夜间视频香蕉 | 精品久久久久久久久久久久久 | 国产探花在线观看视频 | 高清国产午夜精品久久久久久 | 国产视频黄色 | 国产精品视频导航 | 久久夜视频 | 欧美激情精品久久久久 | 亚洲国产精品一区二区三区 | 夜夜草 | 国产欧美精品一区二区 | 国产欧美在线 | 久草青青草 | 干干干操操操 | www在线视频| 日韩视频一区二区 | 99热精品在线 | 视频在线亚洲 | 日韩欧美国产一区二区三区 | 日韩中文字幕一区二区 | 日韩伦理电影免费在线观看 | aaaaaa大片免费看最大的 | 无人区国产成人久久三区 | 一级毛片视频 | 日日摸夜夜爽人人添av | 九九热在线视频 | www国产成人 | 午夜手机在线视频 | 一区二区三区四区在线视频 | 羞羞视频网站 |