消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

目录

0.交换机种类和区别

1.声明队列和交换机以及RountKey

2.初始化循环绑定

3.声明交换机

4.监听队列

4.1 监听普通队列

4.2监听死信队列

 5.削峰填谷的实现


0.交换机种类和区别

  1. Direct Exchange(直连交换机)

    • 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。
    • 当一个队列使用某个直连交换机绑定时,它需要指定一个绑定键(binding key),当消息的路由键与该绑定键完全匹配时,消息会被发送到该队列。
  2. Fanout Exchange(扇出交换机)

    • 扇出交换机会将消息发送到与其绑定的所有队列,忽略消息的路由键。
    • 当一个队列使用扇出交换机绑定时,它会接收到交换机发送的所有消息,无论消息的路由键是什么。
  3. Topic Exchange(主题交换机)

    • 主题交换机根据消息的路由键和绑定键之间的模式匹配来路由消息。
    • 绑定键可以使用通配符进行匹配,支持 '*' 匹配一个单词,'#' 匹配零个或多个单词,从而允许更灵活的路由规则。
  4. Headers Exchange(标头交换机)

    • 标头交换机根据消息的标头(headers)中的键值对来路由消息,而不是使用路由键。
    • 在将队列绑定到标头交换机时,可以指定一组标头键值对,只有当消息的标头中包含与绑定相匹配的所有键值对时,消息才会被路由到该队列。

如果满足key的前提下,绑定同一个交换机的队列都会分配到相同数量的信息

比如此时交换机有20条信息,a,b队列都会分配到20条信息

默认情况下,会轮询分配给消费者,也可以设置最多获取多少条未被消费的信息,根据消费者的消费能力来设置

1.声明队列和交换机以及RountKey

package com.example.config;


import lombok.Getter;

@Getter
public enum RabbitmqBind {


    DATA_CLEAN_PROCESS_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD,
            false,
            false,
            null,
            null
    ),

    DATA_CLEAN_PROCESS(
            RabbitMqExchangeEnum.E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD),

    SMS_CLEAN_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD,
            true,
            false,
            null,
            null
    ),

    SMS_CLEAN(
            RabbitMqExchangeEnum.E_TOPIC_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS,
            RabbitmqRoutingKey.K_API_TO_DCN_SMS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD
    ),


    ;

    /**
     * 交换机
     */
    private RabbitMqExchangeEnum exchange;

    /**
     * 队列名称
     */
    private String queueName;

    /**
     * 路由Key
     */
    private RabbitmqRoutingKey routingKey;

    /**
     * 绑定标识
     * 是否启用
     */
    private Boolean isBind;

    /**
     * 是否绑定死信
     */
    private Boolean isDeathBelief;

    /**
     * 绑定的死信交换机
     */
    private RabbitMqExchangeEnum boundDeadExchange;

    /**
     * 死信key
     */
    private RabbitmqRoutingKey deadRoutingKey;


    RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind,
                 Boolean isDeathBelief, RabbitMqExchangeEnum boundDeadExchange, RabbitmqRoutingKey deadRoutingKey
    ) {
        this.exchange = exchange;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.isBind = isBind;
        this.isDeathBelief = isDeathBelief;
        this.boundDeadExchange = boundDeadExchange;
        this.deadRoutingKey = deadRoutingKey;
    }

    /**
     * 交换机
     */
    @Getter
    public enum RabbitMqExchangeEnum {

        /**
         * 交换机定义,类型 - 名称
         */
        E_DIRECT_RCP("direct", "E_DIRECT_RCP"),
        DEAD_E_DIRECT_RCP("direct", "DEAD_E_DIRECT_RCP"),

        E_TOPIC_RCP("topic", "E_TOPIC_RCP"),

        E_TOPIC_PAY("topic", "E_TOPIC_PAY");

        private String exchangeType;

        private String exchangeName;

        RabbitMqExchangeEnum(String exchangeType, String exchangeName) {
            this.exchangeType = exchangeType;
            this.exchangeName = exchangeName;
        }
    }

    /**
     * 队列名定义
     */
    public interface RabbitMqQueueConstants {

        /**
         * 接收清洗数据
         */
        String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";

        /**
         * 清洗结束通知
         */
        String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";

        /**
         * 死信队列
         */
        String Q_DATA_CLEAN_PROCESS_DEAD = "Q_DATA_CLEAN_PROCESS_DEAD";

        /**
         * 清洗结束通知死信队列
         */
        String Q_API_TO_DCN_SMS_DEAD = "Q_API_TO_DCN_SMS_DEAD";
    }

    /**
     * routingKey
     */
    @Getter
    public enum RabbitmqRoutingKey {

        /**
         * 路由
         */
        K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),
        K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),

        // 路由绑定死信路由
        DEAD("DEAD"),

        //死信路由
        K_DATA_CLEAN_PROCESS_DEAD("K_DATA_CLEAN_PROCESS_DEAD"),
        K_DATA_CLEAN_FINISH_DEAD("K_DATA_CLEAN_FINISH_DEAD"),
        ;

        private String keyName;

        RabbitmqRoutingKey(String keyName) {
            this.keyName = keyName;
        }
    }

}

2.初始化循环绑定

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;

@Configuration
@ConditionalOnClass(EnableRabbit.class)
public class MqConfig {
    @Resource
    protected RabbitTemplate rabbitTemplate;
    @Resource
    ConnectionFactory connectionFactory;
//
//    @Lazy
//    @Autowired
//    protected RabbitAdmin rabbitAdmin;
//
//
//    public static final int DEFAULT_CONCURRENT = 10;
//
//    @Bean("customContainerFactory")
//    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
//                                                                 ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
//        factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
//        configurer.configure(factory, connectionFactory);
//        return factory;
//    }
//
//    @Bean
//    @ConditionalOnMissingBean
//    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
//        return new RabbitTransactionManager(connectionFactory);
//    }
//
//    @Bean
//    @ConditionalOnMissingBean
//    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
//        return new RabbitAdmin(connectionFactory);
//    }

    @PostConstruct
    protected void init() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);


        rabbitTemplate.setChannelTransacted(true);
        //创建exchange
        Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values())
                .forEach(rabbitMqExchangeEnum -> {
                            Exchange exchange = RabbitmqExchange
                                    .getInstanceByType(rabbitMqExchangeEnum.getExchangeType())
                                    .createExchange(rabbitMqExchangeEnum.getExchangeName());
                            rabbitAdmin.declareExchange(exchange);
                        }
                );

        //创建队列并绑定exchange
        Arrays.stream(RabbitmqBind.values()).forEach(RabbitmqBind -> {
            if (RabbitmqBind.getIsBind()) {
                if (RabbitmqBind.getIsDeathBelief()) {
                    //需要绑定死信交换机的队列
                    rabbitAdmin.declareQueue(QueueBuilder.durable(RabbitmqBind.getQueueName())
                            .ttl(60000).deadLetterExchange(RabbitmqBind.getBoundDeadExchange().getExchangeName())
                            .deadLetterRoutingKey(RabbitmqBind.getDeadRoutingKey().getKeyName()).build());
                    rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
                            Binding.DestinationType.QUEUE,
                            RabbitmqBind.getExchange().getExchangeName(),
                            RabbitmqBind.getRoutingKey().getKeyName(), null));
                } else {
                    //不需要绑定死信交换机的队列
                    rabbitAdmin.declareQueue(new Queue(RabbitmqBind.getQueueName(),
                            true, false, false, null));
                    rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
                            Binding.DestinationType.QUEUE,
                            RabbitmqBind.getExchange().getExchangeName(),
                            RabbitmqBind.getRoutingKey().getKeyName(), null));
                }
            }
        });
    }

}

 绑定的形式由枚举类中定义

3.声明交换机

package com.example.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.TopicExchange;

import java.util.Arrays;


@Getter
@Slf4j
public enum RabbitmqExchange {

    DIRECT("direct"){
        @Override
        public Exchange createExchange(String exchangeName) {
            return new DirectExchange(exchangeName, true, false);
        }
    },

    TOPIC("topic"){
        @Override
        public Exchange createExchange(String exchangeName) {
            return new TopicExchange(exchangeName, true, false);
        }
    };

    public static RabbitmqExchange getInstanceByType(String type){

        return Arrays.stream(RabbitmqExchange.values()).filter(e -> e.getType().equals(type))
                .findAny()
                .orElseThrow(() ->
//                        new ProcessException("无效的exchange type")

                        new RuntimeException("无效的exchange type")
                );
    }

    private String type;


    RabbitmqExchange(String type) {
        this.type = type;
    }

    public abstract Exchange createExchange(String exchangeName);

}

4.监听队列

4.1 监听普通队列

package com.example.listener;

import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5")
//, containerFactory = "customContainerFactory"
public class MqListener {



    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    @RabbitHandler
    public void processMessage(byte[] message) {
        String msg = new String(message);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }

    /**
     * 处理推送消息
     * @param message
     */
    private void process(String message) {
        log.info("process message :{}" , message);
        if(StringUtils.isBlank(message)) {
            log.error("process message is blank , message:{}" , message);
            return;
        }
    }

}

 监听并处理任务

4.2监听死信队列

package com.example.listener;

import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD }, concurrency = "1-5")
public class DeadListener {

    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    @RabbitHandler
    public void processMessage(byte[] message) {
        String msg = new String(message);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }


    /**
     * 处理推送消息
     * @param message
     */
    private void process(String message) {
        log.info("Dead process message :{}" , message);
        if(StringUtils.isBlank(message)) {
            log.error("Dead process message is blank , message:{}" , message);
            return;
        }
    }

}

 5.削峰填谷的实现

把高峰期的消息填进低峰期

可以用拉取的方式来实现

或者用消费者的最大数量和最小数量来实现

channel.basicQos();//设置最大获取未确认消息的数量,实现权重

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/547895.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

武汉星起航:亚马逊跨境引领全球贸易新趋势,展现积极影响力

随着全球化浪潮的持续推进,跨境电商行业正迎来前所未有的发展机遇。亚马逊作为全球领先的电商平台,其在跨境电商领域的发展趋势备受瞩目。亚马逊跨境电商不仅扩大了跨境市场的规模,优化了供应链管理,还积极应用科技创新&#xff0…

【讲解下如何从零基础学习Java】

🎥博主:程序员不想YY啊 💫CSDN优质创作者,CSDN实力新星,CSDN博客专家 🤗点赞🎈收藏⭐再看💫养成习惯 ✨希望本文对您有所裨益,如有不足之处,欢迎在评论区提出…

pyqt实战-软件通用界面设计模板样例

软件界面 技术点 无边框设计自定义右上角最大化,最小化,关闭按钮界面布局能够自适应界面的放大缩小按住鼠标左键能够拖动整个界面treewidget整体风格设计 代码 import sysfrom PyQt5 import QtCore, QtGui, QtWidgets from PyQt5.Qt import * impor…

c# 服务创建

服务 创建服务 编写服务 可以对server1.cs重新命名,点击你的server按F7进入代码编辑模式,编写脚本 双击你的server.cs右击空白位置,添加安装程序,此时会生成“serviceInstaller1”及“serviceProcessInstaller1” 后续可以点击P…

开源相机管理库Aravis例程学习(一)——单帧采集single-acquisition

开源相机管理库Aravis例程学习(一)——单帧采集single-acquisition 简介源码函数说明arv_camera_newarv_camera_acquisitionarv_camera_get_model_namearv_buffer_get_image_widtharv_buffer_get_image_height 简介 本文针对官方例程中的第一个例程&…

CSS特效---跳动的文字

1、演示 2、一切尽在代码中 <!--* Author: your name* Date: 2023-10-03 14:42:44* LastEditTime: 2023-10-03 14:56:26* LastEditors: DESKTOP-536UVPC* Description: In User Settings Edit* FilePath: \css-special-effects\跳动的文字.html --> <!DOCTYPE html>…

【Linux】进程的优先级环境变量

个人主页 &#xff1a; zxctscl 如有转载请先通知 文章目录 1. 前言2. 进程的优先级2.1 什么是优先级2.2 为什么要有优先级2.3 优先级的查看方式2.4 对优先级调整 3. 命令行参数4. 环境变量4.1 环境变量与配置文件4.1.1 环境变量初步介绍4.1.2 配置文件 4.2 更多环境变量4.3 整…

Shiro整合EhCache缓存(二)

Shiro整合EhCache缓存 1、授权、角色认证1.1、后端接口服务注解1.2、授权-获取角色1.3、授权-获取权限1.4、授权-异常处理1.5、前端页面授权 2、EhCache2.1、Shiro整合EhCache 1、授权、角色认证 用户登录后&#xff0c;需要验证是否具有指定角色指定权限。Shiro也提供了方便的…

企业网络日益突出的难题与SD-WAN解决方案

随着企业规模的迅速扩张和数字化转型的深入推进&#xff0c;企业在全球范围内需要实现总部、分支机构、门店、数据中心、云等地点的网络互联、数据传输和应用加速。SD-WAN作为当今主流解决方案&#xff0c;在网络效率、传输质量、灵活性和成本等方面远远超越传统的互联网、专线…

归并排序详解(附代码)

归并排序 数据科学家每天都在处理算法。 然而&#xff0c;数据科学学科作为一个整体已经发展成为一个不涉及复杂算法实现的角色。 尽管如此&#xff0c;从业者仍然可以从建立对算法的理解和知识库中受益。 在本文中&#xff0c;对排序算法归并排序进行了介绍、解释、评估和实…

高温超导量子干涉仪更具实用价值 政策推动行业研制能力提升

高温超导量子干涉仪更具实用价值 政策推动行业研制能力提升 高温超导量子干涉仪&#xff0c;一种采用临界温度在77K以上高温超导材料制造而成的可对磁场微小变化进行测量的仪器&#xff0c;需要工作在液氮环境中。 超导量子干涉仪&#xff08;SQUID&#xff09;&#xff0c;一种…

面试官:为什么忘记密码要重置而不是告诉你原密码?

这是一个挺有意思的面试题,挺简单的,不知道大家平时在重置密码的时候有没有想过这个问题。回答这个问题其实就一句话:因为服务端也不知道你的原密码是什么。如果知道的话,那就是严重的安全风险问题了。 我们这里来简单分析一下。 做过开发的应该都知道,服务端在保存密码到…

CPLD可运行的最高频率是多少

CPLD可运行的最高频率是多少 AG32 内置CPLD的可运行最高频率 AG32 内置CPLD的可运行最高频率 AG32 MCU 的运行最高频率是248M。而CPLD中没有标准的最高频率。 最大能跑多少MHz&#xff0c;取决于cpld 里的设计。 如果是逻辑电路&#xff0c;则不存在时钟的概念。 如果是时序电路…

在vue和 js 、ts 数据中使用 vue-i18n,切换语言环境时,标签文本实时变化

我的项目需要显示两种语言(中文和英文)&#xff0c;并且我想要切换语言时&#xff0c;页面语言环境会随之改变&#xff0c;目前发现&#xff0c;只能在vue中使用$t(‘’)的方式使用&#xff0c;但是这种方式只能在vue中使用&#xff0c;而我的菜单文件是定义在js中&#xff0c;…

直流充电桩与交流充电桩有哪些区别,如何选最靠谱?

在当今快速发展的电动汽车市场&#xff0c;正确选择充电桩成为了车主们面临的重要问题之一。直流充电桩与交流充电桩区到底有什么区别&#xff1f;哪些方面不同&#xff1f;分别适用场景是什么&#xff1f;不同场景应该怎么选&#xff1f;本文一文为您详解。 一、直流充电桩与交…

ObjectMapper的具体介绍与使用

文章目录 声明一、前言二、ObjectMapper与JSONObject比较1、核心主要有三个部分&#xff1a;依赖包不同 2、ObjectMapper使用概述2.1、工程的pom.xml导包信息2.2、创建案例中的测试对象2.3、对象和JSON相互转化2.3.1、测试代码2.3.2、测试结果展示 2.4、集合和JSON像话转化2.4.…

【让自己的U盘变得与众不同】

文章目录 今日座右铭&#xff1a;在心里种花&#xff0c;人生才不会荒芜。 文章目录 文章目录前言一、准备ICO图标二、插入U盘1.点击新建文本文档-输入代码-点击保存2.将代码文本文档名称修改为autorun.inf在这里插入图片描述3.将图标及代码文本文档放入U盘中在这里插入图片描述…

深度残差收缩网络中,使用 Sigmoid 函数的用意在哪?

在深度残差收缩网络中&#xff0c;使用 Sigmoid 函数将输出归一化到 0 和 1 之间的目的是为了限制输出值的范围&#xff0c;并且使得输出可以被解释为概率。这个 Sigmoid 函数的输出可以被看作是一个置信度或者概率的度量&#xff0c;表示某个事件发生的可能性。 在设置阈值时…

财富池指标--通达信主力建仓免费指标公式源码

主力交易一只个股&#xff0c;一般会经过以下几个阶段&#xff1a;建仓、拉升、出货。那么&#xff0c;怎么判断一只股票正处于主力建仓时期呢&#xff1f; 1、从k线图上来看 个股走势图中&#xff0c;连续出现小阳线的k线图&#xff0c;其成交量持续放量&#xff0c;或者在个…

【学习笔记】rt-thread

任务 创建好任务&#xff0c;不管是动态还是静态创建&#xff0c;任务的状态是init &#xff0c;通过start方法来启动任务&#xff1b;线程大小 设置小了&#xff0c;无法正常工作&#xff1f;显示占空间100% 启动过程 TODO 这是编译器特性&#xff1f; 因为RT-Thread使用编…
最新文章