LCFlxfldy 2020-01-03
1.生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 不可以被消费;
2.开始执行我们的本地事务,将本地事务执行的结果(回滚或者提交)发送给Broker;
3.Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;
4.Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果。
@RestController public class ProducerController { @Autowired private OrderService orderService; @RequestMapping("/sendMsg") public String sendMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException { String orderId = orderService.sendOrder(); return orderId; } }
@Slf4j @Component @RocketMQTransactionListener(txProducerGroup = "mayiktProducer") public class SyncProducerListener implements RocketMQLocalTransactionListener { @Autowired private TransationalUtils transationalUtils; @Autowired private OrderMapper orderMapper; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { TransactionStatus beginStatus = null; try { beginStatus = transationalUtils.begin(); MessageHeaders headers = message.getHeaders(); String objMsg = (String) headers.get("msg"); if (StringUtils.isEmpty(objMsg)) { return RocketMQLocalTransactionState.ROLLBACK; } OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class); int result = orderMapper.addOrder(orderEntity); if (result > 0) { transationalUtils.commit(beginStatus); } log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o); return null; } catch (Exception e) { e.printStackTrace(); log.error("【执行本地业务异常】 exception message:{}", e.getMessage()); if (beginStatus != null) { transationalUtils.rollback(beginStatus); } return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { log.info("【执行检查任务】"); MessageHeaders headers = message.getHeaders(); String objMsg = (String) headers.get("msg"); if (StringUtils.isEmpty(objMsg)) { return RocketMQLocalTransactionState.UNKNOWN; } OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class); String orderId = orderEntity.getOrderId(); OrderEntity orderDbEntity = orderMapper.findOrderId(orderId); if (orderDbEntity == null) { return RocketMQLocalTransactionState.UNKNOWN; } return RocketMQLocalTransactionState.COMMIT; } }
@Service @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "mayiktTopic") public class OrdeConsumer implements RocketMQListener<String> { @Autowired private DispatchMapper dispatchMapper; @Override public void onMessage(String msg) { OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class); System.out.println(orderEntity.toString()); } }
@Service public class TransationalUtils { @Autowired public DataSourceTransactionManager transactionManager; public TransactionStatus begin() { TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute()); return transaction; } public void commit(TransactionStatus transaction) { transactionManager.commit(transaction); } public void rollback(TransactionStatus transaction) { transactionManager.rollback(transaction); } }
注:该代码来源于蚂蚁课堂(www.mayikt.com),于本人学习使用。