毕设 网站开发,360竞价推广,专业团队下一句,长沙哪家制作网站好这是本人学习的总结#xff0c;主要学习资料如下
马士兵教育rocketMq官方文档 目录 1、分布式事务的难题2、解决方式2.1、半事务消息和事务回查2.2、代码样例2.2.1、TransactionListener2.2.2、TransactionMQProducer2.2.3、MessageListenerConcurrently2.2.4、流程图 1、分布…这是本人学习的总结主要学习资料如下
马士兵教育rocketMq官方文档 目录 1、分布式事务的难题2、解决方式2.1、半事务消息和事务回查2.2、代码样例2.2.1、TransactionListener2.2.2、TransactionMQProducer2.2.3、MessageListenerConcurrently2.2.4、流程图 1、分布式事务的难题
现有两个系统A向B转钱。A系统扣钱和B系统加钱就应该属于同一个事务任何一个失败都要回滚。两个系统之间唯一的通信方式就是RocketMQ。
以最朴素的想法现在就有两个实现分布式事务的方案。但这两个都有比较大的不可靠性。
A系统先扣钱再发送MQ这样的弊端是无法确定消息有没有发送到MQ或者消息有没有被MQ保存。总之这做法缺少一些回查的机制。A系统先发送MQ再扣钱这样的弊端是发送消息后A系统可能出现错误回滚。而B收到了消息就正常消费完全不知道A那边出了问题。 2、解决方式
2.1、半事务消息和事务回查
半事务消息半事务消息是指向RocketMQ发送一条消息但这个消息只存放在CommitLog中并不在ConsumeQueue展示。也就是说该消息被RocketMQ接收了但是消费者却无法消费到这条消息。事务回查在半事务消息发送成功后。A系统执行事务如果成功则MQ将消息变成正常消息失败则不发送消息。这里如果业务太复杂还不能确定事务是否完成的话还可以发送UNKNOWN给MQ这样MQ就会有定时器去检查事务是否完成。 RocketMQ会向生产者询问是否可以把半事务变成正常的消息让消费者可以消费到。在这篇文章的例子就是询问A系统扣款有没有扣成功。如果成功了那就让B系统消费消息。 所以呢通过半事务消息和事务回查就能保证A系统和发送消息具有事务即扣款失败则不发送消息扣款成功则发送消息。所以半事务消息至少保证了生产者和MQ之间的原子性。MQ和消费者之间的原子性需要另外处理。
消费者需要保证幂等性失败后重试即使称为死信后也特殊处理等操作来保证事务。这个例子中B系统成功加钱的话那交易结束如果尝试多次后还是失败那就需要一个机制来通知A系统让他把扣掉的钱加回去。
2.2、代码样例
2.2.1、TransactionListener
一个接口规范我们需要实现这个接口来定义本地事务和事务回查。
就是本地事务具体执行成功后怎么办失败了怎么办。定时的事务回查如何检查事务有没有完成。这些东西都要定义在TransactionListener的实现中。 TransactionListener transactionListener new TransactionListener() {Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务A扣100块// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;//或者业务比较复杂不想在这个阶段就关闭事务可以返回Unknown之后就需要MQ定时事务回查return LocalTransactionState.UNKNOW;}Override// 事务回查默认一分钟一次public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println(事务回查 new SimpleDateFormat(yyyyMMdd, HH:mm:ss).format(new Date()));// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;// 业务比较长还不确定成功或失败返回unknown下次再查return LocalTransactionState.UNKNOW;}};2.2.2、TransactionMQProducer
半事务消息的生产者在DefaultMQProducer的基础上新增了一个重要的参数类型是ExecutorService。这个线程池是用来生产线程去完成事务回查。
但是事务回查的逻辑不需要定义在线程的run()方法中这一部分放在TransactionListener中。 TransactionMQProducer producer new TransactionMQProducer(transaction_producer);producer.setNamesrvAddr(localhost:9876);// build a thread pool used to for MQ to call back to check transactionExecutorService executorService new ThreadPoolExecutor(2, 5, 100, TimeUnit.MINUTES, new ArrayBlockingQueue(10), (r) - {Thread thread new Thread(r);thread.setName(client-transaction-msg-check-thread);return thread;});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();try{Message msg new Message(transaction_producer, null, A give B 100 dollar.getBytes());SendResult sendResult producer.sendMessageInTransaction(msg, null);}catch(Exception e) {// rollbackSystem.out.println(rollback);}producer.shutdown();2.2.3、MessageListenerConcurrently
消费者部分就比较简单只要listener是MessageListenerConcurrently就好。
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(transaction_consumer);
consumer.setNamesrvAddr(localhost:9876);
consumer.subscribe(TransactionalTopic, *);
consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext context) {try{for(MessageExt msg: list) {// simulate DB actionSystem.out.println(update B where transactionId msg.getTransactionId());System.out.println(Success consume msg: msg.getMsgId());}} catch (Exception e) {e.printStackTrace();System.out.println(Failed to consume meg, try more times);// means that failed to consume this msg. In next time will still consume this msg.return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// means that success to consume this msg. In the next time will consume next msg.return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});consumer.start();
while(true){
}2.2.4、流程图