文章目錄
  • 1.環境搭建
  • 1.Docker安裝RabbitMQ
  • 1.拉取鏡像
  • 2.安裝命令
  • 3.開啓5672和15672端口
  • 4.登錄控制枱
  • 2.整合Spring AMQP
  • 1.sun-common模塊下創建新模塊
  • 2.引入amqp依賴和fastjson
  • 3.新建一個mq-demo的模塊
  • 1.在sun-frame下創建mq-demo
  • 2.然後在mq-demo下創建生產者和消費者子模塊
  • 3.查看是否交給父模塊管理了
  • 4.在mq-demo模塊引入sun-common-rabbitmq依賴
  • 5.publisher引入sun-common-test依賴
  • 6.將sun-common-rabbitmq clean-install一下
  • 7.給consumer和publisher都創建主類
  • 1.ConsumerApplication.java
  • 2.PublisherApplication.java
  • 4.測試MQ
  • 1.application.yml mq的最基本配置
  • 2.consumer
  • 1.TestConfig.java MQ配置
  • 2.TestConfigListener.java 監聽隊列
  • 3.publisher
  • 1.TestConfig.java 測試(注意指定啓動類)
  • 2.結果
  • 2.基本交換機
  • 1.Fanout
  • 1.FanoutConfig.java 交換機配置
  • 2.FanoutConfigListener.java 監聽者
  • 3.FanoutConfig.java 生產者
  • 2.Direct
  • 1.DirectConfig.java 交換機配置
  • 2.DirectConfigListener.java 監聽者
  • 3.DirectConfig.java 生產者

1.環境搭建

1.Docker安裝RabbitMQ
1.拉取鏡像
docker pull rabbitmq:3.8-management
2.安裝命令
docker run -e RABBITMQ_DEFAULT_USER=sun 
           -e RABBITMQ_DEFAULT_PASS=mq 
           -v mq-plugins:/plugins 
           --name mq 
           --hostname mq 
           -p 15672:15672 
           -p 5672:5672 
           -d 699038cb2b96 # 注意這裏是鏡像id,需要替換
3.開啓5672和15672端口
4.登錄控制枱

15672端口

2.整合Spring AMQP
1.sun-common模塊下創建新模塊

【RabbitMQ筆記09】消息隊列RabbitMQ之常見方法的使用_java

2.引入amqp依賴和fastjson
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- 繼承父模塊的版本和通用依賴 -->
    <parent>
        <groupId>com.sunxiansheng</groupId>
        <artifactId>sun-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>sun-common-rabbitmq</artifactId>
    <!-- 子模塊的version,如果不寫就默認跟父模塊的一樣 -->
    <version>${children.version}</version>

    <!-- 自定義依賴,無需版本號 -->
    <dependencies>
        <!--AMQP依賴,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- 用於傳遞消息時的序列化操作 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
    </dependencies>

</project>
3.新建一個mq-demo的模塊
1.在sun-frame下創建mq-demo

【RabbitMQ筆記09】消息隊列RabbitMQ之常見方法的使用_java_02

2.然後在mq-demo下創建生產者和消費者子模塊

【RabbitMQ筆記09】消息隊列RabbitMQ之常見方法的使用_ci_03

【RabbitMQ筆記09】消息隊列RabbitMQ之常見方法的使用_#分佈式_04

3.查看是否交給父模塊管理了

【RabbitMQ筆記09】消息隊列RabbitMQ之常見方法的使用_spring_05

4.在mq-demo模塊引入sun-common-rabbitmq依賴
<dependencies>
        <!-- 引入sun-common-rabbitmq -->
        <dependency>
            <groupId>com.sunxiansheng</groupId>
            <artifactId>sun-common-rabbitmq</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
5.publisher引入sun-common-test依賴
<dependencies>
        <!-- sun-common-test -->
        <dependency>
            <groupId>com.sunxiansheng</groupId>
            <artifactId>sun-common-test</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
6.將sun-common-rabbitmq clean-install一下
7.給consumer和publisher都創建主類
1.ConsumerApplication.java
package com.sunxiansheng.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan("com.sunxiansheng")
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
2.PublisherApplication.java
package com.sunxiansheng.publisher;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }
}
4.測試MQ
1.application.yml mq的最基本配置
spring:
  # RabbitMQ 配置
  rabbitmq:
    # 服務器地址
    host: ip
    # 用户名
    username: sunxiansheng
    # 密碼
    password: rabbitmq
    # 虛擬主機
    virtual-host: /
    # 端口
    port: 5672
2.consumer
1.TestConfig.java MQ配置
package com.sunxiansheng.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: 最基本的MQ測試
 * @Author sun
 * @Create 2024/8/2 14:34
 * @Version 1.0
 */
@Configuration
public class TestConfig {

    /**
     * 創建一個fanout類型的交換機
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange.test");
    }

    /**
     * 創建一個隊列
     * @return
     */
    @Bean
    public Queue fanoutQueueTest() {
        return new Queue("fanout.queue.test");
    }

    /**
     * 交換機和隊列綁定
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange());
    }

}
2.TestConfigListener.java 監聽隊列
package com.sunxiansheng.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description: 最基本的MQ測試
 * @Author sun
 * @Create 2024/8/2 14:34
 * @Version 1.0
 */
@Component
public class TestConfigListener {

    @RabbitListener(queues = "fanout.queue.test")
    public void receive(String message) {
        System.out.println("接收到的消息:" + message);
    }

}
3.publisher
1.TestConfig.java 測試(注意指定啓動類)
package com.sunxiansheng.consumer.config;

import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * Description: 最基本的MQ測試
 * @Author sun
 * @Create 2024/8/2 14:34
 * @Version 1.0
 */
@SpringBootTest(classes = PublisherApplication.class) // 指定啓動類
public class TestConfig {

    @Resource
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        // 交換機
        String exchange = "fanout.exchange.test";
        // 路由鍵
        String routingKey = "";
        // 消息
        String message = "hello fanout";
        // 發送消息
        amqpTemplate.convertAndSend(exchange, routingKey, message);
    }

}
2.結果

【RabbitMQ筆記09】消息隊列RabbitMQ之常見方法的使用_java_06

2.基本交換機

1.Fanout
1.FanoutConfig.java 交換機配置
package com.sunxiansheng.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: Fanout交換機
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange fanoutExchange1() {
        // 創建一個fanout類型的交換機
        return new FanoutExchange("fanout.exchange");
    }

    @Bean
    public Queue fanoutQueue1() {
        // 創建一個隊列
        return new Queue("fanout.queue1");
    }

    @Bean
    public Queue fanoutQueue2() {
        // 創建一個隊列
        return new Queue("fanout.queue2");
    }

    // 兩個隊列綁定到交換機上
    @Bean
    public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);
    }

    @Bean
    public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1);
    }
}
2.FanoutConfigListener.java 監聽者
package com.sunxiansheng.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description: Fanout交換機
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@Component
public class FanoutConfigListener {

    @RabbitListener(queues = "fanout.queue1")
    public void receive1(String message) {
        System.out.println("fanout.queue1接收到的消息:" + message);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void receive2(String message) {
        System.out.println("fanout.queue2接收到的消息:" + message);
    }

}
3.FanoutConfig.java 生產者
package com.sunxiansheng.consumer.config;

import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * Description: Fanout交換機
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@SpringBootTest(classes = PublisherApplication.class) // 指定啓動類
public class FanoutConfig {

    @Resource
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        // 交換機
        String exchange = "fanout.exchange";
        // 路由鍵
        String routingKey = "";
        // 消息
        String message = "hello fanout";
        // 發送消息
        amqpTemplate.convertAndSend(exchange, routingKey, message);
    }

}
2.Direct
1.DirectConfig.java 交換機配置
package com.sunxiansheng.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: Direct交換機
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@Configuration
public class DirectConfig {

    @Bean
    public DirectExchange directExchange() {
        // 創建一個direct類型的交換機
        return new DirectExchange("direct.exchange");
    }

    @Bean
    public Queue directQueue1() {
        // 創建一個隊列
        return new Queue("direct.queue1");
    }

    @Bean
    public Queue directQueue2() {
        // 創建一個隊列
        return new Queue("direct.queue2");
    }

    // 兩個隊列綁定到交換機上,這裏需要指定routingKey
    @Bean
    public Binding bindingDirectQueue1(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("black");
    }

    @Bean
    public Binding bindingDirectQueue2(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("green");
    }

}
2.DirectConfigListener.java 監聽者
package com.sunxiansheng.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description: Direct交換機
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@Component
public class DirectConfigListener {

    @RabbitListener(queues = "direct.queue1")
    public void receive1(String message) {
        System.out.println("direct.queue1接收到的消息:" + message);
    }

    @RabbitListener(queues = "direct.queue2")
    public void receive2(String message) {
        System.out.println("direct.queue2接收到的消息:" + message);
    }

}
3.DirectConfig.java 生產者
package com.sunxiansheng.consumer.config;

import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * Description: Direct交換機
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@SpringBootTest(classes = PublisherApplication.class) // 指定啓動類
public class DirectConfig {

    @Resource
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        // 交換機
        String exchange = "direct.exchange";
        // 路由鍵
        String routingKey = "black";
        // 消息
        String message = "hello direct";
        // 發送消息
        amqpTemplate.convertAndSend(exchange, routingKey, message);
    }

}