当前位置: 首页 > news >正文

wordpress源码整站做网站软件war

wordpress源码整站,做网站软件war,做网站推广赚钱吗,成都市网站建设系统学习消息队列——RabbitMQ的发布确认高级篇 简介 ‌RabbitMQ是一个开源的消息代理软件,实现了‌高级消息队列协议(AMQP)‌,主要用于在分布式系统中进行消息传递。RabbitMQ由‌‌Erlang语言编写,具有高性能、健壮…

系统学习消息队列——RabbitMQ的发布确认高级篇

简介

‌RabbitMQ是一个开源的消息代理软件,实现了‌高级消息队列协议(AMQP)‌,主要用于在分布式系统中进行消息传递。RabbitMQ由‌‌Erlang语言编写,具有高性能、健壮性和可伸缩性,适用于各种规模的企业应用‌。

基本概念和功能

RabbitMQ作为一个消息中间件,主要功能包括接收和转发消息,支持“生产者-消费者模型”。生产者不断向消息队列中写入消息,而消费者则从队列中读取或订阅消息。RabbitMQ支持多种消息传递模式,如普通模式、工作模式、发布/订阅模式等,以满足不同的应用场景需求‌。

架构和关键组件

RabbitMQ的架构基于生产者-消费者模型,通过队列实现消息的存储和转发。队列具有先进先出(FIFO)的特性,并且可以设置持久化、独占、自动删除等属性。RabbitMQ还引入了交换机和路由键等概念,以实现更灵活和复杂的消息路由和分发机制‌。

应用场景和优势

RabbitMQ广泛应用于需要高并发处理、流量削峰、系统解耦和提高可靠性的场景。其优势包括:

  • ‌高性能‌:RabbitMQ能够处理高并发请求,确保系统的稳定运行。
  • ‌高可靠性‌:通过消息的持久化存储和故障恢复机制,确保消息不会丢失。
  • ‌灵活性‌:支持多种消息传递模式和路由规则,满足复杂的应用需求

1.消息发布确认的方案

2.消息的回退

3.备份交换机

1.消息发布确认的方案
在前面的文章中,系统学习消息队列——RabbitMQ的消息发布确认,我们一定程度上学习了消息的发布确认的基础,但是在生产环境中,由于RabbitMq的重启,RabbitMQ在重启过程中投递失败,导致消息丢失,需要手动处理和恢复。那么我们该如何保证当RabbitMQ不可用的时候,消息的稳定投递呢?

我们采取下面的方案:

我们将要发送消息做一个持久化,发送消息的时候,我们持久化一份到数据库或者缓存中,当发送消息失败的时候,我们进行一次重新发送。所以在发送消息的时候,我们要进行代码业务逻辑的处理:

yml:

server:
port:11000
spring:
rabbitmq:
host:127.0.0.1
port:5672
username:guest
password:guest
publisher-confirm-type:correlated

publisher-confirm-type这个参数一共有三种配置方法:

NONE:
禁用发布确认,是默认值。
CORRELATED:
发布消息后,交换机会触发回调方法。
SIMPLE:
有两种效果:
1:和CORRELATED一样会触发回调方法
2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

回调方法类:

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机是否收到消息的回调方法* CorrelationData 消息相关数据* ack 交换机是否收到消息* cause 交换机未收到消息的原因*/
@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", correlationData.getId());} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", correlationData.getId(), cause);}}}

队列配置类:

@Configuration
publicclassConfirmQueueConfig {publicstatic final StringCONFIRM_EXCHANGE_NAME = "confirm.exchange";
publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";@Autowired
privateMyCallBack myCallBack;
@Autowired
privateRabbitTemplate rabbitTemplate;//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
publicvoidinit() {rabbitTemplate.setConfirmCallback(myCallBack);}//声明业务 Exchange
@Bean("confirmExchange")
publicDirectExchangeconfirmExchange(){
returnnewDirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列
@Bean("confirmQueue")
publicQueueconfirmQueue(){
returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系
@Bean
publicBindingqueueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
returnBindingBuilder.bind(queue).to(exchange).with("key1");}}

生产者:

@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Autowired
private RabbitTemplate rabbitTemplate;@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message) {
//指定消息 id 为 1CorrelationData correlationData1 = new CorrelationData("1");
//这个key1是有交换机的key,会发送成功String routingKey = "key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
//这个交换机不存在,会发送失败CorrelationData correlationData2 = new CorrelationData("2");rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME+"1", routingKey, message + routingKey, correlationData2);CorrelationData correlationData3 = new CorrelationData("3");
//这个key2是没有交换机的key,会发送失败routingKey = "key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData3);log.info("发送消息内容:{}", message);}
}

消费者:

@Component
@Slf4j
publicclassConfirmConsumer {publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)
publicvoidreceiveMsg(Message message){
String msg=newString(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}}

我们发送信息:
http://localhost:11000/confir...可以啊

我们发送三条消息:
一条是有交换机有队列的消息
二条是没有交换机的消息
三条是有交换机没有队列的消息

结果如下:

我们可以看出:
第一条消息正常消费
第二条消息找不到交换机,抛异常了
第三条消息绑定键找不到队列,这条消息直接被抛弃了

2.消息的回退

我们发现第三条消息的反馈并不是很好,在仅仅开启了生产者确认机制的情况下,交换机收到消息后,会直接给生产者发送确认消息,如果该消息不可路由,那么消息会直接被抛弃,此时生产者是不知道这条消息被丢弃的。所以我们这里要引入消息的回退机制,如果消息不能路由到队列,就会有一个通知,通过设置mandatory参数可以将不可抵达队列的消息返回给生产者。

回调处理逻辑:

@Component
@Slf4j
publicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/*** 交换机是否收到消息的回调方法* CorrelationData 消息相关数据* ack 交换机是否收到消息* cause 交换机未收到消息的原因*/
@Override
publicvoidconfirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {log.info("交换机已经收到 id 为:{}的消息", correlationData.getId());} else {log.info("交换机还未收到 id 为:{}消息,由于原因:{}", correlationData.getId(), cause);}}@Override
publicvoidreturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}",
newString(message.getBody()),exchange,replyText,routingKey);}}

修改一下前面那个配置类的方法:

//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(myCallBack);
/*** true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* false:* 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(myCallBack);}

继续发送消息:http://localhost:11000/confir...可以啊

我们发现,交换机路由不到的队列,也会有反馈了:

3.备份交换机

有了前面那个mandatory参数和回退消息,我们对于无法投递到目的地的消息,可以进行处理了。但是我们在处理这些日志的时候,顶多就是打印了一下日志,然后触发报警,接着手动进行处理。通过日志收集这些无法到达路由的消息非常不优雅,而且手动复制日志非常容易出错。而且mandatory参数设置,还得增加配置类,增加了复杂性。

如果我们不想丢失消息,又不想增加配置类,该怎么做呢?在前面学习死信队列的时候系统学习消息队列——RabbitMQ的死信队列,我们可以为队列设置死信交换机来处理那些失败的消息。

RabbitMQ中有备份交换机这种存在,它就像死信交换机一样,可以用来处理那些路由不到的消息,当交换机接收到一份不可路由的消息的时候,我们就会把这条消息转发到备份交换机中,由备份交换机进行统一处理。

@Configuration
publicclassConfirmQueueConfig {publicstatic final StringCONFIRM_EXCHANGE_NAME = "confirm.exchange";
publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";
publicstatic final StringBACKUP_EXCHANGE_NAME = "backup.exchange";
publicstatic final StringBACKUP_QUEUE_NAME = "backup.queue";
publicstatic final StringWARNING_QUEUE_NAME = "warning.queue";// 声明确认队列
@Bean("confirmQueue")
publicQueueconfirmQueue(){
returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}
//声明确认队列绑定关系
@Bean
publicBindingqueueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
returnBindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份 Exchange
@Bean("backupExchange")
publicFanoutExchangebackupExchange(){
returnnewFanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机
@Bean("confirmExchange")
publicDirectExchangeconfirmExchange(){
//设置该交换机的备份交换机
ExchangeBuilder exchangeBuilder =
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); 
return (DirectExchange)exchangeBuilder.build();}
// 声明警告队列
@Bean("warningQueue")
publicQueuewarningQueue(){
returnQueueBuilder.durable(WARNING_QUEUE_NAME).build();}
// 声明报警队列绑定关系
@Bean
publicBindingwarningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchangebackupExchange){
returnBindingBuilder.bind(queue).to(backupExchange);}
// 声明备份队列
@Bean("backQueue")
publicQueuebackQueue(){
returnQueueBuilder.durable(BACKUP_QUEUE_NAME).build();}
// 声明备份队列绑定关系
@Bean
publicBindingbackupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
returnBindingBuilder.bind(queue).to(backupExchange);}}

我们发现,不可路由的消息被发现后,就被送到了报警的备份队列里面。

而且这种配置的优先级,比mandatory参数更高。

http://www.yayakq.cn/news/305117/

相关文章:

  • 自己做网站不用WordPresswordpress单栏简洁
  • 网站建设营销平台沭阳网站建设托管
  • 做笔记的网站源码wordpress免签约支付宝
  • ps做图游戏下载网站有哪些内容中国商标查询官网
  • 关于卖零食网站建设需求分析wordpress模板更换
  • 昆明建设网站制作百度关键词优化的方法
  • 长春建筑网站网站开发费
  • 一个企业做网站需要什么资料wordpress文章编辑软件
  • 济南网站建设内容设计网站开发7个基本流程
  • 制作很好的网站wordpress页眉导航栏位置
  • 对于学校网站建设的建议做海报那个网站好
  • 17做网店类似网站免费网站制作作业
  • 企业网站的设计思路自己建设网站需要多少钱
  • 体育健身网站建设东莞营销网站建设价格
  • 漳州市城乡住房建设局网站如何建设网站使用
  • 百度网盟有哪些网站吉林省 网站建设
  • 宁波派桑网站建设热门活动页面html
  • 关于网站建设项目收取费用网站备案政策
  • 网站主体信息易点设计
  • 建站时网站地图怎么做海淀最新消息今天
  • 昆明做网站的网络公司长沙网络营销推广
  • 网站建设书籍免费南宁经典网站建设
  • 电脑记事本做复杂网站网页设计图片怎么换
  • asp网站伪静态规则wordpress 结构化数据
  • 中山网站建设外包cnnic网站
  • 网站的内链是什么意思郑州400建站网站建设
  • 目前做网站需要什么cms2015wordpress漏洞
  • 怎样健网站海山网站建设
  • 做物流网站的公司哪家好深圳影视传媒公司有哪些
  • 广东做网站的公司有哪些连云港网站优化