当前位置: 首页 > news >正文

页制作与网站建设技术大全公司网站如何做的美丽

页制作与网站建设技术大全,公司网站如何做的美丽,wordpress 调用随即文章,网站开发的3个阶段目录 一、序言二、生产者确保消息发送成功1、为什么需要Publisher Confirms2、哪些消息会被确认处理成功 三、消费者保证消息被处理四、Spring RabbitMQ支持代码示例1、 application.yml2、RabbigtMQ配置3、可靠生产者配置4、可靠消费者配置5、测试用例 一、序言 在有些业务场…

目录

  • 一、序言
  • 二、生产者确保消息发送成功
    • 1、为什么需要Publisher Confirms
    • 2、哪些消息会被确认处理成功
  • 三、消费者保证消息被处理
  • 四、Spring RabbitMQ支持代码示例
    • 1、 application.yml
    • 2、RabbigtMQ配置
    • 3、可靠生产者配置
    • 4、可靠消费者配置
    • 5、测试用例

一、序言

在有些业务场景中,消息是不能丢的,比如分布式事务资金动账,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。

消息是有可能丢的,比如生产者在发送消息时broker服务挂了,消息没有来得及落盘,这时消息就彻底丢了。

保证MQ消息可靠传输主要有两个方面,一方面是消息生产者确保消息一定发送成功,另一方面是消费者确保消息一定被处理。


二、生产者确保消息发送成功

1、为什么需要Publisher Confirms

在Spring AMQP中AmqpTemplate的实现RabbitTemplate已经支持 Publisher Confirms and Returns,所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。

在RabbitMQ官方文档描述中,持久化的消息在Broker重启时也是应该存活的,这里的词用的是应该,因为消息有可能在落地磁盘前Broker就挂了,导致消息丢失。

最直接的解决方案是通过事务,但是通过事务有两个问题:

  • 事务阻塞:发布者必须等待Broker处理完每条消息。
  • 事务很重:每次提交都会要求触发fsync(),强制磁盘,这个过程需要花很长的时间。

备注:在RabbitMQ官方测试中,通过事务去保证,发布10000条消息需要花至少4分钟的时间。

在这里插入图片描述

而通过Publisher Confirm机制,一旦Broker处理完就会确认消息,而且这个过程是异步的,生产者可以流式发布消息,不需要等待Broker,并且Broker会批量高效将消息落盘。

2、哪些消息会被确认处理成功

当Broker确认消息时,会通知消息发布者消息是否被成功处理,成功处理的基本规则如下:

  • 无法路由的mandatory(必须有符合条件的队列)和immediate(必须有消费者在线)类型在被basic.return后会被确认。
  • 非持久化消息在入队时会被确认。
  • 持久化消息当持久化到磁盘或者被消费者消费时会被确认。

三、消费者保证消息被处理

消费者端确保消息消费很简单,关闭消息自动确认就好,开启消息手动确认。当然有些场景消息只能被处理一次,可以通过分布式锁来实现。


四、Spring RabbitMQ支持代码示例

1、 application.yml

server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /publisher-returns: truepublisher-confirm-type: correlatedlistener:type: simplesimple:acknowledge-mode: manualconcurrency: 5max-concurrency: 20prefetch: 5template:mandatory: true

备注:

  • 这里一定要设置spring.rabbitmq.publisher-returnstrue,并且设置spring.rabbitmq.publisher-confirm-typecorrelated,同时设置spring.rabbitmq.template.mandatorytrue
  • 上面我们将消费者的确认模式改为了手动确认

2、RabbigtMQ配置

@Configuration
public class RabbitReliableTransportConfig {/*** RabbitTemplate消息转换器配置,自动将对象转换为json字符串** @return*/@Beanpublic MessageConverter jackson2JsonMessageConverter() {Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();messageConverter.setClassMapper(new DefaultJackson2JavaTypeMapper());return messageConverter;}@Beanpublic Queue reliableQueue() {return QueueBuilder.durable("reliable-queue").build();}
}

3、可靠生产者配置

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqReliableProducer {private final RabbitTemplate rabbitTemplate;public void sendReliableMsg(String body) {// 发送可靠消息ReliableMsgDTO reliableMsgDTO = ReliableMsgDTO.builder().body(body).build();CorrelationData correlationData = new CorrelationData();rabbitTemplate.convertAndSend("reliable-queue", reliableMsgDTO, correlationData);// 发送确认逻辑CompletableFuture<Confirm> future = correlationData.getFuture().completable();future.whenComplete((confirm, throwable) -> {if (confirm.isAck()) {log.info("消息已经被成功发送, 消息内容:{}", JSON.toJSONString(reliableMsgDTO));return;}log.warn("消息发送未成功发送, 原因:{}, 消息内容:{}", confirm.getReason(), JSON.toJSONString(reliableMsgDTO), throwable);// 5秒后再发送LockSupport.parkNanos(5 * 1000 * 1000 * 1000L);rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);});}
}

4、可靠消费者配置

@Slf4j
@Component
public class RabbitMQReliableConsumer {@RabbitListener(queues = "reliable-queue")public void handleMsgFromQueue(ReliableMsgDTO reliableMsgDTO, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {channel.basicAck(tag, false);// channel.basicNack(tag, false, false);log.info("Message received from queue, message body: {}", JSON.toJSONString(reliableMsgDTO));}
}

备注:这里我们开启了消息的手动确认,如果消息处理失败没有确认,那么消息将会在下次消费者参加连接时再次被投递。

5、测试用例

测试结果如下,每当消息发送至Broker成功后会触发回调,如果消息发送失败将会触发重新发送。

2024-01-20 18:13:11.399  INFO 12316 --- [78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer       : 消息已经被成功发送, 消息内容:{"body":"hello"}
2024-01-20 18:13:11.399  INFO 12316 --- [ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer       : Message received from queue, message body: {"body":"hello"}

在这里插入图片描述

http://www.yayakq.cn/news/788191/

相关文章:

  • 网站开发绩效指标WordPress nas
  • 长沙网站设计公司怎么样做海报素材网站
  • 怎么免费做自己的网站常州住房和城乡建设局网站首页
  • 秀山微信网站建设做企业宣传片的网站
  • 法华寺网站建设外贸网站外链
  • 青岛知名网站建设在线制作头像生成
  • 网站建设中 html模板html代码怎么变成网页
  • 一元云购网站黑客攻击自己装修设计软件
  • 免费的舆情网站app下载如何用手机制作app软件
  • 为企业做一个网站多少钱做淘宝好还是自建网站好
  • 营销型企业网站建设策划河南品牌网站建设
  • 深圳建设工程协会网站南京做网站的公司
  • 淘宝客怎么做的网站推广wordpress api 发贴
  • 企业建站费用情况帮人建网站价格
  • 小女孩做网站一 网站建设方案
  • 青海建设局网站东莞建设网站制作
  • 虚拟主机和服务器青岛做优化网站哪家好
  • 网站建设设计官网kuake自助建站系统源码
  • 网站怎么做登陆nas 支持做网站
  • 宁波seo推广推荐公司整站优化该怎么做
  • 建收费网站网站对公司的作用是什么
  • 做网站后台数据库建设展厅布展方案设计
  • 网站按钮psd深圳网站制作哪家负责
  • 深圳正规网站开发团队在新加坡注册公司需要什么条件
  • 泰和网站建设企业seo策划方案优化案例
  • 公司网站优化推广全网推广的方式有哪些
  • 常见的营销型网站但无法上网
  • 怎么检测网站是否安全做变性手术视频网站
  • 如何让域名到网站汽车网站建设报价
  • 电子设计大赛网站开发网站开发外包公司