当前位置: 首页 > news >正文

网站后台添加文字网站建设中山优化

网站后台添加文字,网站建设中山优化,绍兴h5建站,客户渠道消息堆积 原因 消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因: 消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息&…

消息堆积

原因

消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因:

消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力。

消费者处理能力不足:消费者消费消息的速度跟不上消息生产的速度,也会导致消息在队列中积压。可能的原因有:

  • 消费端业务逻辑复杂、耗时长。
  • 消费端代码性能低。
  • 系统资源限制,如CPU、内存、磁盘等也会限制消费者处理消息的速率。
  • 异常处理不当,消费者在处理消息时出现异常,导致消息无法被正确处理和确认。

网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压。

RabbitMQ服务器配置偏低

解决方案

消息积压可能会导致系统性能下降,影响用户体验,甚至导致系统崩溃。因此,及时发现消息积压并解决对于维护系统稳定性至关重要。

遇到消息积压时,首先要分析消息积压造成的原因,根据原因来调整策略。通常从以下几个方面考虑:

提高消费者效率

  • 增加消费者实例数量,比如新增机器。
  • 优化业务逻辑,比如使用多线程来处理业务。
  • 设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者。
  • 消息发生异常时,设置合适的重试策略,或者转入到死信队列。

限制生产者效率:比如流量控制、限流算法等。

  • 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送效率。
  • 限流:使用限流工具,为消息发送效率设置一个上限。
  • 设置过期时间:如果消息过期未被消费,可以配置死信队列,以避免消息丢失,同时减少对主队列的压力。

资源与配置优化:比如省级RabbitMQ服务器的硬件,调整RabbitMQ的配置参数等。

推拉模式

概述

RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull)。

推模式:消息中间件主动将消息推送给消费者(对消息的获取更加实时,适合对数据实时性要求较高的业务,例如实时数据处理:监控系统、报表系统等)。

拉模式:消费者主动从消息中间件拉取消息(消费端可以按照自己的处理速度来消费,避免消息积压,适合需要流量控制,或者需要大量计算资源的任务,拉取模式允许消费者准备好之后再请求消息,避免资源浪费)。

RabbitMQ主要就是基于推模式工作的,例如最开始谈到的几种工作模式,都是基于推模式来进行实现。它的设计核心是让消息队列中的消费者接收到由生产者发送的消息,使用channel.basicConsume方式订阅队列,MQ就会把消息推送到订阅该队列的消费者。如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方式来进行消费消息。

SpringBoot方式

@Configuration
public class PushAndPull {@Bean("pushQueue")public Queue pushQueue() {return QueueBuilder.durable(Constants.PUSH_QUEUE).build();}@Bean("pushExchange")public Exchange pushExchange() {return ExchangeBuilder.directExchange(Constants.PUSH_EXCHANGE).durable(true).build();}@Bean("pushQueueBind")public Binding pushQueueBind(@Qualifier("pushExchange") Exchange exchange,@Qualifier("pushQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("push").noargs();}@Bean("pullQueue")public Queue pullQueue() {return QueueBuilder.durable(Constants.PULL_QUEUE).build();}@Bean("pullExchange")public Exchange pullExchange() {return ExchangeBuilder.directExchange(Constants.PULL_EXCHANGE).durable(true).build();}@Bean("pullQueueBind")public Binding pullQueueBind(@Qualifier("pullExchange") Exchange exchange,@Qualifier("pullQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("pull").noargs();}
}
@Slf4j
@RestController
@RequestMapping("/pushAndPull")
public class PushAndPullController {@Resourceprivate RabbitTemplate rabbitTemplate;// 推模式@RequestMapping("/push")public void push() {this.rabbitTemplate.convertAndSend(Constants.PUSH_EXCHANGE, "push", "hello push");System.out.println("推模式消息发送成功!");}// 拉模式@RequestMapping("/pull")public void pull() {this.rabbitTemplate.convertAndSend(Constants.PULL_EXCHANGE, "pull", "hello pull");System.out.println("拉模式消息发送成功!");}}
@Configuration
@RestController
public class PushAndPullListener {// 推模式@RabbitListener(queues = Constants.PUSH_QUEUE)public void push(String message) {System.out.println("推模式: " + message);}@Resourceprivate RabbitTemplate rabbitTemplate;// 拉模式@RequestMapping("/pullConsumer")public void pull(String message) {Message receive = this.rabbitTemplate.receive(Constants.PULL_QUEUE);System.out.println("拉模式: " + receive);}}

SDK方式

// 推模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 发送消息channel.basicPublish(Constants.PUSH_EXCHANGE, "push", null, "推模式".getBytes());System.out.println("推模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 推模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("推模式消费者接收到消息:" + new String(body));}};channel.basicConsume(Constants.PUSH_QUEUE, true, consumer);// TODO 释放资源}}
// 拉模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 发送消息channel.basicPublish(Constants.PULL_EXCHANGE, "pull", null, "拉模式".getBytes());System.out.println("拉模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 拉模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 接收消息GetResponse getResponse = channel.basicGet(Constants.PULL_QUEUE, true);if (getResponse != null) {System.out.println("拉模式收到消息:" + new String(getResponse.getBody()));}// TODO 释放资源}}
http://www.yayakq.cn/news/445929/

相关文章:

  • 长春市长春网站建设哪家好网站制作厦门公司
  • 网站跟wordpress连接网页设计试题及答案
  • 网站开发结构有中国外贸导航网
  • 榆林建设银行的网站买什么样的主机(用来建网站的)支持下载
  • 万网网站建设特点通辽北京网站建设
  • 网站建设外包协议做的最好自考网站是哪个
  • 做印刷网站公司简介crm系统怎么用
  • 建一个网站多少钱免费网站流量统计工具
  • 硬笔书法网站是谁做的一站式营销推广平台
  • 网站的建设与应用广州越秀建网站
  • dedecms如何做网站毕业设计做课程网站好
  • 网站的经营推广自助网站建设用什么好
  • 医疗网站平台建设方案国外网站 备案
  • 产品展示类网站晋江外贸网站建设
  • 科技公司 网站 石家庄网站备案进程查询
  • aspcms网站源码做网络运营需要掌握什么
  • 淘宝做网站给了钱法律咨询东莞网站建设
  • 网站建设师百度百科国内做网站公司哪家好
  • 金科做的网站做网站和做app哪个成本高
  • 汽车之家网页网站体验优化
  • 公司网站如何做windows怎么安装wordpress
  • 阜宁网站制作选哪家别人帮做的网站怎么修改病句
  • 价格合理的网站建设南沙外贸网站建设
  • c2c网站代表和网址关系的网站
  • 设计公司网站价格天津seo优化
  • 邢台做wap网站找谁外网npv加速器
  • 手机好看的网站饮料招商网站大全
  • 做catia数据的网站西安千叶网站建设
  • 做pc端网站咨询建设一个直播网站多少钱
  • wordpress 删除自带主题深圳网站seo优化公司