一 RabbitMQ下載

RabbitMQ 官網最新版下載:

RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ v3.13.6版本下載:

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.6/rabbitmq-server-3.13.6-1.el8.noarch.rpm

RabbitMQ依賴erlang-26.2.5.2-1.el7.x86_64.rpm下載:

https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm

二 RabbitMQ安裝

   1 安裝erlang環境

RabbitMQ前要先安裝erlang環境,因為RabbitMQ是用erlang開發的

  執行安裝指令如下:

rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm

   執行後如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_java

  驗證 erlang 安裝是否成功,執行erl可以查看版本,説明安裝成功如下圖:

 

rabbitmq環境安裝&java調用小例子_mark223的技術博客_#服務器_02

2 安裝RabbitMQ

  執行安裝RabbitMQ指令如下:

rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm

  執行安裝中,如下圖: 

rabbitmq環境安裝&java調用小例子_mark223的技術博客_#運維_03

 注意:如果erlang環境沒有安裝好,或者版本與當前rabbitMQ不匹配則會報錯以下錯誤,提示需要指定範圍的依賴版本,如下圖: 

rabbitmq環境安裝&java調用小例子_mark223的技術博客_虛擬主機_04

如果出現上圖的錯誤,請參考上一步重新安裝erlang環境即可。

 安裝結束後,消息隊列數據保存在哪?日記在哪?想了解更多的信息?

只需一條指令可查詢當前狀態信息:

rabbitmq-diagnostics status

執行後如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_erlang_05

rabbitmq環境安裝&java調用小例子_mark223的技術博客_#運維_06

從上圖狀態中可以看出目前沒有使用任何配置文件,以可以看到以下有用的信息:

  • 數據目錄: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
  • 日記文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log

 上圖信息很詳細,可以説開發者開發這個工具非常的細心,對軟件有足夠了解使用也安心!

3 配置RabbitMQ(可選項)

  安裝好後RabbitMQ沒有使用任何的配置文件(也沒有默認配置文件),但會生成一個空目錄位置在:/etc/rabbitmq/ ,在這裏你可以按照自己的需求參考官方網站配置自己的項目,格式支持有多種,下面我這裏要變更默認端口為例創建一個配置文件:

vi /etc/rabbitmq/rabbitmq.config

配置文件內容:

[
  {rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},
  {rabbitmq_management, [
  {listener, [{port,59876}, {ssl, false}]}
  ]}
].

  通過配置配置文件實現變更:

  1.   客户端 51091 用於消費或生產端連接,IP 0.0.0.0 代表綁定服務器內外網IP。
  2.   管理端口 59876 用於RabbitMQ的Web管理。

再次執行 rabbitmq-diagnostics status 查看新增的配置文件是否被使用,如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_#運維_07

 上圖可以看到剛剛創建的配置文件已被引用狀態。

4 RabbitMQ 啓動與關閉

   RabbitMQ安裝好後最終是服務狀態,可以通過服務管理控制:

#啓動
systemctl start rabbitmq-server

#停止關閉
systemctl stop rabbitmq-server

#重啓
systemctl restart rabbitmq-server

#開機啓動
systemctl enable rabbitmq-server

#查看狀態
systemctl status rabbitmq-server

  操作如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_虛擬主機_08

5 開啓RabbitMQ的Web管理界面(可選項,強烈建議開啓)

RabbitMQ的安裝後自帶Web管理界面,但是需要執行以下指令開啓:

rabbitmq-plugins enable rabbitmq_management

 我們平時只需要一名管員即可,後面要增加用户或設置權限直接在Web操作即可。

   新增一位 RabbitMQ的Web管理員並增加設置管理權限 ,用於管理RabbitMQ.

#新增人員
rabbitmqctl add_user hua abc123uuPP

#設置權限
rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"

#設置為管理員
rabbitmqctl set_user_tags hua administrator

* 表示授予該用户對該虛擬主機上所有隊列和交換機的 configurewriteread 權限。

  • 第一個 ".*" 表示用户可以配置任意隊列和交換機。
  • 第二個 ".*" 表示用户可以向任意隊列和交換機發送消息。
  • 第三個 ".*" 表示用户可以從任意隊列中消費消息。

 執行過程如圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_虛擬主機_09

執行上面命令增加一個Web管理員:

  • 用户名稱:hua
  • 密碼:abc123uuPP
  • 權限 :管理員

本地localhost登陸RabbitWeb管理平台,用默認的賬號登陸即可:

  • 默認用户:guest
  • 默認密碼:guest

三 RabbitMQ Web 管理

1 RabbitMQ Web 登陸

 進入RabbitMQ Web 登陸頁面如下:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_#服務器_10

首先我們使用默認賬號密碼嘗試登陸,為了安全確實限制本地登陸,如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_erlang_11

使用上面新建的賬號hua登陸,登陸成功如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_erlang_12

2 用户管理

 用户管理,用户增加操作簡單,如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_#運維_13

  用户管理,用户權限設置操作簡單,如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_#運維_14

用户操作界面非常人性化,可以很方便設置權限,修改用户資料。 

3 虛擬主機(重要)

虛擬主機(vhost)是 RabbitMQ 中的一種邏輯隔離機制,它相當於一個獨立的命名空間。每個虛擬主機內部可以擁有自己獨立的隊列、交換機、綁定等資源,彼此之間相互隔離,不能共享資源。

  • 命名空間:每個虛擬主機都有自己的隊列、交換機、綁定等資源。
  • 資源隔離:不同虛擬主機之間的資源(如隊列和交換機)完全隔離,防止不同應用間的資源衝突。
  • 用户權限:不同的用户可以被授予不同虛擬主機的訪問權限,確保用户只能訪問指定的虛擬主機中的資源。

虛擬主機提供了一種隔離和權限管理的方式,適用於以下場景:

  • 多租户架構:在 SaaS(軟件即服務)或多租户應用中,你可以為不同的租户創建不同的虛擬主機,以確保數據隔離。
  • 開發與生產環境隔離:你可以為開發環境和生產環境創建不同的虛擬主機,避免資源衝突和干擾。
  • 權限管理:不同的用户或應用可以通過虛擬主機進行權限分離,確保只有特定用户才能訪問某些資源。

默認虛擬主機

RabbitMQ 默認創建一個虛擬主機 /,這是一個特殊的虛擬主機,通常用於測試或默認情況下的資源管理。生產環境中,建議創建和使用新的虛擬主機,以更好地管理資源和權限。 

虛擬主機操作也非常簡單,如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_虛擬主機_15

 在用户管理界面選擇用户綁定指定的虛擬主機,非常方便,如下圖: 

rabbitmq環境安裝&java調用小例子_mark223的技術博客_虛擬主機_16

功能強大,非常好用。 

四 java代碼接入

  方式一 java通用:

  1 引入mvn依賴

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version>
        </dependency>

  JAVA 連接RabbitMQ生產消息與接收消費測試代碼:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author hua
 * @date 2024-08-21 18:01
 */
public class TestRabbitMQ {

    private final static String QUEUE_NAME = "hello";

    public static void main1(String[] args) {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(51091);
        factory.setUsername("java_producer");
        factory.setPassword("java_producer");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    public static void main(String[] argv) throws Exception {
        // 創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(51091);
        factory.setUsername("java_consumer");
        factory.setPassword("java_consumer");
        // 連接到 RabbitMQ 服務器
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 聲明隊列(確保隊列存在)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            // 定義回調函數,當有消息送達時執行
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };

            // 消費消息
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
}

 測試運行發送消息,發送成功。如下圖:

 

rabbitmq環境安裝&java調用小例子_mark223的技術博客_#服務器_17

 測試運行接收消息,消費成功。如下圖:

rabbitmq環境安裝&java調用小例子_mark223的技術博客_erlang_18

 上面測試通過後,改成服務類方便生產環境使用來發送消息代碼:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author hua
 * @date 2024-08-22
 */
@Service
public class RabbitMqServiceImpl {

    private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);

    private static final String QUEUE_NAME = "test";
    private Connection connection;
    private Channel channel;

    public RabbitMqServiceImpl() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(51091);
        factory.setUsername("java_producer");
        factory.setPassword("java_producer");
        //如果不指定虛擬機默認會使用/
        factory.setVirtualHost("test");


        try {
            this.connection = factory.newConnection();
            this.channel = connection.createChannel();
            this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            logger.info("RabbitMqServiceImpl initialized successfully.");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());
            throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
        }
    }

    public void sendMessage(String message) {
        try {
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        } catch (IOException e) {
            e.printStackTrace();
            logger.error("Failed to send message: {}", e.getMessage());
        }
    }

    public void close() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

   上面的代碼存在問題,未確認發送成功,有丟失風險,再改善如下:

import com.qyhua.common.table.db_hex_fail_log.entity.DbHexFailLog;
import com.qyhua.common.table.db_hex_fail_log.service.impl.DbHexFailLogServiceImpl;
import com.rabbitmq.client.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

/**
 * @author hua
 * @date 2024-08-22
 */
@Service
public class RabbitMqServiceImpl {

    private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
    private static final String QUEUE_NAME = "hex_kyc";

    private Connection connection;
    private Channel channel;

    //存放所有消息,確認時刪除,沒確認的保存到數據庫
    private ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

    @Autowired
    DbHexFailLogServiceImpl dbHexFailLogService;

    @PostConstruct
    public void init() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(xxx);
        factory.setUsername("java_producer");
        factory.setPassword("java_producer");
        factory.setVirtualHost("xxxx");
        factory.setConnectionTimeout(3000);

        try {
            this.connection = factory.newConnection();
            this.channel = connection.createChannel();
            this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 啓用發佈者確認模式
            this.channel.confirmSelect();
            setupConfirmListener();

            logger.info("RabbitMqServiceImpl initialized successfully.");
        } catch (IOException | TimeoutException e) {
            logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage(), e);
            throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
        }
    }

    public void sendMessage(String message) {
        try {
            long nextSeqNo = channel.getNextPublishSeqNo();
            outstandingConfirms.put(nextSeqNo, message);

            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            logger.info(" [x] Sent '{}'", message);
        } catch (Exception e) {
            logger.error("Failed to send message: {}", e.getMessage(), e);
            saveFailedMessageToDatabase(message,"CF");
        }
    }

    // 設置接收監聽器,記錄未確認的消息
    private void setupConfirmListener() {
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                outstandingConfirms.headMap(deliveryTag + 1).clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }

            System.out.println("Message confirmed ok deliveryTag="+deliveryTag);
        };

        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                // 獲取從起點到 `deliveryTag + 1` 之間的所有未確認的消息
                ConcurrentNavigableMap<Long, String> unconfirmedMessages = outstandingConfirms.headMap(deliveryTag + 1);
                List<String> FailList= new ArrayList<>();
                for (Map.Entry<Long, String> entry : unconfirmedMessages.entrySet()) {
                    String failedMessage = entry.getValue();
                    logger.error("Message not confirmed: deliveryTag={}, message={}", entry.getKey(), failedMessage);
                    FailList.add(failedMessage);
                }
                saveFailedMessageToDatabaseBy(FailList);  // 批量保存到數據庫
                unconfirmedMessages.clear();  // 清除這些未確認的消息
            } else {
                String failedMessage = outstandingConfirms.get(deliveryTag);
                logger.error("Message not confirmed: deliveryTag={}, message={}", deliveryTag, failedMessage);
                saveFailedMessageToDatabase(failedMessage,"SF");
                outstandingConfirms.remove(deliveryTag);  // 移除單條未確認的消息
            }
        };

        channel.addConfirmListener(ackCallback, nackCallback);
    }

    private void saveFailedMessageToDatabaseBy(List<String> failList) {

        List<DbHexFailLog> list=new ArrayList<>(failList.size());
        LocalDateTime now = LocalDateTime.now();
        for (String message : failList) {
            DbHexFailLog f=new DbHexFailLog();
            f.setInHexStr(message);
            f.setCtime(now);
            f.setFlag("SF");
            list.add(f);
        }


        dbHexFailLogService.saveBatch(list,list.size());
        failList.clear();

    }

    private void saveFailedMessageToDatabase(String message,String flag) {
        DbHexFailLog f=new DbHexFailLog();
        f.setInHexStr(message);
        f.setCtime(LocalDateTime.now());
        f.setFlag(flag);
        dbHexFailLogService.save(f);

    }

    @PreDestroy
    public void close() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
            logger.info("RabbitMqServiceImpl resources closed successfully.");
        } catch (IOException | TimeoutException e) {
            logger.error("Failed to close RabbitMqServiceImpl resources: {}", e.getMessage(), e);
        }
    }
}

上面的代碼優化後,主要增加了三項如下:

 1 Publisher Confirms 機制

  • 啓用 channel.confirmSelect() 來激活發佈者確認模式。
  • 使用 ConfirmCallbackNackCallback 來處理消息的確認與未確認邏輯。
  • 未確認的消息會被保存到數據庫中。

2 保存失敗的消息到數據庫。

3 在 @PreDestroy 方法中關閉 ChannelConnection,確保服務銷燬時正確關閉資源。

方式二 SpringBoot框架使用

mvn依賴包:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

spring配置文件:

Spring: 
 rabbitmq:
    host: xx.xx.xx.xx
    port: 51091
    username: java_consumer
    password: java_consumer
    virtual-host: hellow
    connection-timeout: 6000

JAVA代碼:

  發送消息java代碼:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author hua
 * @date 2024-08-22
 */
@Component
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(queue.getName(), message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

  接收消息java代碼:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author hua
 * @date 2024-08-22
 */
@Component
public class RabbitListener {

    private static final Logger logger = LogManager.getLogger(RabbitListener.class);

    @RabbitListener(queues = "test")
    public void receiveMessage(String message) {

        try {
            System.out.println("rabbit rev <- "+message);
            //具體業務
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("rabbit err= ", e);
        }

    }
}

   上面代碼在生產發送消息時通過編碼方式更靈活,接收直接使用註解更簡單。