Qemo 2019-06-28
本文主要研究一下RocketMQ的ProducerImpl。
io/openmessaging/rocketmq/producer/ProducerImpl.java
public class ProducerImpl extends AbstractOMSProducer implements Producer { public ProducerImpl(final KeyValue properties) { super(properties); } @Override public KeyValue properties() { return properties; } @Override public SendResult send(final Message message) { return send(message, this.rocketmqProducer.getSendMsgTimeout()); } @Override public SendResult send(final Message message, final KeyValue properties) { long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); return send(message, timeout); } private SendResult send(final Message message, long timeout) { checkMessageType(message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); try { org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout); if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) { log.error(String.format("Send message to RocketMQ failed, %s", message)); throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed."); } message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); return OMSUtil.sendResultConvert(rmqResult); } catch (Exception e) { log.error(String.format("Send message to RocketMQ failed, %s", message), e); throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); } } @Override public Promise<SendResult> sendAsync(final Message message) { return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout()); } @Override public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) { long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) ? 
properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); return sendAsync(message, timeout); } private Promise<SendResult> sendAsync(final Message message, long timeout) { checkMessageType(message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); final Promise<SendResult> promise = new DefaultPromise<>(); try { this.rocketmqProducer.send(rmqMessage, new SendCallback() { @Override public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) { message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); promise.set(OMSUtil.sendResultConvert(rmqResult)); } @Override public void onException(final Throwable e) { promise.setFailure(e); } }, timeout); } catch (Exception e) { promise.setFailure(e); } return promise; } @Override public void sendOneway(final Message message) { checkMessageType(message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); try { this.rocketmqProducer.sendOneway(rmqMessage); } catch (Exception ignore) { //Ignore the oneway exception. } } @Override public void sendOneway(final Message message, final KeyValue properties) { sendOneway(message); } }
io/openmessaging/rocketmq/utils/OMSUtil.java
public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); rmqMessage.setBody(omsMessage.getBody()); KeyValue headers = omsMessage.headers(); KeyValue properties = omsMessage.properties(); //All destinations in RocketMQ use Topic if (headers.containsKey(MessageHeader.TOPIC)) { rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC)); rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); } else { rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE)); rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE"); } for (String key : properties.keySet()) { MessageAccessor.putProperty(rmqMessage, key, properties.getString(key)); } //Headers has a high priority for (String key : headers.keySet()) { MessageAccessor.putProperty(rmqMessage, key, headers.getString(key)); } return rmqMessage; }
org/apache/rocketmq/client/producer/SendCallback.java
/**
 * Two-method callback passed to an asynchronous send: one method receives a
 * {@link SendResult}, the other receives a {@link Throwable}.
 */
public interface SendCallback {

    /**
     * Receives the result of a send.
     *
     * @param sendResult the result object handed to the callback
     */
    void onSuccess(SendResult sendResult);

    /**
     * Receives the error of a send.
     *
     * @param e the throwable handed to the callback
     */
    void onException(Throwable e);
}
io/openmessaging/rocketmq/promise/DefaultPromise.java
public class DefaultPromise<V> implements Promise<V> { private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class); private final Object lock = new Object(); private volatile FutureState state = FutureState.DOING; private V result = null; private long timeout; private long createTime; private Throwable exception = null; private List<PromiseListener<V>> promiseListenerList; public DefaultPromise() { createTime = System.currentTimeMillis(); promiseListenerList = new ArrayList<>(); timeout = 5000; } //...... @Override public boolean set(final V value) { if (value == null) return false; this.result = value; return done(); } @Override public boolean setFailure(final Throwable cause) { if (cause == null) return false; this.exception = cause; return done(); } private boolean done() { synchronized (lock) { if (!isDoing()) { return false; } state = FutureState.DONE; lock.notifyAll(); } notifyListeners(); return true; } private void notifyListeners() { if (promiseListenerList != null) { for (PromiseListener<V> listener : promiseListenerList) { notifyListener(listener); } } } private void notifyListener(final PromiseListener<V> listener) { try { if (exception != null) listener.operationFailed(this); else listener.operationCompleted(this); } catch (Throwable t) { LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t); } } //...... }