zwjcyz 2012-08-01
程序设计需要同步(synchronization),原因:
1)复杂的功能要求的需要使用多线程编程,线程之间存在读写共享变量。
2)读写共享变量(shared mutual variable),JVM的内存模型(Memory model: decide when and how changes made by one thread become visuble to others)受到其它因素干扰。
3)对共享变量的操作非原子性。例如 i++;就不是原子操作,它分为两部分,(1) 读i (2) i+1写入内存。如果i是线程A和线程B共享的变量,线程A在操作(1)之后,线程调度器调度调度线程B执行i++,因此两个线程在变量i产生了不一致。注意,volatile修饰符是线程操作之前更新数据,但是,上面的问题显然不是更新数据就能解决的。
4)增加互斥区(mutual exclusion)会降低执行效率,但是这是实现数据安全、功能强大的多线程编程最为重要的部分。
5)线程之间需要配合的场景需要并发控制逻辑。
Java并发编程使用的方法:
1) 为代码块和函数添加synchronized,同步的作用有两点:
(1)a means of mutual exclusion, to prevent an object from being observed in an inconsistent state while it’s being modified by another thread.
Hadoop源码使用并发控制的实例:
Map阶段产生<K,V>会先存储在内存中,等到io.sort.mb指定的内存达到阈值(percent)时,会启动spill到本地磁盘的工作。
ReentrantLock与Condition的配合使用,Condition为ReentrantLock锁的等待和释放提供控制逻辑。
例如,使用ReentrantLock加锁之后,可以通过它自身的Condition.await()方法释放该锁,线程在此等待Condition.signal()方法,然后继续执行下去。await方法需要放在while循环中,因此,在不同线程之间实现并发控制,还需要一个volatile的变量,boolean是原子性的变量。因此,一般的并发控制的操作逻辑如下所示:
volatile boolean isProcess = false;
ReentrantLock lock = new ReentrantLock();
Condtion processReady = lock.newCondtion();
thread: run() {
lock.lock();
isProcess = true;
try {
while(!isProcessReady) { //isProcessReady 是另外一个线程的控制变量
processReady.await();//释放了lock,在此等待signal
}catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
isProcess = false;
}
}
}
}
看Hadoop的一段摘取的源码:
private class MapOutputBuffer<K extends Object, V extends Object>
implements MapOutputCollector<K, V>, IndexedSortable {
...
boolean spillInProgress;
final ReentrantLock spillLock = new ReentrantLock();
final Condition spillDone = spillLock.newCondition();
final Condition spillReady = spillLock.newCondition();
volatile boolean spillThreadRunning = false;
final SpillThread spillThread = new SpillThread();
...
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
...
spillInProgress = false;
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try {
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
}
protected class SpillThread extends Thread {
@Override
public void run() {
spillLock.lock();
spillThreadRunning = true;
try {
while (true) {
spillDone.signal();
while (!spillInProgress) {
spillReady.await();
}
try {
spillLock.unlock();
sortAndSpill();
} catch (Throwable t) {
sortSpillException = t;
} finally {
spillLock.lock();
if (bufend < bufstart) {
bufvoid = kvbuffer.length;
}
kvstart = kvend;
bufstart = bufend;
spillInProgress = false;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
spillLock.unlock();
spillThreadRunning = false;
}
}
}