SpringBoot+Redis自定義注解實現發布訂閱
前言
最近開發了一個內部消息組件,邏輯大體是通過定義注解 @MessageHub,在啟動時掃描全部bean中有使用了該注解的方法后臺創建一個常駐線程代理消費數據,當線程消費到數據就回寫到對應加了注解的方法里。
@Slf4j
@Service
public class RedisConsumerDemo {
@MessageHub(topic = "${uptown.topic}", type = "REDIS_PUBSUB")
public void consumer(Object message) {
log.info("pubsub info {} ", message);
}
}
實現redis的隊列、stream方式實現都很簡單,唯獨發布訂閱方式,網上的demo全都是一個固定套路,通過redis容器注入監聽器,而且回寫非常死板。那么如何將這塊的邏輯統一呢。
常規寫法
常規實現reids的發布訂閱模式寫法一共三步
1.創建消息監聽器
@Bean
public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) {
return new MessageListenerAdapter(messageListener, "onMessage");
}
2.創建訂閱器
@Component
public class TestSubscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("get data :{}", msg);
}
}
3.向redis容器中添加消息監聽器
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer container(
RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter smsExpirationListener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(smsExpirationListener, new PatternTopic("test"));
return container;
}
}
這樣定義非常簡單明了,但是有個問題是太代碼僵硬了,創建監聽者很不靈活,只能指定內部的onMessage方法,那么怎么才能融入到我們的內部消息流轉中間件里呢。
自定義注解實現
我們內部組件抽象了兩個方法,生產和消費,但這兩個方法邏輯截然不同,生產方法是暴露給serverice層接口調用,調用方在調用生產方法后能直接知道生產了幾條數據和成功與否。而消費方法是配合Spring生命周期函數服務啟動時建立常駐消費線程的。
/**
* 生產消息
*/
Integer producer(MessageForm messageForm);
/**
* 消費消息
*/
void consumer(ConsumerAdapterForm adapterForm);
生產消息當然很容易實現,只需要調用已經封裝好的convertAndSend方法。
stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());
消費方法就有說法了,動態生成監聽者的場景下使用redis容器用代碼挨個注冊已經滿足不了了,但仔細過一遍源代碼就會發現,監聽類的構造方法的入參只有兩個,第一個需要回調的代理類,第二個消費到數據后回調的方法。
/**
* Create a new {@link MessageListenerAdapter} for the given delegate.
*
* @param delegate the delegate object
* @param defaultListenerMethod method to call when a message comes
* @see #getListenerMethodName
*/
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
this(delegate);
setDefaultListenerMethod(defaultListenerMethod);
}
方案有了,本質上就是把RedisMessageListenerContainer注入進來之后,掃描項目里所有加了 @MessageHub 的bean,包裝成監聽類加載到容器里就完事了。
怎么掃描的代碼就不再贅述了,實現Spring的生命周期函數BeanPostProcessor#postProcessAfterInitialization,在這里用AnnotationUtils判斷是否標注了注解。
MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
if (annotation == null) {
continue;
}
標注了后判斷如果是發布訂閱,進入發布訂閱的實現類。
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {
@Resource
RedisMessageListenerContainer redisPubSubContainer;
@Override
public void produce(ProducerAdapterForm producerAdapterForm) {
stringRedisTemplate.convertAndSend(producerAdapterForm.getTopic(), producerAdapterForm.getMessage());
}
@Override
public void consume(ConsumerAdapterForm messageForm) {
MessageListenerAdapter adapter = new MessageListenerAdapter(messageForm.getBean(), messageForm.getInvokeMethod().getName());
adapter.afterPropertiesSet();
redisPubSubContainer.addMessageListener(adapter, new PatternTopic(messageForm.getTopic()));
}
@Bean
public RedisMessageListenerContainer redisPubSubContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
先將RedisMessageListenerContainer注入到Spring容器里,produce方法只需要調用下現成的api。consume方法由于上一步我們獲取了bean和對應的method,直接用MessageListenerAdapter的構造器創建出監聽器來,這里有坑,需要手動調用adapter.afterPropertiesSet()設置一些必要的屬性,這個在常規寫法里框架幫忙做了。如果不調用的話會出一些空指針之類的bug。
隨后把監聽器add到容器就實現了方法代理,背后的線程監聽到數據會回調到標注了 @MessageHub 的方法里