六種延遲隊列的實現方式,你知道幾種?
1. DelayQueue 延時隊列
DelayQueue 是 Java 并發包 java.util.concurrent 下的一個線程安全的阻塞隊列,它存儲的元素必須實現 Delayed 接口,以便計算元素的延時時間。隊列中的元素只有在其指定的延遲時間到達之后才能從隊列中取出。
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Order implements Delayed {
private long time;
private String name;
public Order(String name, long delay, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + unit.toMillis(delay);
}
@Override
public long getDelay(TimeUnit unit) {
long delay = time - System.currentTimeMillis();
return unit.convert(delay, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
if (this.time < ((Order) other).time) {
return -1;
} else if (this.time > ((Order) other).time) {
return 1;
}
return 0;
}
}
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(new Order("Order1", 5, TimeUnit.SECONDS));
delayQueue.put(new Order("Order2", 10, TimeUnit.SECONDS));
delayQueue.put(new Order("Order3", 15, TimeUnit.SECONDS));
System.out.println("訂單延遲隊列開始時間:" + java.time.LocalDateTime.now());
while (delayQueue.size() != 0) {
Order order = delayQueue.take(); // 阻塞直到元素可用
System.out.format("訂單: %s 被取消, 取消時間: %s\n", order.name, java.time.LocalDateTime.now());
}
}
}
2. Quartz 定時任務
Quartz 是一個開源的任務調度庫,可以集成到幾乎任何Java應用中,用于定時執行任務。通過定義任務和觸發器,可以很容易地實現定時任務。
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
public class QuartzJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
System.out.println("執行定時任務: " + System.currentTimeMillis());
}
}
// 在 Spring 配置文件中配置 Quartz
// ...
3. Redis sorted set
Redis 的有序集合(sorted set)可以利用 score 來實現延時隊列。通過設置元素的 score 為過期時間戳,可以實現在特定時間自動過期并被消費。
import redis.clients.jedis.Jedis;
public class RedisDelayQueue {
private static final String DELAY_QUEUE = "delayQueue";
public void addToQueue(String key, long delaySeconds) {
double score = System.currentTimeMillis() / 1000 + delaySeconds;
new Jedis().zadd(DELAY_QUEUE, score, key);
}
public void consume() {
long now = System.currentTimeMillis() / 1000;
while (true) {
Set<String> keys = new Jedis().zrangeByScore(DELAY_QUEUE, 0, now);
for (String key : keys) {
new Jedis().zrem(DELAY_QUEUE, key);
System.out.println("消費元素: " + key);
}
if (keys.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
4. Redis 過期回調
Redis 可以配置過期事件通知,當一個鍵過期時,Redis 會發送一個事件通知給訂閱了該事件的客戶端。
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
public class RedisKeyExpirationListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = new String(message.getBody());
System.out.println("監聽到key:" + expiredKey + "已過期");
}
}
// 在 Spring 配置中配置 RedisMessageListenerContainer
// ...
5. RabbitMQ 延時隊列
RabbitMQ 通過消息的 TTL(Time To Live)和死信交換機(DLX)來實現延時隊列。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.core.Message;
public class RabbitMQDelayQueue {
private final RabbitTemplate rabbitTemplate;
public RabbitMQDelayQueue(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendDelayedMessage(String message, long delay) {
Message msg = new Message(message.getBytes(), new MessageProperties() {
{
setExpiration(String.valueOf(delay));
// 設置消息的其他屬性
}
});
rabbitTemplate.send("delayQueueExchange", "delayQueueRoutingKey", msg);
}
// 配置交換機、隊列和綁定
// ...
}
6. 時間輪算法
時間輪算法是一種高效的定時任務管理算法,Netty 提供了 HashedWheelTimer 來實現時間輪。
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
public class NettyDelayQueue {
public static void main(String[] args) {
Timer timer = new HashedWheelTimer();
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("任務執行: " + System.currentTimeMillis());
}
}, 5, TimeUnit.SECONDS);
}
}