重学SpringBoot3-集成RocketMQ(二)

news/2024/9/18 20:19:12 标签: rocketmq, springboot, java

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍

重学SpringBoot3-集成RocketMQ(二)

  • 1. 基础概念
  • 2. 准备工作
  • 3. 实现事务消息的生产者
  • 4. 事务监听器实现
  • 5. 消费者示例
  • 6. 发送事务消息
  • 7. 测试
    • 7.1 模拟本地事务正常提交
    • 7.2 模拟本地事务提交失败,未回查
    • 7.3 模拟本地事务提交失败,回查成功
    • 7.4 模拟本地事务提交失败,回查失败
    • 7.5 模拟本地事务提交成功,消费失败
  • 关键点总结

今天介绍下如何在 Spring Boot 3 中与 RocketMQ 整合实现分布式事务。RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA 是一种分布式事务解决方案,一种分布式事务处理模式。下面详细介绍下 RocketMQ 如何实现事务消息。

1. 基础概念

在 RocketMQ 中,半消息(Half Message)主要用于实现事务消息。它是指生产者在发送事务消息时,RocketMQ 会先将消息保存为 半消息,等待事务状态的最终确认,确保消息的可靠性和一致性。

图片来源:https://www.cnblogs.com/dennyzhangdd/p/14572024.html

半消息的工作流程:

  1. 发送半消息:生产者首先将消息发送到 RocketMQ,RocketMQ 将其标记为半消息,并暂时存储到消息队列中,但这时消费者不会收到该消息。
  2. 执行本地事务:生产者在发送半消息后,开始执行自己的本地事务操作。
  3. 提交或回滚消息
    • 如果本地事务成功,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费者。
    • 如果本地事务失败,生产者会通知 RocketMQ回滚消息,RocketMQ 会删除该半消息。
  4. 事务状态回查:如果 RocketMQ 没有收到生产者的事务状态确认,RocketMQ 会通过回查机制询问生产者事务的最终状态,确保消息的一致性。

现有一个案例,后端文件上传接口,同时上传 OSS 和 MySQL,数据库负责文件元信息的增删改查, OSS负责存储文件对象,如何保证最终一致性?下面用RocketMQ 的事务消息来实现最终一致性。

2. 准备工作

请参考《重学SpringBoot3-集成RocketMQ(一)》进行环境搭建和配置工作。配置文件新增如下配置:

  consumer2:
    group: springboot-consumer-group2  # 新的消费者组名称
    topic: transaction-topic  # 订阅新的主题
    access-key: RocketMQ    # 若启用了 ACL 功能
    secret-key: 12345678    # 若启用了 ACL 功能

3. 实现事务消息的生产者

创建一个事务消息的生产者类,通过事务生产者发送消息,并处理本地事务逻辑。

java">import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TransactionalMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage(String topic, String message) {
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), null);
        System.out.println("Transaction message sent: " + sendResult.getLocalTransactionState());
    }
}

4. 事务监听器实现

通过实现 RocketMQLocalTransactionListener 接口,定义事务的提交或回滚逻辑。

java">package com.example.boot308rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @author CoderJia
 * @create 2024/09/12 15:06
 * @Description
 **/
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务逻辑,根据业务情况返回事务的提交或回滚状态
        try {
            // 模拟本地事务处理逻辑
            System.out.println("Executing local transaction...");
            boolean success = performLocalTransaction();
            if (success) {
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 事务回查逻辑,确认本地事务的最终状态
        System.out.println("Checking local transaction...");

        // 根据本地事务的处理结果返回 COMMIT_MESSAGE 或 ROLLBACK_MESSAGE
        System.out.println("local transaction check success");
        return RocketMQLocalTransactionState.COMMIT;
    }

    private boolean performLocalTransaction() {
        // TODO 模拟本地事务处理文件上传OSS
        try {
            System.out.println("Upload files to OSS...");
            Thread.sleep(3000);
            System.out.println("File upload to OSS completed");
            return true;
        } catch (InterruptedException e) {
            System.out.println("Failed to upload file to OSS");
            return false;
        }
    }

}

5. 消费者示例

创建一个消费者,订阅并消费事务消息。

RocketMQListener<String> 是一个接口类型,用于定义一个 RocketMQ 消息监听器,它指定接收的消息类型为 String。在 RocketMQ 中,消费者可以通过实现 RocketMQListener 接口来自动处理接收到的消息。

java">import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-consumer-group")
public class TransactionalMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理接收到的消息
        System.out.printf("Received message: %s%n", message);
    }
}

6. 发送事务消息

在服务中调用事务消息生产者:

java">import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Resource
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/sendTransactionMessage")
    public ResponseEntity<String> sendTransactionMessage(@RequestParam String message) {
        rocketMQProducer.sendTransactionMessage("transaction-topic", message);
        return ResponseEntity.ok("Transaction message sent: " + message);
    }
}

7. 测试

7.1 模拟本地事务正常提交

如下图观察到,当本地事务即文件上传完成之后,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费进行消费。

本地事务正常提交

7.2 模拟本地事务提交失败,未回查

在 RocketMQ 中,如果消息生产者没有在规定的时间内向消息队列确认事务状态,RocketMQ 会通过回查机制(即“回滚检查”或“事务回查”)来询问生产者事务的最终状态,从而确保消息的一致性。broker.conf 中配置 transactionCheckMax=10000 表示 RocketMQ 最长等待 10 秒后进行事务状态回查。

本地事务提交失败,未回查

7.3 模拟本地事务提交失败,回查成功

本人搭建 RocketMQ 设置的回查时间为15s,所以将本地事务执行时间修改为 16s,这样会触发 RocketMQ 进行事务状态回查。

transactionCheckMax

事务状态回查成功

7.4 模拟本地事务提交失败,回查失败

修改回查方法的返回值,让RocketMQ 回查本地状态值将消息进行回滚,消费者同样不会消费消息。

回查失败

7.5 模拟本地事务提交成功,消费失败

例如生产者本地事务执行成功,但是消费者消费失败的情况,RocketMQ 会进行消息重试,直至成功。

修改一下消费者处理逻辑:

java">package com.example.boot308rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * @author CoderJia
 * @create 2024/09/12 15:06
 * @Description
 **/
@Slf4j
@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "springboot-consumer-group2")
public class TransactionalMessageConsumer02 implements RocketMQListener<MessageExt> {


    @Override
    public void onMessage(MessageExt message) {
        try {
            // 处理消息的业务逻辑
            String msgBody = new String(message.getBody(), "UTF-8");
            log.info("Received message:{}", msgBody);

            // 模拟业务处理
            boolean success = processBusinessLogic(msgBody);

            if (!success) {
                throw new RuntimeException("Business processing failure");
            }

            log.info("Business processing successful");

        } catch (Exception e) {
            log.error("MsgID:{},reconsumeTimes:{},e:{}", message.getMsgId(), message.getReconsumeTimes(), e.getMessage());

            // 重新抛出异常,让 RocketMQ 进行重试
            throw new RuntimeException("Message consumption failed, retrying");
        }
    }

    private boolean processBusinessLogic(String message) {
        // 这里是业务逻辑,返回 true 表示成功,false 表示失败
        // 例如:数据库操作或其他远程调用
        return Math.random() > 0.5; // 模拟随机成功或失败
    }
}

消费失败

关键点总结

  1. 事务消息发送:通过 RocketMQTemplate.sendMessageInTransaction() 方法发送事务消息。
  2. 本地事务处理:实现 TransactionListener 接口的 executeLocalTransaction() 方法处理本地事务逻辑。
  3. 事务回查:在 checkLocalTransaction() 方法中定义如何检查事务消息的最终状态。
    以上就是 SpringBoot3 结合 RocketMQ 实现的事务消息,提供了分布式事务中的最终一致性。

http://www.niftyadmin.cn/n/5664463.html

相关文章

《仙境传说RO:新启航》游戏攻略,VMOS云手机辅助高效挂机助攻!

在《仙境传说RO&#xff1a;新启航》中&#xff0c;游戏的玩法非常丰富&#xff0c;但对很多玩家而言&#xff0c;想要全方位提升游戏体验并不容易。借助VMOS云手机&#xff0c;可以为玩家提供更为便捷和高效的辅助工具。VMOS云手机特别定制了适用于《仙境传说RO&#xff1a;新…

漏洞挖掘 | 记录第一个src厂商的支付漏洞

前言 最近去搜了一些厂商SRC试一下&#xff0c;没想到已学的知识也能挖掘到逻辑漏洞&#xff0c;下面给大家说一下这个逻辑漏洞的思路&#xff0c;没想到居然存在的&#xff0c;也算是运气好吧&#xff0c;希望对大家有所帮助。纯实战文章&#xff0c;一点都不水&#xff0c;且…

HAL库固件包的获取和KEIL软件的DFP安装

一、HAL库固件包 HAL库&#xff08;Hardware Abstraction Layer库&#xff09;是STMicroelectronics为其STM32系列微控制器提供的一套硬件抽象层软件。它的主要目的是简化硬件级编程&#xff0c;提供一组标准化的API接口&#xff0c;使得开发者可以不必深入了解底层硬件的细节…

【C++登堂入室】类和对象(下)

目录 一、 再谈构造函数 1.1 构造函数体赋值 1.2 初始化列表 1.3 explicit关键字 二、static成员 2.1 概念 2.2 特性 三、友元函数 3.1 友元函数 3.2 友元类 四、内部类 五、 再次理解类和对象 结尾 一、 再谈构造函数 1.1 构造函数体赋值 在创建对象时&#xf…

python多进程程序设计 之二

python多进程程序设计 之二 ProcessPoolExecutor构造器成员函数map成员函数submit实列代码 ProcessPoolExecutor ProcessPoolExecutor 类是 Executor 子类&#xff0c;它使用进程池异步执行调用。 ProcessPoolExecutor 使用multiprocessing模块&#xff0c;这允许它绕过全局解…

千益畅行:旅游卡免费服务,包含哪些内容?

​凭此卡可免费旅游&#xff0c;单卡支持2-6人同行&#xff0c;免费服务内容包含&#xff1a;酒店住宿、团餐、景区首道大门票、导游、大巴、旅游责任险、接送机等&#xff0c;目前支持全国40多条旅游线路&#xff0c;爱旅游的朋友们可以约起来&#xff01; #旅游卡服务#

CSP-J 算法基础 广度优先搜索BFS

文章目录 前言广度优先搜索是什么广度优先搜索的实现BFS 的具体编程实现举例&#xff1a;广度优先搜索的具体步骤初始状态&#xff1a;步骤 1&#xff1a;加入起点节点 1步骤 2&#xff1a;访问队列中的节点 1&#xff0c;加入其邻居节点 2 和 4步骤 3&#xff1a;访问队列中的…

(CS231n课程笔记)深度学习之损失函数详解(SVM loss,Softmax,熵,交叉熵,KL散度)

学完了线性分类&#xff0c;我们要开始对预测结果进行评估&#xff0c;进而优化权重w&#xff0c;提高预测精度&#xff0c;这就要用到损失函数。 损失函数&#xff08;Loss Function&#xff09;是机器学习模型中的一个关键概念&#xff0c;用于衡量模型的预测结果与真实标签…