一 RabbitMQ下載
RabbitMQ 官網最新版下載:
RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ v3.13.6版本下載:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.6/rabbitmq-server-3.13.6-1.el8.noarch.rpm
RabbitMQ依賴erlang-26.2.5.2-1.el7.x86_64.rpm下載:
https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm
二 RabbitMQ安裝
1 安裝erlang環境
RabbitMQ前要先安裝erlang環境,因為RabbitMQ是用erlang開發的
執行安裝指令如下:
rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm
執行後如下圖:
驗證 erlang 安裝是否成功,執行erl可以查看版本,説明安裝成功如下圖:
2 安裝RabbitMQ
執行安裝RabbitMQ指令如下:
rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm
執行安裝中,如下圖:
注意:如果erlang環境沒有安裝好,或者版本與當前rabbitMQ不匹配則會報錯以下錯誤,提示需要指定範圍的依賴版本,如下圖:
如果出現上圖的錯誤,請參考上一步重新安裝erlang環境即可。
安裝結束後,消息隊列數據保存在哪?日記在哪?想了解更多的信息?
只需一條指令可查詢當前狀態信息:
rabbitmq-diagnostics status
執行後如下圖:
從上圖狀態中可以看出目前沒有使用任何配置文件,以可以看到以下有用的信息:
- 數據目錄: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
- 日記文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log
上圖信息很詳細,可以説開發者開發這個工具非常的細心,對軟件有足夠了解使用也安心!
3 配置RabbitMQ(可選項)
安裝好後RabbitMQ沒有使用任何的配置文件(也沒有默認配置文件),但會生成一個空目錄位置在:/etc/rabbitmq/ ,在這裏你可以按照自己的需求參考官方網站配置自己的項目,格式支持有多種,下面我這裏要變更默認端口為例創建一個配置文件:
vi /etc/rabbitmq/rabbitmq.config
配置文件內容:
[
{rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},
{rabbitmq_management, [
{listener, [{port,59876}, {ssl, false}]}
]}
].
通過配置配置文件實現變更:
- 客户端 51091 用於消費或生產端連接,IP 0.0.0.0 代表綁定服務器內外網IP。
- 管理端口 59876 用於RabbitMQ的Web管理。
再次執行 rabbitmq-diagnostics status 查看新增的配置文件是否被使用,如下圖:
上圖可以看到剛剛創建的配置文件已被引用狀態。
4 RabbitMQ 啓動與關閉
RabbitMQ安裝好後最終是服務狀態,可以通過服務管理控制:
#啓動
systemctl start rabbitmq-server
#停止關閉
systemctl stop rabbitmq-server
#重啓
systemctl restart rabbitmq-server
#開機啓動
systemctl enable rabbitmq-server
#查看狀態
systemctl status rabbitmq-server
操作如下圖:
5 開啓RabbitMQ的Web管理界面(可選項,強烈建議開啓)
RabbitMQ的安裝後自帶Web管理界面,但是需要執行以下指令開啓:
rabbitmq-plugins enable rabbitmq_management
我們平時只需要一名管員即可,後面要增加用户或設置權限直接在Web操作即可。
新增一位 RabbitMQ的Web管理員並增加設置管理權限 ,用於管理RabbitMQ.
#新增人員
rabbitmqctl add_user hua abc123uuPP
#設置權限
rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"
#設置為管理員
rabbitmqctl set_user_tags hua administrator
* 表示授予該用户對該虛擬主機上所有隊列和交換機的 configure、write 和 read 權限。
- 第一個
".*"表示用户可以配置任意隊列和交換機。 - 第二個
".*"表示用户可以向任意隊列和交換機發送消息。 - 第三個
".*"表示用户可以從任意隊列中消費消息。
執行過程如圖:
執行上面命令增加一個Web管理員:
- 用户名稱:hua
- 密碼:abc123uuPP
- 權限 :管理員
本地localhost登陸RabbitWeb管理平台,用默認的賬號登陸即可:
- 默認用户:guest
- 默認密碼:guest
三 RabbitMQ Web 管理
1 RabbitMQ Web 登陸
進入RabbitMQ Web 登陸頁面如下:
首先我們使用默認賬號密碼嘗試登陸,為了安全確實限制本地登陸,如下圖:
使用上面新建的賬號hua登陸,登陸成功如下圖:
2 用户管理
用户管理,用户增加操作簡單,如下圖:
用户管理,用户權限設置操作簡單,如下圖:
用户操作界面非常人性化,可以很方便設置權限,修改用户資料。
3 虛擬主機(重要)
虛擬主機(vhost)是 RabbitMQ 中的一種邏輯隔離機制,它相當於一個獨立的命名空間。每個虛擬主機內部可以擁有自己獨立的隊列、交換機、綁定等資源,彼此之間相互隔離,不能共享資源。
- 命名空間:每個虛擬主機都有自己的隊列、交換機、綁定等資源。
- 資源隔離:不同虛擬主機之間的資源(如隊列和交換機)完全隔離,防止不同應用間的資源衝突。
- 用户權限:不同的用户可以被授予不同虛擬主機的訪問權限,確保用户只能訪問指定的虛擬主機中的資源。
虛擬主機提供了一種隔離和權限管理的方式,適用於以下場景:
- 多租户架構:在 SaaS(軟件即服務)或多租户應用中,你可以為不同的租户創建不同的虛擬主機,以確保數據隔離。
- 開發與生產環境隔離:你可以為開發環境和生產環境創建不同的虛擬主機,避免資源衝突和干擾。
- 權限管理:不同的用户或應用可以通過虛擬主機進行權限分離,確保只有特定用户才能訪問某些資源。
默認虛擬主機
RabbitMQ 默認創建一個虛擬主機 /,這是一個特殊的虛擬主機,通常用於測試或默認情況下的資源管理。生產環境中,建議創建和使用新的虛擬主機,以更好地管理資源和權限。
虛擬主機操作也非常簡單,如下圖:
在用户管理界面選擇用户綁定指定的虛擬主機,非常方便,如下圖:
功能強大,非常好用。
四 java代碼接入
方式一 java通用:
1 引入mvn依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
JAVA 連接RabbitMQ生產消息與接收消費測試代碼:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hua
* @date 2024-08-21 18:01
*/
public class TestRabbitMQ {
private final static String QUEUE_NAME = "hello";
public static void main1(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(51091);
factory.setUsername("java_producer");
factory.setPassword("java_producer");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] argv) throws Exception {
// 創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(51091);
factory.setUsername("java_consumer");
factory.setPassword("java_consumer");
// 連接到 RabbitMQ 服務器
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 聲明隊列(確保隊列存在)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 定義回調函數,當有消息送達時執行
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 消費消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
}
測試運行發送消息,發送成功。如下圖:
測試運行接收消息,消費成功。如下圖:
上面測試通過後,改成服務類方便生產環境使用來發送消息代碼:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hua
* @date 2024-08-22
*/
@Service
public class RabbitMqServiceImpl {
private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
private static final String QUEUE_NAME = "test";
private Connection connection;
private Channel channel;
public RabbitMqServiceImpl() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(51091);
factory.setUsername("java_producer");
factory.setPassword("java_producer");
//如果不指定虛擬機默認會使用/
factory.setVirtualHost("test");
try {
this.connection = factory.newConnection();
this.channel = connection.createChannel();
this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
logger.info("RabbitMqServiceImpl initialized successfully.");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());
throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
}
}
public void sendMessage(String message) {
try {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
logger.error("Failed to send message: {}", e.getMessage());
}
}
public void close() {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
上面的代碼存在問題,未確認發送成功,有丟失風險,再改善如下:
import com.qyhua.common.table.db_hex_fail_log.entity.DbHexFailLog;
import com.qyhua.common.table.db_hex_fail_log.service.impl.DbHexFailLogServiceImpl;
import com.rabbitmq.client.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
/**
* @author hua
* @date 2024-08-22
*/
@Service
public class RabbitMqServiceImpl {
private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
private static final String QUEUE_NAME = "hex_kyc";
private Connection connection;
private Channel channel;
//存放所有消息,確認時刪除,沒確認的保存到數據庫
private ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
@Autowired
DbHexFailLogServiceImpl dbHexFailLogService;
@PostConstruct
public void init() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(xxx);
factory.setUsername("java_producer");
factory.setPassword("java_producer");
factory.setVirtualHost("xxxx");
factory.setConnectionTimeout(3000);
try {
this.connection = factory.newConnection();
this.channel = connection.createChannel();
this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 啓用發佈者確認模式
this.channel.confirmSelect();
setupConfirmListener();
logger.info("RabbitMqServiceImpl initialized successfully.");
} catch (IOException | TimeoutException e) {
logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage(), e);
throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
}
}
public void sendMessage(String message) {
try {
long nextSeqNo = channel.getNextPublishSeqNo();
outstandingConfirms.put(nextSeqNo, message);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
logger.info(" [x] Sent '{}'", message);
} catch (Exception e) {
logger.error("Failed to send message: {}", e.getMessage(), e);
saveFailedMessageToDatabase(message,"CF");
}
}
// 設置接收監聽器,記錄未確認的消息
private void setupConfirmListener() {
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
outstandingConfirms.headMap(deliveryTag + 1).clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("Message confirmed ok deliveryTag="+deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
if (multiple) {
// 獲取從起點到 `deliveryTag + 1` 之間的所有未確認的消息
ConcurrentNavigableMap<Long, String> unconfirmedMessages = outstandingConfirms.headMap(deliveryTag + 1);
List<String> FailList= new ArrayList<>();
for (Map.Entry<Long, String> entry : unconfirmedMessages.entrySet()) {
String failedMessage = entry.getValue();
logger.error("Message not confirmed: deliveryTag={}, message={}", entry.getKey(), failedMessage);
FailList.add(failedMessage);
}
saveFailedMessageToDatabaseBy(FailList); // 批量保存到數據庫
unconfirmedMessages.clear(); // 清除這些未確認的消息
} else {
String failedMessage = outstandingConfirms.get(deliveryTag);
logger.error("Message not confirmed: deliveryTag={}, message={}", deliveryTag, failedMessage);
saveFailedMessageToDatabase(failedMessage,"SF");
outstandingConfirms.remove(deliveryTag); // 移除單條未確認的消息
}
};
channel.addConfirmListener(ackCallback, nackCallback);
}
private void saveFailedMessageToDatabaseBy(List<String> failList) {
List<DbHexFailLog> list=new ArrayList<>(failList.size());
LocalDateTime now = LocalDateTime.now();
for (String message : failList) {
DbHexFailLog f=new DbHexFailLog();
f.setInHexStr(message);
f.setCtime(now);
f.setFlag("SF");
list.add(f);
}
dbHexFailLogService.saveBatch(list,list.size());
failList.clear();
}
private void saveFailedMessageToDatabase(String message,String flag) {
DbHexFailLog f=new DbHexFailLog();
f.setInHexStr(message);
f.setCtime(LocalDateTime.now());
f.setFlag(flag);
dbHexFailLogService.save(f);
}
@PreDestroy
public void close() {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
logger.info("RabbitMqServiceImpl resources closed successfully.");
} catch (IOException | TimeoutException e) {
logger.error("Failed to close RabbitMqServiceImpl resources: {}", e.getMessage(), e);
}
}
}
上面的代碼優化後,主要增加了三項如下:
1 Publisher Confirms 機制:
- 啓用
channel.confirmSelect()來激活發佈者確認模式。 - 使用
ConfirmCallback和NackCallback來處理消息的確認與未確認邏輯。 - 未確認的消息會被保存到數據庫中。
2 保存失敗的消息到數據庫。
3 在 @PreDestroy 方法中關閉 Channel 和 Connection,確保服務銷燬時正確關閉資源。
方式二 SpringBoot框架使用
mvn依賴包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring配置文件:
Spring:
rabbitmq:
host: xx.xx.xx.xx
port: 51091
username: java_consumer
password: java_consumer
virtual-host: hellow
connection-timeout: 6000
JAVA代碼:
發送消息java代碼:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author hua
* @date 2024-08-22
*/
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
接收消息java代碼:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author hua
* @date 2024-08-22
*/
@Component
public class RabbitListener {
private static final Logger logger = LogManager.getLogger(RabbitListener.class);
@RabbitListener(queues = "test")
public void receiveMessage(String message) {
try {
System.out.println("rabbit rev <- "+message);
//具體業務
} catch (Exception e) {
e.printStackTrace();
logger.error("rabbit err= ", e);
}
}
}
上面代碼在生產發送消息時通過編碼方式更靈活,接收直接使用註解更簡單。