RocketMQ解决分布式事务

LCFlxfldy 2020-01-03

1.原理图:

RocketMQ解决分布式事务

 2.设计实现思路:

1.生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 可以被消费;

2.开始执行我们的本地事务,本地事务执行的结果(回滚或者提交)发送Broker;

3.Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;

4.Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果

核心代码发送方

@RestController
public class ProducerController {
    @Autowired
    private OrderService orderService;

    @RequestMapping("/sendMsg")
    public String sendMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        String orderId = orderService.sendOrder();
        return orderId;

    }
}
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "mayiktProducer")
public class SyncProducerListener implements RocketMQLocalTransactionListener {

    @Autowired
    private TransationalUtils transationalUtils;

    @Autowired
    private OrderMapper orderMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        TransactionStatus beginStatus = null;
        try {
            beginStatus = transationalUtils.begin();
            MessageHeaders headers = message.getHeaders();
            String objMsg = (String) headers.get("msg");
            if (StringUtils.isEmpty(objMsg)) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
            int result = orderMapper.addOrder(orderEntity);
            if (result > 0) {
                transationalUtils.commit(beginStatus);
            }
            log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o);
            return null;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("【执行本地业务异常】 exception message:{}", e.getMessage());
            if (beginStatus != null) {
                transationalUtils.rollback(beginStatus);
            }
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("【执行检查任务】");
        MessageHeaders headers = message.getHeaders();
        String objMsg = (String) headers.get("msg");
        if (StringUtils.isEmpty(objMsg)) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
        String orderId = orderEntity.getOrderId();
        OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
        if (orderDbEntity == null) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

消费者

@Service
@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "mayiktTopic")
public class OrdeConsumer implements RocketMQListener<String> {
    @Autowired
    private DispatchMapper dispatchMapper;

    @Override
    public void onMessage(String msg) {
        OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
        System.out.println(orderEntity.toString());
    }

}

手动事务

@Service
public class TransationalUtils {
    @Autowired
    public DataSourceTransactionManager transactionManager;

    public TransactionStatus begin() {
        TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute());
        return transaction;
    }

    public void commit(TransactionStatus transaction) {
        transactionManager.commit(transaction);

    }

    public void rollback(TransactionStatus transaction) {
        transactionManager.rollback(transaction);
    }

}

注:该代码来源于蚂蚁课堂(www.mayikt.com),于本人学习使用。

相关推荐