Happyunlimited 2019-11-16
java中对于生产者消费者模型,或者小米手机营销 1分钟卖多少台手机等都存在限流的思想在里面。
关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)
Semaphore:从线程个数限流
RateLimiter:从速率限流 目前常见的算法是漏桶算法和令牌算法
令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌
漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据
应用场景:
漏桶算法:必须读写分流的情况下,限制读取的速度
令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景 只卖1分种抢购1000实现的方法都是一样。
RateLimiter来实现对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题
1、关于RateLimter和Semphore简单用法
package concurrent; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.*; import java.util.stream.IntStream; import static java.lang.Thread.currentThread; /** * ${DESCRIPTION} * 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava) * Semaphore:从线程个数限流 * RateLimiter:从速率限流 目前常见的算法是漏桶算法和令牌算法,下面会具体介绍 * * @author mengxp * @version 1.0 * @create 2018-01-15 22:44 **/ public class RateLimiterExample { //Guava 0.5的意思是 1秒中0.5次的操作,2秒1次的操作 从速度来限流,从每秒中能够执行的次数来 private final static RateLimiter limiter=RateLimiter.create(0.5d); //同时只能有三个线程工作 Java1.5 从同时处理的线程个数来限流 private final static Semaphore sem=new Semaphore(3); private static void testSemaphore(){ try { sem.acquire(); System.out.println(currentThread().getName()+" is doing work..."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); }finally { sem.release(); System.out.println(currentThread().getName()+" release the semephore..other thread can get and do job"); } } public static void runTestSemaphore(){ ExecutorService service = Executors.newFixedThreadPool(10); IntStream.range(0,10).forEach((i)->{ //RateLimiterExample::testLimiter 这种写法是创建一个线程 service.submit(RateLimiterExample::testSemaphore); }); } /** * Guava的RateLimiter */ private static void testLimiter(){ System.out.println(currentThread().getName()+" waiting " +limiter.acquire()); } //Guava的RateLimiter public static void runTestLimiter(){ ExecutorService service = Executors.newFixedThreadPool(10); IntStream.range(0,10).forEach((i)->{ //RateLimiterExample::testLimiter 这种写法是创建一个线程 service.submit(RateLimiterExample::testLimiter); }); } public static void main(String[] args) { IntStream.range(0,10).forEach((a)-> System.out.println(a));//从0-9 //runTestLimiter(); runTestSemaphore(); } }
2、实现漏桶算法
package concurrent.BucketAl; import com.google.common.util.concurrent.Monitor; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static java.lang.Thread.currentThread; /** * ${DESCRIPTION} * * @author mengxp * @version 1.0 * @create 2018-01-20 22:42 * 实现漏桶算法 实现多线程生产者消费者模型 限流 **/ public class Bucket { //定义桶的大小 private final ConcurrentLinkedQueue<Integer> container=new ConcurrentLinkedQueue<>(); private final static int BUCKET_LIMIT=1000; //消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次 private final RateLimiter consumerRate=RateLimiter.create(10d); //往桶里面放数据时,确认没有超过桶的最大的容量 private Monitor offerMonitor=new Monitor(); //从桶里消费数据时,桶里必须存在数据 private Monitor consumerMonitor=new Monitor(); /** * 往桶里面写数据 * @param data */ public void submit(Integer data){ if (offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){ try { container.offer(data); System.out.println(currentThread()+" submit.."+data+" container size is :["+container.size()+"]"); } finally { offerMonitor.leave(); } }else { //这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常 //或者存入MQ DB等后续处理 throw new IllegalStateException(currentThread().getName()+"The bucket is ful..Pls latter can try..."); } } /** * 从桶里面消费数据 * @param consumer */ public void takeThenConsumer(Consumer<Integer> consumer){ if (consumerMonitor.enterIf(consumerMonitor.newGuard(()->!container.isEmpty()))){ try { //不打印时 写 consumerRate.acquire(); System.out.println(currentThread()+" waiting"+consumerRate.acquire()); Integer data = container.poll(); //container.peek() 只是去取出来不会删掉 consumer.accept(data); }finally { consumerMonitor.leave(); } }else { //当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据 System.out.println("will consumer Data from MQ..."); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }
2.1 漏桶算法测试类
package concurrent.BucketAl; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import static java.lang.Thread.currentThread; /** * ${DESCRIPTION} * * @author mengxp * @version 1.0 * @create 2018-01-20 23:11 * 漏桶算法测试 * 实现漏桶算法 实现多线程生产者消费者模型 限流 **/ public class BuckerTest { public static void main(String[] args) { final Bucket bucket = new Bucket(); final AtomicInteger DATA_CREATOR = new AtomicInteger(0); //生产线程 10个线程 每秒提交 50个数据 1/0.2s*10=50个 IntStream.range(0, 10).forEach(i -> { new Thread(() -> { for (; ; ) { int data = DATA_CREATOR.incrementAndGet(); try { bucket.submit(data); TimeUnit.MILLISECONDS.sleep(200); } catch (Exception e) { //对submit时,如果桶满了可能会抛出异常 if (e instanceof IllegalStateException) { System.out.println(e.getMessage()); //当满了后,生产线程就休眠1分钟 try { TimeUnit.SECONDS.sleep(60); } catch (InterruptedException e1) { e1.printStackTrace(); } } } } }).start(); }); //消费线程 采用RateLimiter每秒处理10个 综合的比率是5:1 IntStream.range(0, 10).forEach(i -> { new Thread( () -> { for (; ; ) { bucket.takeThenConsumer(x -> { System.out.println(currentThread()+"C.." + x); }); } } ).start(); }); } }
3、令牌桶算法
package concurrent.TokenBucket; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.lang.Thread.currentThread; import static java.lang.Thread.interrupted; /** * ${DESCRIPTION} * * @author mengxp * @version 1.0 * @create 2018-01-21 0:18 * 令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌 * 漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据 * * 应用场景: * 漏桶算法:必须读写分流的情况下,限制读取的速度 * 令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景 只卖1分种抢购1000 * * 实现的方法都是一样。RateLimiter来实现 * 对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题 **/ public class TokenBuck { //可以使用 AtomicInteger+容量 可以不用Queue实现 private AtomicInteger phoneNumbers=new AtomicInteger(0); private RateLimiter rateLimiter=RateLimiter.create(20d);//一秒只能执行五次 //默认销售500台 private final static int DEFALUT_LIMIT=500; private final int saleLimit; public TokenBuck(int saleLimit) { this.saleLimit = saleLimit; } public TokenBuck() { this(DEFALUT_LIMIT); } public int buy(){ //这个check 必须放在success里面做判断,不然会产生线程安全问题(业务引起) //原因当phoneNumbers=99 时 同时存在三个线程进来。虽然phoneNumbers原子性,但是也会发生。如果必须写在这里,在success //里面也需要加上double check /* if (phoneNumbers.get()>=saleLimit){ throw new IllegalStateException("Phone has been sale "+saleLimit+" can not buy more...") }*/ //目前设置超时时间,10秒内没有抢到就抛出异常 //这里的TimeOut*Ratelimiter=总数 这里的超时就是让别人抢几秒,所以设置总数也可以由这里的超时和RateLimiter来计算 boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS); if (success){ if (phoneNumbers.get()>=saleLimit){ throw new IllegalStateException("Phone has been sale "+saleLimit+" can not buy more..."); } int phoneNo = phoneNumbers.getAndIncrement(); System.out.println(currentThread()+" user has get :["+phoneNo+"]"); return phoneNo; }else { //超时后 同一时间,很大的流量来强时,超时快速失败。 throw new RuntimeException(currentThread()+"has timeOut can try again..."); } } }
3.1、令牌桶算法的测试类
package concurrent.TokenBucket; import java.util.stream.IntStream; /** * ${DESCRIPTION} * * @author mengxp * @version 1.0 * @create 2018-01-21 0:40 **/ public class TokenBuckTest { public static void main(String[] args) { final TokenBuck tokenBuck=new TokenBuck(200); IntStream.range(0,300).forEach(i->{ //目前测试时,让一个线程抢一次,不用循环抢 //tokenBuck::buy 这种方式 产生一个Runnable new Thread(tokenBuck::buy).start(); }); } }