meilongwhpu 2020-01-30
producer 在发送消息的时候,会生成一个 "唯一" 的 msgId,broker 会为这个 msgId 创建哈希索引
UNIQ_KEY 由客户端生成
org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID
msgId 由 前缀 + 内容 组成:
前缀
ip 地址,进程号,classLoader 的 hashcode
内容
时间差(当前时间减去当月一日),计数器
static {
byte[] ip;
try {
// 获取 ip 地址,字节数组
ip = UtilAll.getIP();
} catch (Exception e) {
ip = createFakeIP();
}
LEN = ip.length + 4 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 4 + 4);
tempBuffer.position(0);
// 写入 ip
tempBuffer.put(ip);
tempBuffer.position(ip.length);
// 写入进程号
tempBuffer.putInt(UtilAll.getPid());
tempBuffer.position(ip.length + 4);
// 写入 hashcode
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
// 二进制转十六进制字符串
FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
// 设置起始时间为当月1日
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
}byte 数组转十六进制字符串
// org.apache.rocketmq.common.UtilAll#bytes2string// final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();public static String bytes2string(byte[] src) {
char[] hexChars = new char[src.length * 2];
for (int j = 0; j < src.length; j++) {
// & 过之后,byte 转成 int
int v = src[j] & 0xFF;
// 无符号右移 4 位,高位补 0 ,即取字节的高 4 位
hexChars[j * 2] = HEX_ARRAY[v >>> 4];
// 取字节低 4 位
hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
}
return new String(hexChars);
}设置开始时间
// org.apache.rocketmq.common.message.MessageClientIDSetter#setStartTime
private synchronized static void setStartTime(long millis) {
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(millis);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
// 当月1日为开始时间
startTime = cal.getTimeInMillis();
// 下月1日
cal.add(Calendar.MONTH, 1);
nextStartTime = cal.getTimeInMillis();
}// org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqIDBuffer
private static byte[] createUniqIDBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
buffer.position(0);
// 当前时间减去当月一日
buffer.putInt((int) (System.currentTimeMillis() - startTime));
// 计数器
buffer.putShort((short) COUNTER.getAndIncrement());
return buffer.array();
}public static String createUniqID() {
// 1 个字节,8 位,每 4 位一个十六进制字符
StringBuilder sb = new StringBuilder(LEN * 2);
// 前缀:ip,pid,hashcode
sb.append(FIX_STRING);
// 时间差 + 计数器
sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
return sb.toString();
}