项目总结64:分别使用Redisson和Zookeeper分布式锁模拟模拟抢红包业务

camhan 2020-05-26

项目总结64:分别使用Redisson和Zookeeper分布式锁模拟模拟抢红包业务

业务场景

模拟1000人在10秒内抢10000(或1000)元红包,金额在1-100不等;

使用的框架或软件:

框架或组件:Springboot(基础框架)、Redisson(实现分布式锁)、Zookeeper(实现分布式锁方案)、Ngnix(负载均衡),Redis(红包数据存取数据库)

系统或软件:Linux服务器、Jmeter(模拟并发请求)

具体代码示例和测试结果(公用方法放在文中附录)

情况1- 单机服务——没有任何线程安全考虑——出现数据错误

@GetMapping("/get/money")
public String getRedPackage(){

    Map map = new HashMap();
    Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
    int remainMoney = Integer.parseInt(String.valueOf(o));
    if(remainMoney <= 0 ){
        map.put("result","红包已抢完");
        return ReturnModel.success(map).appendToString();
    }
    int randomMoney = (int) (Math.random() * 100);
    if(randomMoney > remainMoney){
        randomMoney = remainMoney;
    }
    int newRemainMoney = remainMoney-randomMoney;
    redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,newRemainMoney);
    String result = "原有金额:" + remainMoney + " 红包金额:" + randomMoney + "剩余金额:" + newRemainMoney;
    System.out.println(result);
    map.put("result",result);

    return ReturnModel.success(map).appendToString();
}

原有金额:1000 红包金额:49剩余金额:951
原有金额:1000 红包金额:62剩余金额:938
原有金额:1000 红包金额:61剩余金额:939
原有金额:1000 红包金额:93剩余金额:907
原有金额:1000 红包金额:73剩余金额:927
原有金额:939 红包金额:65剩余金额:874
原有金额:939 红包金额:16剩余金额:923
原有金额:939 红包金额:30剩余金额:909

情况2- 单台服务——使用Lock锁——数据正常;Lock在单服务器是线程安全的

public static Lock lock = new ReentrantLock();

@GetMapping("/get/money/lock")
public String getRedPackageLock(){
    Map map = new HashMap();
    lock.lock();
    try{
        Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
        int remainMoney = Integer.parseInt(String.valueOf(o));
        if(remainMoney <= 0 ){
            map.put("result","红包已抢完");
            return ReturnModel.success(map).appendToString();
        }
        int randomMoney = (int) (Math.random() * 100);
        if(randomMoney > remainMoney){
            randomMoney = remainMoney;
        }
        int newRemainMoney = remainMoney-randomMoney;
        redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,newRemainMoney);
        String result = "原有金额:" + remainMoney + " 红包金额:" + randomMoney + "剩余金额:" + newRemainMoney;
        System.out.println(result);
        map.put("result",result);
        return ReturnModel.success(map).appendToString();
    }finally {
        lock.unlock();
    }
}

原有金额:1000 红包金额:11剩余金额:989
原有金额:989 红包金额:48剩余金额:941
原有金额:941 红包金额:17剩余金额:924
原有金额:924 红包金额:89剩余金额:835
原有金额:835 红包金额:63剩余金额:772
原有金额:772 红包金额:77剩余金额:695
原有金额:695 红包金额:76剩余金额:619
原有金额:619 红包金额:8剩余金额:611
原有金额:611 红包金额:67剩余金额:544
原有金额:544 红包金额:9剩余金额:535
原有金额:535 红包金额:78剩余金额:457
......

情况3- 两台服务器——使用Lock锁——数据异常(代码情况2一样);Lock在镀钛服务器下是非线程安全的

负载均衡配置

使用Nginx配置负载均衡,Ngnix安装参考博客;配置参考博客;部署两个服务分别是8001和8002端口,Nginx暴露8080端口,转发请求到8001和8002;

项目总结64:分别使用Redisson和Zookeeper分布式锁模拟模拟抢红包业务

 nginx配置

http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;
    ##定义负载均衡真实服务器IP:端口号 weight表示权重
    upstream myserver{
        server   XX.XX.XX.XX:8001 weight=1;
        server   XX.XX.XX.XX:8002 weight=1;
     }
    server {
        listen   8080;
        location  / {
            proxy_pass   http://myserver;
            proxy_connect_timeout 10;
        }
    }  


}

情况3-1- 两台服务器——使用Redisson分布式锁——数据正常

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.31.Final</version>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.6.5</version>
        </dependency>
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;


    @Bean
    public RedissonClient getRedisson(){

        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
        return Redisson.create(config);
    }

}
@Autowired
private RedissonClient redissonClient;

//3-抢红包-redisson
@GetMapping("/get/money/redisson")
public String getRedPackageRedison(){
    RLock rLock = redissonClient.getLock("secKill");
    rLock.lock();
    Map map = new HashMap();
    try{
        Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
        int remainMoney = Integer.parseInt(String.valueOf(o));
        if(remainMoney <= 0 ){
            map.put("result","红包已抢完");
            return ReturnModel.success(map).appendToString();
        }
        int randomMoney = (int) (Math.random() * 100);
        if(randomMoney > remainMoney){
            randomMoney = remainMoney;
        }
        int newRemainMoney = remainMoney-randomMoney;
        redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,newRemainMoney);
        String result = "原有金额:" + remainMoney + " 红包金额:" + randomMoney + "剩余金额:" + newRemainMoney;
        System.out.println(result);
        map.put("result",result);
        return ReturnModel.success(map).appendToString();
    }finally {
        rLock.unlock();
    }
}

情况3-2- 两台服务器——使用Zookeeper分布式锁——数据正常

<!-- ZooKeeper 之 Curator-->
        <!-- ZooKeeper版本号为4的话,机器安装zookeeper的版本要求是3.5及其以上的版本-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.4-beta</version>
        </dependency>
        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
@Configuration
public class ZkConfiguration {
    /**
     * 重试次数
     */
    @Value("${curator.retryCount}")
    private int retryCount;
    /**
     * 重试间隔时间
     */
    @Value("${curator.elapsedTimeMs}")
    private int elapsedTimeMs;
    /**
     * 连接地址
     */
    @Value("${curator.connectString}")
    private String connectString;
    /**
     * Session过期时间
     */
    @Value("${curator.sessionTimeoutMs}")
    private int sessionTimeoutMs;
    /**
     * 连接超时时间
     */
    @Value("${curator.connectionTimeoutMs}")
    private int connectionTimeoutMs;

    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        return CuratorFrameworkFactory.newClient(
                connectString,
                sessionTimeoutMs,
                connectionTimeoutMs,
                new RetryNTimes(retryCount,elapsedTimeMs));
    }

    /**
     * Distributed lock by zookeeper distributed lock by zookeeper.
     *
     * @return the distributed lock by zookeeper
     */
    @Bean(initMethod = "init")
    public DistributedLockByZookeeper distributedLockByZookeeper() {
        return new DistributedLockByZookeeper();
    }
}
@Slf4j
public class DistributedLockByZookeeper {
    private final static String ROOT_PATH_LOCK = "myk";

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * The Curator framework.
     */
    @Autowired
    CuratorFramework curatorFramework;

    /**
     * 获取分布式锁
     * 创建一个临时节点,
     *
     * @param path the path
     */
    public void acquireDistributedLock(String path) {
        String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
        while (true) {
            try {
                curatorFramework.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(keyPath);
                //log.info("success to acquire lock for path:{}", keyPath);
                break;
            } catch (Exception e) {
                //抢不到锁,进入此处!
                //log.info("failed to acquire lock for path:{}", keyPath);
                //log.info("while try again .......");
                try {
                    if (countDownLatch.getCount() <= 0) {
                        countDownLatch = new CountDownLatch(1);
                    }
                    //避免请求获取不到锁,重复的while,浪费CPU资源
                    countDownLatch.await();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * 释放分布式锁
     *
     * @param path the  节点路径
     * @return the boolean
     */
    public boolean releaseDistributedLock(String path) {
        try {
            String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
            if (curatorFramework.checkExists().forPath(keyPath) != null) {
                curatorFramework.delete().forPath(keyPath);
            }
        } catch (Exception e) {
            //log.error("failed to release lock,{}", e);
            return false;
        }
        return true;
    }

    /**
     * 创建 watcher 事件
     */
    private void addWatcher(String path) {
        String keyPath;
        if (path.equals(ROOT_PATH_LOCK)) {
            keyPath = "/" + path;
        } else {
            keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
        }
        try {
            final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener((client, event) -> {
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String oldPath = event.getData().getPath();
                    //log.info("上一个节点 " + oldPath + " 已经被断开");
                    if (oldPath.contains(path)) {
                        //释放计数器,让当前的请求获取锁
                        countDownLatch.countDown();
                    }
                }
            });
        } catch (Exception e) {
            log.info("监听是否锁失败!{}", e);
        }
    }

    /**
     * 创建父节点,并创建永久节点
     */
    public void init() {
        curatorFramework = curatorFramework.usingNamespace("lock-namespace");
        String path = "/" + ROOT_PATH_LOCK;
        try {
            if (curatorFramework.checkExists().forPath(path) == null) {
                curatorFramework.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(path);
            }
            addWatcher(ROOT_PATH_LOCK);
            log.info("root path 的 watcher 事件创建成功");
        } catch (Exception e) {
            log.error("connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
        }
    }

}
@Autowired
    DistributedLockByZookeeper distributedLockByZookeeper;
    private final static String PATH = "red_package";
    //4-抢红包-zookeeper
    @GetMapping("/get/money/zookeeper")
    public String getRedPackageZookeeper(){

        Boolean flag = false;
        distributedLockByZookeeper.acquireDistributedLock(PATH);
        Map map = new HashMap();
        try {
            Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
            int remainMoney = Integer.parseInt(String.valueOf(o));
            if(remainMoney <= 0 ){
                map.put("result","红包已抢完");
                return ReturnModel.success(map).appendToString();
            }
            int randomMoney = (int) (Math.random() * 100);
            if(randomMoney > remainMoney){
                randomMoney = remainMoney;
            }
            int newRemainMoney = remainMoney-randomMoney;
            redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,newRemainMoney);
            String result = "原有金额:" + remainMoney + " 红包金额:" + randomMoney + "剩余金额:" + newRemainMoney;
            System.out.println(result);
            map.put("result",result);
            return ReturnModel.success(map).appendToString();
        } catch(Exception e){
            e.printStackTrace();
            flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
            //System.out.println("releaseDistributedLock: " + flag);
            map.put("result","getRedPackageZookeeper catch exceeption");
            return ReturnModel.success(map).appendToString();
        }finally {
            flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
            //System.out.println("releaseDistributedLock: " + flag);
        }
    }

附录

1- 其他配置和类

application.properties文件

server.port=80

#配置redis
spring.redis.host=XX.XX.XX.XX
spring.redis.port=6379
spring.redis.password=xuegaotest1234
spring.redis.database=0
#重试次数
curator.retryCount=5
#重试间隔时间
curator.elapsedTimeMs=5000
# zookeeper 地址
curator.connectString=XX.XX.XX.XX:2181
# session超时时间
curator.sessionTimeoutMs=60000
# 连接超时时间
curator.connectionTimeoutMs=5000

ReturnModel 类

public class ReturnModel implements Serializable{


    private int code;
    private String msg;
    private Object data;




    public static ReturnModel success(Object obj){
        return new ReturnModel(200,"success",obj);
    }

    public String appendToString(){
        return  JSON.toJSONString(this);
    }

    public ReturnModel() {
    }

    public ReturnModel(int code, String msg, Object data) {
        this.code = code;
        this.msg = msg;
        this.data = data;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

 SeckillController类

public final static String KEY_RED_PACKAGE_MONEY  = "key_red_package_money";


    @Autowired
    private RedisTemplate redisTemplate;

    //1-设置红包
    @GetMapping("/set/money/{amount}")
    public String setRedPackage(@PathVariable Integer amount){
        redisTemplate.opsForValue().set(KEY_RED_PACKAGE_MONEY,amount);
        Object o = redisTemplate.opsForValue().get(KEY_RED_PACKAGE_MONEY);
        Map map = new HashMap();
        map.put("moneyTotal",Integer.parseInt(String.valueOf(o)));
        return ReturnModel.success(map).appendToString();
    }

流程解析

1- Zookeeper分布锁

1- 在ZkConfiguration类中加载CuratorFramework时,设置参数,实例化一个CuratorFramework类; 实例化过程中,执行CuratorFrameworkImpl类中的的start(),其中CuratorFrameworkImpl类是CuratorFramework的实现类;根据具体的细节可以参考博客

@Bean(initMethod = "start")
public CuratorFramework curatorFramework() {
    return CuratorFrameworkFactory.newClient( connectString,  sessionTimeoutMs, connectionTimeoutMs,  new RetryNTimes(retryCount,elapsedTimeMs));
}

2- 在ZkConfiguration类中加载DistributedLockByZookeeper时;执行其中的init()方法;init()方法中主要是创建父节点和添加监听

/**
 * 创建父节点,并创建永久节点
 */
public void init() {
    curatorFramework = curatorFramework.usingNamespace("lock-namespace");
    String path = "/" + ROOT_PATH_LOCK;
    try {
        if (curatorFramework.checkExists().forPath(path) == null) {
            curatorFramework.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(path);
        }
        addWatcher(ROOT_PATH_LOCK);
        log.info("root path 的 watcher 事件创建成功");
    } catch (Exception e) {
        log.error("connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
    }
}

3- 在具体业务中调用distributedLockByZookeeper.acquireDistributedLock(PATH);获取分布式锁

/**
 * 获取分布式锁
 * 创建一个临时节点,
 *
 * @param path the path
 */
public void acquireDistributedLock(String path) {
    String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
    while (true) {
        try {
            curatorFramework.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(keyPath);
            break;
        } catch (Exception e) {
            //抢不到锁,进入此处!
            try {
                if (countDownLatch.getCount() <= 0) {
                    countDownLatch = new CountDownLatch(1);
                }
                //避免请求获取不到锁,重复的while,浪费CPU资源
                countDownLatch.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }
}

4- 业务结束时调用distributedLockByZookeeper.releaseDistributedLock(PATH);释放锁

/**
 * 释放分布式锁
 *
 * @param path the  节点路径
 * @return the boolean
 */
public boolean releaseDistributedLock(String path) {
    try {
        String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
        if (curatorFramework.checkExists().forPath(keyPath) != null) {
            curatorFramework.delete().forPath(keyPath);
        }
    } catch (Exception e) {
        return false;
    }
    return true;
}

 原理图如下

项目总结64:分别使用Redisson和Zookeeper分布式锁模拟模拟抢红包业务

 期间碰到的问题

问题: 项目启动时:java.lang.ClassNotFoundException: com.google.common.base.Function

原因:缺少google-collections jar包;如下

<dependency>

    <groupId>com.google.collections</groupId>

    <artifactId>google-collections</artifactId>

    <version>1.0</version>

</dependency>

问题:项目启动时:org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss

原因:简单说,就是连接失败(可能原因的有很多);依次排查了zookeeper服务器防火墙、application.properties配置文件;最后发现IP的写错了,更正后就好了

问题:Jemter启用多线程并发测试时:java.net.BindException: Address already in use: connect

原因和解决方案:参考博客

END

项目总结64:分别使用Redisson和Zookeeper分布式锁模拟模拟抢红包业务

相关推荐