RabbitMQ消息傳遞模式和NetCore案例
RabbitMQ中有四種基本的消息傳遞模式,它們是:
1. Direct模式:Exchange將消息路由到與Routing Key完全匹配的Queue中。
2. Fanout模式:Exchange將消息路由到所有與其綁定的Queue中。
3. Topic模式:Exchange將消息路由到所有與其綁定的Queue中,同時根據指定的通配符規則進行匹配,實現靈活的消息路由。
4. Header模式:Exchange不使用Routing Key進行消息路由,而是利用消息Header中設置的鍵值對進行路由。
下面是詳細介紹:
Direct模式
在Direct模式中,Exchange將消息路由到與Routing Key完全匹配的Queue中。這種模式下,可以使用RabbitMQ的默認交換機(direct類型),也可以創建自定義的交換機。
在生產者發送消息時,需要將消息指定一個Routing Key,該Routing Key與消費者綁定的隊列名稱相同,Exchange將消息路由到和該Routing Key相同的隊列中,消費者就可以獲取到隊列中的消息了。
Fanout模式
在Fanout模式中,Exchange將消息路由到所有與其綁定的Queue中。這種模式下,只能使用自定義的交換機(fanout類型),Exchange不會考慮Routing Key的情況,直接把消息分發給所有綁定的隊列。
Topic模式
在Topic模式中,Exchange將消息路由到所有與其綁定的Queue中,同時根據指定的通配符規則進行匹配,實現靈活的消息路由。這種模式下,可以使用自定義的交換機(topic類型)。
在生產者發送消息時,需要將消息指定一個Routing Key,而消費者則需要指定一個匹配模式(例如"*.logs"),當Exchange收到消息時,會根據Routing Key和通配符規則來判斷應該將消息發送給哪些隊列。
Header模式
在Header模式中,Exchange不使用Routing Key進行消息路由,而是利用消息Header中設置的鍵值對進行路由。這種模式下,可以使用自定義的交換機(header類型)。
在生產者發送消息時,需要指定一個包含鍵值對的Header,消費者則需要指定一組鍵值對,只有當消息Header中的鍵值對滿足消費者指定的條件時,才會將消息發送給消費者。
在Netcore開發的項目中引入RabbitMQ可以實現應用程序和其他服務之間的異步通信,這種方式可以大大提高系統的可靠性、擴展性和性能。下面給出一個案例來說明引入RabbitMQ的具體應用。
假設我們正在開發一個電商網站,當用戶下單時,需要通知訂單處理系統進行訂單處理和庫存管理。采用傳統同步方式,應用程序會等待訂單處理完成之后才繼續進行,這會降低應用程序的響應速度和吞吐量;另外,如果訂單處理系統出現故障或者繁忙,應用程序也會出現阻塞。而通過引入RabbitMQ,我們可以將訂單信息發送到一個隊列中,然后由訂單處理系統異步地從隊列中獲取訂單信息進行處理,這樣就可以使得應用程序能夠快速響應客戶請求同時保證訂單的處理不被阻塞。如果訂單處理系統出現故障或者繁忙,消息可以在隊列中等待并重試,這樣可以提高系統的可靠性。
以下是一個基于Netcore開發的電商網站采用RabbitMQ異步處理訂單的實現示例:
安裝RabbitMQ.Client
在Netcore項目中,我們可以通過NuGet包管理器安裝RabbitMQ.Client庫來引入RabbitMQ客戶端。
配置RabbitMQ連接信息
在appsettings.json文件中添加RabbitMQ連接信息的配置:
{
"RabbitMQ": {
"HostName": "localhost",
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/"
}
}
創建RabbitMQ服務
創建一個名為RabbitMQService的服務,在這個服務中我們可以封裝一些RabbitMQ方法,比如發送消息到隊列等,具體實現如下:
using RabbitMQ.Client;
using System.Text;
public class RabbitMQService
{
private readonly IConfiguration _configuration;
public RabbitMQService(IConfiguration configuration)
{
_configuration = configuration;
}
public void SendMessage(string queueName, string message)
{
var factory = new ConnectionFactory()
{
HostName = _configuration["RabbitMQ:HostName"],
UserName = _configuration["RabbitMQ:UserName"],
Password = _configuration["RabbitMQ:Password"],
VirtualHost = _configuration["RabbitMQ:VirtualHost"]
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
}
}
}
}
創建訂單控制器
在Netcore項目中,我們可以創建一個名為OrderController的控制器,當用戶下單時,控制器通過調用RabbitMQService中的方法將訂單信息發送到一個隊列中,然后立即返回一個成功響應。
[ApiController]
public class OrderController : ControllerBase
{
private readonly RabbitMQService _rabbitMQService;
public OrderController(RabbitMQService rabbitMQService)
{
_rabbitMQService = rabbitMQService;
}
[HttpPost]
public IActionResult CreateOrder(Order order)
{
// 處理訂單邏輯
...
// 發送訂單消息到RabbitMQ
_rabbitMQService.SendMessage("order_queue", JsonConvert.SerializeObject(order));
return Ok("Order created successfully.");
}
}
創建訂單處理服務
創建一個名為OrderProcessingService的服務,用于從隊列中獲取訂單信息,并進行訂單處理和庫存管理等。具體實現如下:
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
public class OrderProcessingService : BackgroundService
{
private readonly IConfiguration _configuration;
public OrderProcessingService(IConfiguration configuration)
{
_configuration = configuration;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var factory = new ConnectionFactory()
{
HostName = _configuration["RabbitMQ:HostName"],
UserName = _configuration["RabbitMQ:UserName"],
Password = _configuration["RabbitMQ:Password"],
VirtualHost = _configuration["RabbitMQ:VirtualHost"]
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "order_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var order = JsonConvert.DeserializeObject<Order>(message);
Console.WriteLine($"Order processed: {order.OrderNumber}");
};
channel.BasicConsume(queue: "order_queue", autoAck: true, consumer: consumer);
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
}
在以上代碼中,我們使用了Netcore中的BackgroundService類來創建一個訂單處理服務。在ExecuteAsync方法中,我們通過創建RabbitMQ連接,然后從隊列中獲取訂單信息,并進行訂單處理。當消息被成功消費后,隊列會自動將消息刪除。
引入RabbitMQ可以實現應用程序和其他服務之間的異步通信,這種方式可以大大提高系統的可靠性、擴展性和性能。特別是在高并發的情況下,使用RabbitMQ可以避免系統響應變慢,提高系統的吞吐量。