camhan 2020-01-01
SnowFlake 雪花ID 算法是推特公司推出的著名分布式ID生成算法。利用预先分配好的机器ID,工作区ID,机器时间可以生成全局唯一的随时间趋势递增的Long类型ID.长度在17-19位。随着时间的增长而递增,在MySQL数据库中,InnoDB存储引擎可以更快的插入递增的主键。而不像UUID那样因为写入是乱序的,InnoDB不得不频繁的做页分裂操作,耗时且容易产生碎片。
对于SnowFlake 的原理介绍,可以参考该文章:理解分布式id生成算法SnowFlake
理解了雪花的基本原理之后,我们试想:在分布式集群或者开发环境下,不同服务之间/相同服务的不同机器之间应该如何产生差异呢?有以下几种方案:
本方案结合了以上方案的优点,按照业务的实际情况对雪花中的数据中心和机器ID所占的位数进行调整:数据中心占4Bit,范围从0-15。机器ID占6Bit,范围从0-63
。对不同的服务在yml中配置服务名称,以服务编号作为数据中心ID。如果按照开发+测试+生产环境区分的话,可以部署5个不同的服务。application.yml 中配置如下的参数
# 分布式雪花ID不同机器ID自动化配置 snowFlake: dataCenter: 1 # 数据中心的id appName: test # 业务类型名称
而机器ID采用以下的策略实现:
package cn.keats.util; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Date; import java.util.Timer; import java.util.TimerTask; @Configuration @Slf4j public class MachineIdConfig { @Resource private JedisPool jedisPool; @Value("${snowFlake.dataCenter}") private Integer dataCenterId; @Value("${snowFlake.appName}") private String APP_NAME; /** * 机器id */ public static Integer machineId; /** * 本地ip地址 */ private static String localIp; /** * 获取ip地址 * * @return * @throws UnknownHostException */ private String getIPAddress() throws UnknownHostException { InetAddress address = InetAddress.getLocalHost(); return address.getHostAddress(); } /** * hash机器IP初始化一个机器ID */ @Bean public SnowFlake initMachineId() throws Exception { localIp = getIPAddress(); // 192.168.0.233 Long ip_ = Long.parseLong(localIp.replaceAll("\\.", ""));// 1921680233 // machineId = ip_.hashCode() % 32;// 0-31 // 创建一个机器ID createMachineId(); log.info("初始化 machine_id :{}", machineId); return new SnowFlake(machineId, dataCenterId); } /** * 容器销毁前清除注册记录 */ @PreDestroy public void destroyMachineId() { try (Jedis jedis = jedisPool.getResource()) { jedis.del(APP_NAME + dataCenterId + machineId); } } /** * 主方法:首先获取机器 IP 并 % 32 得到 0-31 * 使用 业务名 + 组名 + IP 作为 Redis 的 key,机器IP作为 value,存储到Redis中 * * @return */ public Integer createMachineId() { try { // 向redis注册,并设置超时时间 log.info("注册一个机器ID到Redis " + machineId + " IP:" + localIp); Boolean flag = registerMachine(machineId, localIp); // 注册成功 if (flag) { // 启动一个线程更新超时时间 updateExpTimeThread(); // 返回机器Id log.info("Redis中端口没有冲突 " + machineId + " IP:" + localIp); return machineId; } // 注册失败,可能原因 Hash%32 的结果冲突 if (!checkIfCanRegister()) { // 如果 0-31 已经用完,使用 32-64之间随机的ID getRandomMachineId(); createMachineId(); } else { // 如果存在剩余的ID log.warn("Redis中端口冲突了,使用 0-31 之间未占用的Id " + machineId + " IP:" + localIp); createMachineId(); } } catch (Exception e) { // 获取 32 - 63 之间的随机Id // 返回机器Id log.error("Redis连接异常,不能正确注册雪花机器号 " + machineId + " IP:" + localIp, e); log.warn("使用临时方案,获取 32 - 63 之间的随机数作为机器号,请及时检查Redis连接"); getRandomMachineId(); return machineId; } return machineId; } /** * 检查是否被注册满了 * * @return */ private Boolean checkIfCanRegister() { // 判断0~31这个区间段的机器IP是否被占满 try (Jedis jedis = jedisPool.getResource()) { Boolean flag = true; for (int i = 0; i < 32; i++) { flag = jedis.exists(APP_NAME + dataCenterId + i); // 如果不存在。设置机器Id为这个不存在的数字 if (!flag) { machineId = i; break; } } return !flag; } } /** * 1.更新超時時間 * 注意,更新前检查是否存在机器ip占用情况 */ private void updateExpTimeThread() { // 开启一个线程执行定时任务: // 每23小时更新一次超时时间 new Timer(localIp).schedule(new TimerTask() { @Override public void run() { // 检查缓存中的ip与本机ip是否一致, 一致则更新时间,不一致则重新获取一个机器id Boolean b = checkIsLocalIp(String.valueOf(machineId)); if (b) { log.info("IP一致,更新超时时间 ip:{},machineId:{}, time:{}", localIp, machineId, new Date()); try (Jedis jedis = jedisPool.getResource()) { jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24 ); } } else { // IP冲突 log.info("重新生成机器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date()); // 重新生成机器ID,并且更改雪花中的机器ID getRandomMachineId(); // 重新生成并注册机器id createMachineId(); // 更改雪花中的机器ID SnowFlake.setWorkerId(machineId); // 结束当前任务 log.info("Timer->thread->name:{}", Thread.currentThread().getName()); this.cancel(); } } }, 10 * 1000, 1000 * 60 * 60 * 23); } /** * 获取32-63随机数 */ public void getRandomMachineId() { machineId = (int) (Math.random() * 31) + 31; } /** * 检查Redis中对应Key的Value是否是本机IP * * @param mechineId * @return */ private Boolean checkIsLocalIp(String mechineId) { try (Jedis jedis = jedisPool.getResource()) { String ip = jedis.get(APP_NAME + dataCenterId + mechineId); log.info("checkIsLocalIp->ip:{}", ip); return localIp.equals(ip); } } /** * 1.注册机器 * 2.设置超时时间 * * @param machineId 取值为0~31 * @return */ private Boolean registerMachine(Integer machineId, String localIp) throws Exception { // try with resources 写法,出异常会释放括号内的资源 Java7特性 try (Jedis jedis = jedisPool.getResource()) { // key 业务号 + 数据中心ID + 机器ID value 机器IP Long result = jedis.setnx(APP_NAME + dataCenterId + machineId, localIp); if(result == 1){ // 过期时间 1 天 jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24); return true; } else { // 如果Key存在,判断Value和当前IP是否一致,一致则返回True String value = jedis.get(APP_NAME + dataCenterId + machineId); if(localIp.equals(value)){ // IP一致,注册机器ID成功 jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24); return true; } return false; } } } }
import org.springframework.context.annotation.Configuration; /** * 功能:分布式ID生成工具类 * */ @Configuration public class SnowFlake { /** * 开始时间截 (2019-09-08) 服务一旦运行过之后不能修改。会导致ID生成重复 */ private final long twepoch = 1567872000000L; /** * 机器Id所占的位数 0 - 64 */ private final long workerIdBits = 6L; /** * 工作组Id所占的位数 0 - 16 */ private final long dataCenterIdBits = 4L; /** * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */ private final long maxWorkerId = -1L ^ (-1L << workerIdBits); /** * 支持的最大数据标识id,结果是15 */ private final long maxDatacenterId = -1L ^ (-1L << dataCenterIdBits); /** * 序列在id中占的位数 */ private final long sequenceBits = 12L; /** * 机器ID向左移12位 */ private final long workerIdShift = sequenceBits; /** * 数据标识id向左移17位(12+5) */ private final long datacenterIdShift = sequenceBits + workerIdBits; /** * 时间截向左移22位(5+5+12) */ private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits; /** * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */ private final long sequenceMask = -1L ^ (-1L << sequenceBits); /** * 工作机器ID(0~63) */ private static long workerId; /** * 数据中心ID(0~16) */ private long datacenterId; /** * 毫秒内序列(0~4095) */ private long sequence = 0L; /** * 上次生成ID的时间截 */ private long lastTimestamp = -1L; //==============================Constructors===================================== /** * 构造函数 * * @param workerId 工作ID (0~63) * @param datacenterId 数据中心ID (0~15) */ public SnowFlake(long workerId, long datacenterId) { if (workerId > maxWorkerId || workerId < 0) { throw new IllegalArgumentException(String.format("机器ID必须小于 %d 且大于 0", maxWorkerId)); } if (datacenterId > maxDatacenterId || datacenterId < 0) { throw new IllegalArgumentException(String.format("工作组ID必须小于 %d 且大于 0", maxDatacenterId)); } this.workerId = workerId; this.datacenterId = datacenterId; } /** * 构造函数 * */ public SnowFlake() { this.workerId = 0; this.datacenterId = 0; } /** * 获得下一个ID (该方法是线程安全的) * * @return SnowFlakeId */ public synchronized long nextId() { long timestamp = timeGen(); //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 if (timestamp < lastTimestamp) { throw new RuntimeException( String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); } // 如果是同一时间生成的,则进行毫秒内序列 if (lastTimestamp == timestamp) { sequence = (sequence + 1) & sequenceMask; // 毫秒内序列溢出 if (sequence == 0) { // 阻塞到下一个毫秒,获得新的时间戳 timestamp = tilNextMillis(lastTimestamp); } } //时间戳改变,毫秒内序列重置 else { sequence = 0L; } // 上次生成ID的时间截 lastTimestamp = timestamp; // 移位并通过或运算拼到一起组成64位的ID return ((timestamp - twepoch) << timestampLeftShift) // | (datacenterId << datacenterIdShift) // | (workerId << workerIdShift) // | sequence; } /** * 阻塞到下一个毫秒,直到获得新的时间戳 * * @param lastTimestamp 上次生成ID的时间截 * @return 当前时间戳 */ protected long tilNextMillis(long lastTimestamp) { long timestamp = timeGen(); while (timestamp <= lastTimestamp) { timestamp = timeGen(); } return timestamp; } /** * 返回以毫秒为单位的当前时间 * * @return 当前时间(毫秒) */ protected long timeGen() { return System.currentTimeMillis(); } public long getWorkerId() { return workerId; } public static void setWorkerId(long workerId) { SnowFlake.workerId = workerId; } public long getDatacenterId() { return datacenterId; } public void setDatacenterId(long datacenterId) { this.datacenterId = datacenterId; } }
public class RedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port:6379}") private Integer port; @Value("${spring.redis.password:-1}") private String password; @Bean public JedisPool jedisPool() { // 1.设置连接池的配置对象 JedisPoolConfig config = new JedisPoolConfig(); // 设置池中最大连接数 config.setMaxTotal(50); // 设置空闲时池中保有的最大连接数 config.setMaxIdle(10); config.setMaxWaitMillis(3000L); config.setTestOnBorrow(true); log.info(password); // 2.设置连接池对象 if("-1".equals(password)){ log.info("Redis不通过密码连接"); return new JedisPool(config, host, port,0); } else { log.info("Redis通过密码连接" + password); return new JedisPool(config, host, port,0, password); } } }
@Autowired private SnowFlake snowFlake; // 生产ID snowFlake.nextId(); 方法生产ID