成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

SpringBoot+Redis自定義注解實現發布訂閱

數據庫 Redis
我們內部組件抽象了兩個方法,生產和消費,但這兩個方法邏輯截然不同,生產方法是暴露給serverice層接口調用,調用方在調用生產方法后能直接知道生產了幾條數據和成功與否。而消費方法是配合Spring生命周期函數服務啟動時建立常駐消費線程的。

前言

最近開發了一個內部消息組件,邏輯大體是通過定義注解 @MessageHub,在啟動時掃描全部bean中有使用了該注解的方法后臺創建一個常駐線程代理消費數據,當線程消費到數據就回寫到對應加了注解的方法里。

@Slf4j
@Service
public class RedisConsumerDemo {
    @MessageHub(topic = "${uptown.topic}", type = "REDIS_PUBSUB")
    public void consumer(Object message) {
        log.info("pubsub info {} ", message);
    }   
}

實現redis的隊列、stream方式實現都很簡單,唯獨發布訂閱方式,網上的demo全都是一個固定套路,通過redis容器注入監聽器,而且回寫非常死板。那么如何將這塊的邏輯統一呢。

常規寫法

常規實現reids的發布訂閱模式寫法一共三步

1.創建消息監聽器

@Bean 
public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) {
    return new MessageListenerAdapter(messageListener, "onMessage");
}

2.創建訂閱器

@Component
public class TestSubscriber implements MessageListener {
 
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("get data :{}", msg);
    }
 
}

3.向redis容器中添加消息監聽器

@Configuration
public class RedisConfig {

    @Bean
    public RedisMessageListenerContainer container(
        RedisConnectionFactory redisConnectionFactory,
        MessageListenerAdapter smsExpirationListener) {
    
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(smsExpirationListener, new PatternTopic("test"));
        return container;
    }
}

這樣定義非常簡單明了,但是有個問題是太代碼僵硬了,創建監聽者很不靈活,只能指定內部的onMessage方法,那么怎么才能融入到我們的內部消息流轉中間件里呢。

自定義注解實現

我們內部組件抽象了兩個方法,生產和消費,但這兩個方法邏輯截然不同,生產方法是暴露給serverice層接口調用,調用方在調用生產方法后能直接知道生產了幾條數據和成功與否。而消費方法是配合Spring生命周期函數服務啟動時建立常駐消費線程的。

/**
 * 生產消息
 */
Integer producer(MessageForm messageForm);

/**
 * 消費消息
 */
void consumer(ConsumerAdapterForm adapterForm);

生產消息當然很容易實現,只需要調用已經封裝好的convertAndSend方法。

stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());

消費方法就有說法了,動態生成監聽者的場景下使用redis容器用代碼挨個注冊已經滿足不了了,但仔細過一遍源代碼就會發現,監聽類的構造方法的入參只有兩個,第一個需要回調的代理類,第二個消費到數據后回調的方法。

/**
 * Create a new {@link MessageListenerAdapter} for the given delegate.
 *
 * @param delegate the delegate object
 * @param defaultListenerMethod method to call when a message comes
 * @see #getListenerMethodName
 */
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
   this(delegate);
   setDefaultListenerMethod(defaultListenerMethod);
}

方案有了,本質上就是把RedisMessageListenerContainer注入進來之后,掃描項目里所有加了 @MessageHub 的bean,包裝成監聽類加載到容器里就完事了。

怎么掃描的代碼就不再贅述了,實現Spring的生命周期函數BeanPostProcessor#postProcessAfterInitialization,在這里用AnnotationUtils判斷是否標注了注解。

MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
if (annotation == null) {
    continue;
}

標注了后判斷如果是發布訂閱,進入發布訂閱的實現類。

@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {

    @Resource
    RedisMessageListenerContainer redisPubSubContainer;



    @Override
    public void produce(ProducerAdapterForm producerAdapterForm) {
        stringRedisTemplate.convertAndSend(producerAdapterForm.getTopic(), producerAdapterForm.getMessage());
    }

    @Override
    public void consume(ConsumerAdapterForm messageForm) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(messageForm.getBean(), messageForm.getInvokeMethod().getName());
        adapter.afterPropertiesSet();
        redisPubSubContainer.addMessageListener(adapter, new PatternTopic(messageForm.getTopic()));
    }


    @Bean
    public RedisMessageListenerContainer redisPubSubContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

先將RedisMessageListenerContainer注入到Spring容器里,produce方法只需要調用下現成的api。consume方法由于上一步我們獲取了bean和對應的method,直接用MessageListenerAdapter的構造器創建出監聽器來,這里有坑,需要手動調用adapter.afterPropertiesSet()設置一些必要的屬性,這個在常規寫法里框架幫忙做了。如果不調用的話會出一些空指針之類的bug。

隨后把監聽器add到容器就實現了方法代理,背后的線程監聽到數據會回調到標注了 @MessageHub 的方法里

責任編輯:武曉燕 來源: 一安未來
相關推薦

2024-10-09 10:46:41

springboot緩存redis

2023-10-09 07:37:01

2023-10-11 07:57:23

springboot微服務

2023-10-24 13:48:50

自定義注解舉值驗證

2021-02-20 11:40:35

SpringBoot占位符開發技術

2022-12-02 07:28:58

Event訂閱模式Spring

2024-11-15 10:39:11

2024-12-27 15:37:23

2021-12-30 12:30:01

Java注解編譯器

2017-08-03 17:00:54

Springmvc任務執行器

2022-02-17 07:10:39

Nest自定義注解

2024-10-14 17:18:27

2023-03-03 09:11:12

高并發SpringBoot

2022-12-13 09:19:06

高并發SpringBoot

2023-09-04 08:12:16

分布式鎖Springboot

2025-02-25 09:29:34

2025-03-05 10:49:32

2025-05-08 08:30:00

Redis自定義序列化數據庫

2024-04-03 09:18:03

Redis數據結構接口防刷

2009-09-07 22:00:15

LINQ自定義
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 夜夜摸夜夜操 | 亚洲欧洲一区二区 | 久久久久国产 | 99综合 | 久久久这里只有17精品 | 国产精品久久久久久婷婷天堂 | 欧美色性 | 黑人精品欧美一区二区蜜桃 | 欧美一级淫片免费视频黄 | 日韩中文字幕在线 | 一区视频| 欧美日韩国产一区 | 中文字幕日韩一区 | 麻豆毛片 | 日韩在线不卡 | 免费在线观看一区二区 | 精品国产乱码久久久久久闺蜜 | 91传媒在线观看 | av中文字幕网站 | 日韩中文字幕一区二区三区 | 毛片a级| 成人日韩精品 | 欧美a级成人淫片免费看 | 国产精品视频97 | 91人人爽| 中文字幕电影在线观看 | 成人日b视频 | a级黄色片在线观看 | a网站在线观看 | 国产精品免费在线 | 91在线精品视频 | 免费午夜视频在线观看 | 日日夜夜天天 | 亚洲精品美女在线观看 | 91久久| 亚洲欧洲日韩 | 美女视频黄色的 | 99精品一区二区三区 | 中文字幕成人av | 精品久久久久久亚洲综合网 | 成人三区|