LCFlxfldy 2020-01-03

1.生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 不可以被消费;
2.开始执行我们的本地事务,将本地事务执行的结果(回滚或者提交)发送给Broker;
3.Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;
4.Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果。
@RestController
public class ProducerController {
@Autowired
private OrderService orderService;
@RequestMapping("/sendMsg")
public String sendMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
String orderId = orderService.sendOrder();
return orderId;
}
}@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "mayiktProducer")
public class SyncProducerListener implements RocketMQLocalTransactionListener {
@Autowired
private TransationalUtils transationalUtils;
@Autowired
private OrderMapper orderMapper;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
TransactionStatus beginStatus = null;
try {
beginStatus = transationalUtils.begin();
MessageHeaders headers = message.getHeaders();
String objMsg = (String) headers.get("msg");
if (StringUtils.isEmpty(objMsg)) {
return RocketMQLocalTransactionState.ROLLBACK;
}
OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
int result = orderMapper.addOrder(orderEntity);
if (result > 0) {
transationalUtils.commit(beginStatus);
}
log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o);
return null;
} catch (Exception e) {
e.printStackTrace();
log.error("【执行本地业务异常】 exception message:{}", e.getMessage());
if (beginStatus != null) {
transationalUtils.rollback(beginStatus);
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("【执行检查任务】");
MessageHeaders headers = message.getHeaders();
String objMsg = (String) headers.get("msg");
if (StringUtils.isEmpty(objMsg)) {
return RocketMQLocalTransactionState.UNKNOWN;
}
OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
String orderId = orderEntity.getOrderId();
OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
if (orderDbEntity == null) {
return RocketMQLocalTransactionState.UNKNOWN;
}
return RocketMQLocalTransactionState.COMMIT;
}
}@Service
@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "mayiktTopic")
public class OrdeConsumer implements RocketMQListener<String> {
@Autowired
private DispatchMapper dispatchMapper;
@Override
public void onMessage(String msg) {
OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
System.out.println(orderEntity.toString());
}
}@Service
public class TransationalUtils {
@Autowired
public DataSourceTransactionManager transactionManager;
public TransactionStatus begin() {
TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute());
return transaction;
}
public void commit(TransactionStatus transaction) {
transactionManager.commit(transaction);
}
public void rollback(TransactionStatus transaction) {
transactionManager.rollback(transaction);
}
}注:该代码来源于蚂蚁课堂(www.mayikt.com),于本人学习使用。