当前位置: 首页 > news >正文

苏州网站建设介绍淘宝客单页网站怎么做

苏州网站建设介绍,淘宝客单页网站怎么做,湖南网站建设平台,dreamwearver做网站地图这是本人学习的总结,主要学习资料如下 马士兵教育rocketMq官方文档 目录 1、分布式事务的难题2、解决方式2.1、半事务消息和事务回查2.2、代码样例2.2.1、TransactionListener2.2.2、TransactionMQProducer2.2.3、MessageListenerConcurrently2.2.4、流程图 1、分布…

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、分布式事务的难题
  • 2、解决方式
    • 2.1、半事务消息和事务回查
    • 2.2、代码样例
      • 2.2.1、TransactionListener
      • 2.2.2、TransactionMQProducer
      • 2.2.3、MessageListenerConcurrently
      • 2.2.4、流程图


1、分布式事务的难题

现有两个系统,A向B转钱。A系统扣钱和B系统加钱就应该属于同一个事务,任何一个失败都要回滚。两个系统之间唯一的通信方式就是RocketMQ
请添加图片描述

以最朴素的想法,现在就有两个实现分布式事务的方案。但这两个都有比较大的不可靠性。

  • A系统先扣钱再发送MQ:这样的弊端是无法确定消息有没有发送到MQ,或者消息有没有被MQ保存。总之这做法缺少一些回查的机制。
  • A系统先发送MQ再扣钱:这样的弊端是发送消息后,A系统可能出现错误回滚。而B收到了消息就正常消费,完全不知道A那边出了问题。

2、解决方式

2.1、半事务消息和事务回查

  • 半事务消息:半事务消息是指向RocketMQ发送一条消息,但这个消息只存放在CommitLog中,并不在ConsumeQueue展示。也就是说该消息被RocketMQ接收了,但是消费者却无法消费到这条消息。
  • 事务回查:在半事务消息发送成功后。A系统执行事务,如果成功则MQ将消息变成正常消息,失败则不发送消息。这里如果业务太复杂还不能确定事务是否完成的话,还可以发送UNKNOWN给MQ,这样MQ就会有定时器去检查事务是否完成。
    RocketMQ会向生产者询问是否可以把半事务变成正常的消息让消费者可以消费到。在这篇文章的例子就是询问A系统扣款有没有扣成功。如果成功了那就让B系统消费消息。

请添加图片描述

所以呢,通过半事务消息事务回查就能保证A系统和发送消息具有事务,即扣款失败则不发送消息,扣款成功则发送消息。所以半事务消息至少保证了生产者和MQ之间的原子性。MQ和消费者之间的原子性需要另外处理。

消费者需要保证幂等性,失败后重试,即使称为死信后也特殊处理等操作来保证事务。这个例子中B系统成功加钱的话那交易结束,如果尝试多次后还是失败,那就需要一个机制来通知A系统,让他把扣掉的钱加回去。

2.2、代码样例

2.2.1、TransactionListener

一个接口规范,我们需要实现这个接口来定义本地事务和事务回查。

就是本地事务具体执行,成功后怎么办,失败了怎么办。定时的事务回查如何检查事务有没有完成。这些东西都要定义在TransactionListener的实现中。


TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务,A扣100块// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;//或者业务比较复杂,不想在这个阶段就关闭事务,可以返回Unknown,之后就需要MQ定时事务回查return LocalTransactionState.UNKNOW;}@Override// 事务回查,默认一分钟一次public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("事务回查, " + new SimpleDateFormat("yyyyMMdd, HH:mm:ss").format(new Date()));// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;// 业务比较长,还不确定成功或失败,返回unknown,下次再查return LocalTransactionState.UNKNOW;}};

2.2.2、TransactionMQProducer

半事务消息的生产者,在DefaultMQProducer的基础上新增了一个重要的参数,类型是ExecutorService。这个线程池是用来生产线程去完成事务回查。

但是事务回查的逻辑不需要定义在线程的run()方法中,这一部分放在TransactionListener中。

 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");producer.setNamesrvAddr("localhost:9876");// build a thread pool used to for MQ to call back to check transactionExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10), (r) -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();try{Message msg = new Message("transaction_producer", null, "A give B 100 dollar".getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);}catch(Exception e) {// rollbackSystem.out.println("rollback");}producer.shutdown();

2.2.3、MessageListenerConcurrently

消费者部分就比较简单,只要listener是MessageListenerConcurrently就好。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TransactionalTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {try{for(MessageExt msg: list) {// simulate DB actionSystem.out.println("update B where transactionId" + msg.getTransactionId());System.out.println("Success consume msg: " + msg.getMsgId());}} catch (Exception e) {e.printStackTrace();System.out.println("Failed to consume meg, try more times");// means that failed to consume this msg. In next time will still consume this msg.return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// means that success to consume this msg. In the next time will consume next msg.return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});consumer.start();
while(true){
}

2.2.4、流程图

请添加图片描述

http://www.yayakq.cn/news/145792/

相关文章:

  • 钛钢饰品网站建设做淘客网站能干嘛
  • 网站建设岗位主要做什么我国档案网站建设比较分析
  • 遂宁公司做网站搜索引擎广告属于什么渠道
  • 网站建设人员分工表网站备案没通过不了
  • 国外网站推广平台有哪些公司个人网站申请空间
  • 网站竞价托管建设网站的网站公司
  • 网站建设seo合同书腾讯云申请域名
  • 绵阳网站建设费用外贸营销平台
  • google网站优化工具网络营销定价的特点
  • 公司网站开发文档四川住房建设部网站
  • 河南住房城乡建设厅官方网站建网站得钱吗
  • 宁波企业建站网站页面描述
  • 五华网站建设 优帮云做网站后有人抢注品牌关键字
  • 用vs做网站营销网站建设教程
  • 怎么做卡盟网站招聘模板图片
  • 相亲网站透露自己做理财的女生购物网站代码模板
  • 淄博网站建设多广告优化师怎么学
  • 医院网站建设技术方案网页制作工具有
  • 响应式 企业网站网站建设项目体会
  • 北京建设工程协会网站做网站平方根怎么表示
  • 优秀的网站开发抖音网红代运营
  • 广州市建设工程交易服务中心网站外贸开发网站建设
  • 谁知道我的世界做行为包的网站啊网站如何备案
  • 平面设计网站首页东莞网站优化中易
  • 营销型网站的付费推广渠道建设网站的制作步骤
  • 网站建设网址网站制作网页设计的素材图片
  • 网页制作师招聘seo快速排名代理
  • 网站建设一般流程猎头公司猎头
  • 网站开发需要什么专业的人才做网站包括什么
  • 网站运维托管汽车之家网页