网络菜市场 2020-04-25
假设我们要实现一个视频网站在线人数统计功能,在每个客户端登录网站时,统计在线人数,通常用一个变量count代表人数,用户上线后,count++
class Online{ int count; public Online(){ this.count = 0; } public void login(){ count++; } }
假设目前在线人数count是10,甲登录网站,网站后台读取到count值为10(count++分为三步:读取-修改-写入),还没来得及修改,这时乙也在登录,后台读取到count值也为10,最终甲、乙登录完成后,count变为了11,正确结果本来应该是12的
java.util.concurrent.atomic包中有很多原子变量,用于对数据进行原子操作
如对于上面的问题,可以用AtomicInteger原子变量的incrementAndGet()来实现正确操作
public synchronized void login(){ count++; }
synchronized(Online.class)
synchronized(this)
public void login(){ synchronized(this){ count++; } }
位于java.util.concurrent.locks
包中
public interface Lock { void lock(); boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
锁 | 说明 |
---|---|
ReentrantLock | 可重入互斥锁 |
ReentrantReadWriteLock | 可重入读写锁 |
class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }}
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); rwl.writeLock().lock(); try { // Recheck state because another thread might have // acquired write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // Unlock write, still hold read } } try { use(data); } finally { rwl.readLock().unlock(); } } }}
可重入锁:同一线程在方法获取锁的时候,在进入前面方法内部调用其它方法会自动获取锁
公平锁:按照线程锁申请后顺序来获取锁
非公平锁:不一定按照线程锁申请先后顺序来获取锁
乐观锁:乐观地认为其他人读数据时都不会修改数据,不会上锁
悲观锁:悲观地认为其他人读数据时都会修改数据,会上锁,别人只能等待它释放锁
共享锁:同一时刻可以被多个线程拥有
独占锁:同一时刻只能被一个线程拥有
java.util.concurrent.Semaphore
通常一个信号量维持着一个许可证的集合,acquire
方法会申请许可证permit
,让线程阻塞直到许可证是空的,而release
方法会释放一个许可证
假设现在有一个类使用信号量去实现资源池,生产者消费者模式线程同步
public class Pool<E> { private final E[] items; private final Semaphore availableItems; private final Semaphore availableSpaces; private int putPosition = 0, takePosition = 0; Pool(int capacity) { availableItems = new Semaphore(0); availableSpaces = new Semaphore(capacity); items = (E[]) new Object[capacity]; } boolean isEmpty(){ return availableItems.availablePermits() == 0; } boolean isFull(){ return availableSpaces.availablePermits() == 0; } public void put(E x) throws InterruptedException { availableSpaces.acquire(); doInsert(x); availableItems.release(); } public E take() throws InterruptedException { availableItems.acquire(); E item = doExtract(); availableSpaces.release(); return item; } private synchronized void doInsert(E x) { int i = putPosition; items[i] = x; putPosition = (++i == items.length) ? 0 : i; } private synchronized E doExtract() { int i = takePosition; E x = items[i]; items[i] = null; takePosition = (++i == items.length) ? 0 : i; return x; } }
首先定义2个信号量Semaphore
:
availableItems
代表可用资源数,数值初始化为0
availableSpaces
可用空间数,数值初始化为capacity
生产消费操作实现方法:
从池中取出资源(take
):
availableItems.acquire()
查询availableItems
许可证,该方法会阻塞直到池中有可用资源doExtract方法
)availableSpaces.release()
释放许可证,表示池中多了一个可用的空间,可以用来存放新的资源放入资源至池中(put
):
availableSpaces.acquire()
查询availableSpaces
许可证,该方法会阻塞直到池中有可用空间doInsert方法
)availableItems.release()
释放许可证,表示池中多了一个可用资源,可以来访问该资源