typhoonpython 2020-01-10
多线程是java并发的基础,我们先来学习一下吧:
首先,让我们来起一个多线程,看看
public class HelloWorld { public static void main(String[] args) throws Exception { // lambda 写法 Thread t = new Thread(() -> { System.out.println("thread start"); try { Thread.sleep(100); // 模拟IO操作,让线程等100毫秒 } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("thread end"); } }); t.start(); System.out.println("main end"); }//thread start // main end // thread end mianend与thread start 打印顺序并非一定的,这个是并发,不一定谁会先执行 }
线程一般会存在几个状态,New 新建的线程对象,Runnable 正在运行中,Block 被阻塞,Waitting 等待中, Timeed Waittding 被sleep计时等待, Terminated 执行完毕
public class HelloWorld { public static void main(String[] args) throws Exception { // lambda 写法 Thread t = new Thread(() -> { System.out.println("thread start"); try { Thread.sleep(100); // 模拟IO操作,让线程等100毫秒 } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("thread end"); } }); System.out.println(t.getState()); // NEW 未执行线程 t.start(); System.out.println(t.getState()); // RUNNABLE 正在运行的线程 System.out.println("main end"); Thread.sleep(10); System.out.println(t.getState()); // TIMED_WAITING 被sleep阻塞住的线程 Thread.sleep(100); System.out.println(t.getState()); // TERMINATED 线程退出 } }
在学习python的时候我们知道一个线程可以等待另外一个线程,那java中也是一样的,都使用join
public class HelloWorld { public static void main(String[] args) throws Exception { // lambda 写法 Thread t = new Thread(() -> { System.out.println("thread start"); try { Thread.sleep(100); // 模拟IO操作,让线程等100毫秒 } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("thread end"); } }); t.start(); t.join(); // 等待 t 结束再往下走 System.out.println("main end"); } // thread start // thread end // main end 这个main end会最终执行 }
注意join中也是可以传参数的,传入的int值,意思是等待多少毫秒
进程如何中断呢,有两种方法
方法一
class MyThread extends Thread{ @Override public void run(){ // 判断进程是否被终端 while(! isInterrupted()){ System.out.println("i am alive"); } System.out.println("end"); } } public class HelloWorld { public static void main(String[] args) throws Exception { // lambda 写法 Thread t = new MyThread(); t.start(); Thread.sleep(1000); t.interrupt(); // 中断进程 System.out.println("main end"); } }
还有一种是设置标识位
当我们使用public时,线程间是可以访问变量的,java虚拟机是将变量保存在主内存上,当某一个线程访问变量时,会复制一份走,然后保存在自己的工作空间,如果发生改写,虚拟机会在某个时间之后将修改后的变量值改写主内存,但是这个时间是不确定的,因此我们需要使用volatile来声明这个变量,这样就会做到下述两点:
class MyThread extends Thread { public volatile boolean isExit = false; @Override public void run() { // 判断标志位 while (!this.isExit) { System.out.println("i am alive"); } System.out.println("end"); } } public class HelloWorld { public static void main(String[] args) throws Exception { // lambda 写法 MyThread t = new MyThread(); t.start(); Thread.sleep(1); t.isExit = true; // 中断进程 System.out.println("main end"); } }
我们在使用的如果不设置volatile中断时间感觉也很快,这个是因为JVM的回写主内存非常快,所以不要爆侥幸信息,一定要记得声明volatile
接下来看一下守护线程,守护线程就是当所有非守护线程结束时,他也会结束,我记得在pyton中又叫做傀儡线程,设置守护需要在启动线程之前。
class MyThread extends Thread { @Override public void run() { // 判断进程是否被终端 try { Thread.sleep(1000000); // 当线程被推出时 会抛出 InterruptedException 这个错误 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end"); } } public class HelloWorld { public static void main(String[] args) throws Exception { // lambda 写法 MyThread t = new MyThread(); t.setDaemon(true); t.start(); Thread.sleep(10); System.out.println("main end"); } }
t这个线程是主线程的守护线程,会在主线程退出时退出。
说了这个,那么多线程对于修改数据,是不是安全的呢?我们来看一个例子
class Num{ public static int num = 0; } class AddThread extends Thread { @Override public void run() { int i = 0; for(;i<100000;i++){ Num.num += 1; } System.out.println("AddThread end"); } } class DecThread extends Thread{ @Override public void run(){ int i = 0; for(;i<100000;i++){ Num.num -=1; } System.out.println("DecThread end"); } } public class HelloWorld { public static void main(String[] args) throws Exception { // lambda 写法 Thread t = new AddThread(); Thread t1 = new DecThread(); System.out.println(Num.num); // 0 t.start(); t1.start(); t.join(); t1.join(); System.out.println(Num.num); // -803 } }
这个结果并非我们预期的0,因此需要咋办,这个是因为所有的线程调度都是由操作系统做的,如果add正在主内存中取值就被挂起,然后dec去主内存中取值并作减法,然后退回给主系统,之后add才调用,这个时候就会造成数据混乱,那么怎么办呢?加锁,也就是说,某个线程在操作数据时,另外一个线程是不能去碰这个数据
class Num { public static int num = 0; public static final Object lock = new Object(); } class AddThread extends Thread { @Override public void run() { int i = 0; for (; i < 100000; i++) { // synchronized 这个就锁住了Num.lock synchronized (Num.lock) { Num.num += 1; }// 释放锁 } System.out.println("AddThread end"); } } class DecThread extends Thread { @Override public void run() { int i = 0; for (; i < 100000; i++) { synchronized (Num.lock) { Num.num -= 1; } } System.out.println("DecThread end"); } } public class HelloWorld { public static void main(String[] args) throws Exception { // lambda 写法 Thread t = new AddThread(); Thread t1 = new DecThread(); System.out.println(Num.num); // 0 t.start(); t1.start(); t.join(); t1.join(); System.out.println(Num.num); // 0 } }
另外对于一些复制操作(基础类型除了long,double,还有引用类型)是原子操作,不需要加锁
我们来看一下如何进行加锁
class Num { public static int num = 0; public static final Object lock = new Object(); private int nn = 0; // 同步方法,这条语句相当于 synchronized(this){this.nn += 1;} 就是把整个实例都锁住了 public synchronized void add() { this.nn += 1; } public synchronized void dec() { this.nn -= 1; } public void printNn() { System.out.println(this.nn); } } class AddThread extends Thread { private Num n = null; public AddThread(Num n) { super(); this.n = n; } @Override public void run() { int i = 0; for (; i < 100000; i++) { // synchronized 这个就锁住了Num.lock synchronized (Num.lock) { Num.num += 1; }// 释放锁 this.n.add(); } System.out.println("AddThread end"); } } class DecThread extends Thread { private Num n = null; public DecThread(Num n) { super(); this.n = n; } @Override public void run() { int i = 0; for (; i < 100000; i++) { synchronized (Num.lock) { Num.num -= 1; } this.n.dec(); } System.out.println("DecThread end"); } } public class HelloWorld { public static void main(String[] args) throws Exception { Num n = new Num(); // lambda 写法 Thread t = new AddThread(n); Thread t1 = new DecThread(n); System.out.println(Num.num); // 0 t.start(); t1.start(); t.join(); t1.join(); System.out.println(Num.num); // 0 n.printNn(); // 0 } }
java中的锁是可重入锁,什么意思,就是可以重复获取,获取一次锁计数+1,释放一次锁计数-1,最终释放完毕所有锁就归零
因此如果我们将锁住的信息定义为final标识符,那么他就不能重复获取锁,这样一旦锁的顺序不对,就会产生死锁,如下面的例子
class Num { public static int num = 0; public static final Object lock = new Object(); public static final Object lock2 = new Object(); } class AddThread extends Thread { @Override public void run() { int i = 0; for (; i < 100000; i++) { // synchronized 这个就锁住了Num.lock synchronized (Num.lock2) { Num.num += 1; synchronized (Num.lock) { Num.num -= 1; } }// 释放锁 } System.out.println("AddThread end"); } } class DecThread extends Thread { @Override public void run() { int i = 0; for (; i < 100000; i++) { synchronized (Num.lock) { Num.num -= 1; synchronized (Num.lock2) { Num.num += 1; } } } System.out.println("DecThread end"); } } public class HelloWorld { public static void main(String[] args) throws Exception { Thread t = new AddThread(); Thread t1 = new DecThread(); System.out.println(Num.num); // 0 t.start(); t1.start(); t.join(); t1.join(); System.out.println(Num.num); // 0 } }
t线程先锁住lock然后在去锁住lock2,t1线程先锁住lock2然后在去锁住lock,因为lock都是不可变的,有final标识符,这样t跟t1就产生了冲突,谁也无法获取对方的未释放的锁,产生了死锁,程序进入等待,因此一定要注意加锁的顺序
那么接下来我们需要思考对于队列来讲,如何进行多线程协同呢,也就是一些放任务,一些取任务,我们可能想到取任务时使用poll,但是我们要求必须取到任务,
import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; class Task { Queue<String> q = new LinkedList<>(); public synchronized void add(String s) { q.add(s); } public synchronized String get() throws InterruptedException { while (q.isEmpty()) { System.out.println("wait me"); } return q.remove(); } } public class HelloWorld { public static void main(String[] args) throws Exception { Task t = new Task(); List<Thread> tt = new ArrayList<>(); for (int i = 0; i < 10; i++) { Thread ts = new Thread(() -> { while (true) { String s = null; try { s = t.get(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(s); } }); ts.start(); tt.add(ts); } Thread add = new Thread() { @Override public void run() { while (true) { for (int i = 0; i < 100; i++) { t.add("task" + i); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } } }; add.start(); } }
我看上述代码,这个就是我们之前看的加锁,但是看输出结果确实无限的wait me,这是因为在获取任务时,因为为空就会陷入while的无限循环中,这个这个方法锁住的是this,这个实例,add也就无法进行添加任务,形成了死循环,那么这个就需要wait跟notifyall这两个方法,他们必须在synchorized中使用,wait就是释放当前锁,并休眠,notifyall就是通知所有休眠的进行起来抢锁并执行,上述代码修改为下面这样
import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; class Task { Queue<String> q = new LinkedList<>(); public synchronized void add(String s) { q.add(s); this.notifyAll(); } public synchronized String get() throws InterruptedException { while (q.isEmpty()) { this.wait(); System.out.println("wait me"); } return q.remove(); } } public class HelloWorld { public static void main(String[] args) throws Exception { Task t = new Task(); List<Thread> tt = new ArrayList<>(); for (int i = 0; i < 10; i++) { Thread ts = new Thread(() -> { while (true) { String s = null; try { s = t.get(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(s); } }); ts.start(); tt.add(ts); } Thread add = new Thread() { @Override public void run() { while (true) { for (int i = 0; i < 100; i++) { t.add("task" + i); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } } }; add.start(); } }
增加两行代码,就实现了我们的功能。
synchronized锁在锁比较多的情况下容易造成死锁,而且在想要获取锁的时候必须等待,没有任何尝试机制,接下来我们看一下另外一种锁的ReentrantLock
import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Task { Queue<String> q = new LinkedList<>(); private final Lock lock = new ReentrantLock(); // 实例化锁 private final Condition condition = lock.newCondition(); // 通过该锁,实例化一个与之匹配的condition public void add(String s) throws InterruptedException { // 这个就是尝试获取锁,如果1s获取不到就不再等待,跳过这里面的代码块 if (lock.tryLock(1, TimeUnit.SECONDS)) { try { q.add(s); condition.signalAll(); // 等同于notifyall } finally { lock.unlock(); } } } public String get() throws InterruptedException { // 普通的锁,会等待 lock.lock(); try { while (q.isEmpty()) { condition.await(); // 等同于wait } return q.remove(); } finally { lock.unlock(); } } } public class HelloWorld { public static void main(String[] args) throws Exception { Task t = new Task(); List<Thread> tt = new ArrayList<>(); for (int i = 0; i < 10; i++) { Thread ts = new Thread(() -> { while (true) { String s = null; try { s = t.get(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(s); } }); ts.start(); tt.add(ts); } Thread add = new Thread() { @Override public void run() { while (true) { for (int i = 0; i < 100; i++) { try { t.add("task" + i); } catch (InterruptedException e) { e.printStackTrace(); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } } }; add.start(); } }
加下来看一下读写锁,上面的例子其实所有的读操作也会被锁,但对于读操作来讲,不影响我们的数据,我们希望在进行写的时候锁住,不写的时候,读没有限制,因此可以使用读写锁
import java.util.*; import java.util.concurrent.locks.*; class Task { Queue<String> q = new LinkedList<>(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 实例化读写锁 private final Lock rlock = lock.readLock(); // 实例化读锁 private final Lock wlock = lock.writeLock(); // 实例化写锁 private int[] n = new int[100]; public void add(int i) throws InterruptedException { // 写入锁 wlock.lock(); try { n[i] += 1; } finally { wlock.unlock(); } } public int[] get() throws InterruptedException { // 读锁 rlock.lock(); try { return Arrays.copyOf(n, n.length); } finally { rlock.unlock(); } } } public class HelloWorld { public static void main(String[] args) throws Exception { Task t = new Task(); List<Thread> tt = new ArrayList<>(); for (int i = 0; i < 10; i++) { Thread ts = new Thread(() -> { while (true) { int[] s = null; try { s = t.get(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(s); } }); ts.start(); tt.add(ts); } Thread add = new Thread() { @Override public void run() { while (true) { for (int i = 0; i < 100; i++) { try { t.add(i); } catch (InterruptedException e) { e.printStackTrace(); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } } }; add.start(); } }
上面的读写锁是一种悲观锁,比如说,如果换成队列,也就是说,写的时候,读锁必须全部释放完毕,否则无法进行下一步操作,可以使用之前队列的例子试试,下面看一下乐观锁
import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.*; class Task { private final StampedLock lock = new StampedLock(); // 实例化锁 private Double s = 0.00; public void add(Double d) { // 写入锁 long stamp = lock.writeLock(); try { s += d; } finally { lock.unlockWrite(stamp); } } public Double get() { // 获取乐观锁 long stamp = lock.tryOptimisticRead(); Double d = s; // double 赋值非原子级操作 System.out.println("du"); // 检查当前所是否是最新的 if (!lock.validate(stamp)) { System.out.println("not new"); // 如果不是则获取悲观锁 stamp = lock.readLock(); try { d = s; } finally { lock.unlockRead(stamp); } } return d; } } public class HelloWorld { public static void main(String[] args) throws Exception { Task t = new Task(); List<Thread> tt = new ArrayList<>(); for (int i = 0; i < 10; i++) { Thread ts = new Thread(() -> { while (true) { Double s; s = t.get(); System.out.println(s); } }); ts.start(); tt.add(ts); } Thread add = new Thread() { @Override public void run() { while (true) { for (int i = 0; i < 100; i++) { t.add(1.0000001); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }; add.start(); } }
说完线程,那接下来我们看一下线程池
import java.util.ArrayList; import java.util.concurrent.*; class Task implements Callable<String> { public String call() throws Exception { Thread.sleep(10000); return "mmm"; } } class Task1 implements Runnable { private int i; public Task1(int d) { this.i = d; } public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end" + this.i); } } class MyThread extends Thread { private Future<String> f; public MyThread(Future<String> f) { super(); this.f = f; } @Override public void run() { try { System.out.println(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } public class HelloWorld { public static void main(String[] args) throws Exception { ExecutorService es = Executors.newFixedThreadPool(4); // 固定大小的线程池 // <T> Future<T> submit(Callable<T> task); 这个方法是往线程池中塞任务,看到接受的是Callable类型,返回的是Future类型,那就按这个借口来定义 Future<String> f = es.submit(new Task()); // 获取一个未来产生的Future对象 try { String m = f.get(1, TimeUnit.SECONDS); // 获取结果只等待指定时间,如果没有等到,会报出下面的错误 System.out.println(m); } catch (Exception e) { System.out.println(e); // java.util.concurrent.TimeoutException } finally { System.out.println("not recv"); } System.out.println(f.isDone()); // 判断是否完成 System.out.println(f.get()); // 等待获取结果,会一直阻塞 es.shutdown(); // 关闭线程池 ExecutorService es1 = Executors.newCachedThreadPool(); // 动态变化的线程池 var f1 = new ArrayList<Future<String>>(); for (int i = 0; i < 10; i++) { Future<String> ff = es1.submit(new Task()); // 在这里尽量不要用定值的线程池,如果超出线程池大小,会报错 f1.add(ff); } // 十个 mmm 同时打印 for (Future<String> ffff : f1) { var mt = new MyThread(ffff); mt.start(); mt.join(); } Thread.sleep(10100); // 记得晚点关闭线程池 es1.shutdown(); ExecutorService es2 = Executors.newFixedThreadPool(2); // 固定大小的线程池 for (int i = 0; i < 6; i++) { es2.submit(new Task1(i)); } // 两个两个的打印 Thread.sleep(10000); es2.shutdown(); // 定时反复执行任务的线程池 ScheduledExecutorService es3 = Executors.newScheduledThreadPool(4); es3.schedule(new Task1(1), 4, TimeUnit.SECONDS); // 4 s后执行这个任务 es3.scheduleAtFixedRate(new Task1(2), 2, 1, TimeUnit.SECONDS); // 两秒后,每隔1秒执行一次任务 es3.scheduleWithFixedDelay(new Task1(3), 2, 1, TimeUnit.SECONDS); // 两秒后,上一次任务结束,隔1秒执行一次任务 Thread.sleep(10000); es3.shutdown(); } }
使用feature总是得用get等待,或者轮询看看是否isDone,来看一下自动调用回掉对象
import java.util.concurrent.*; public class HelloWorld { public static void main(String[] args) throws Exception { // 这个是串行 CompletableFuture<String> f = CompletableFuture.supplyAsync(HelloWorld::call); // 第一个任务 // 上面任务完成后执行这个任务,s是上一个的返回值 CompletableFuture<String> f1 = f.thenApplyAsync((s) -> { System.out.println(s); // mmm return HelloWorld.call(); }); // f1成功之后打印 f1.thenAccept((res) -> { System.out.println(res); // mmm }); // f1失败打印 f1.exceptionally(e -> { e.printStackTrace(); return null; }); Thread.sleep(4000); System.out.println("开始并行"); // 接下来看一下并行的 CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { return call1(2); }); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { return call1(3); }); CompletableFuture<String> f4 = CompletableFuture.supplyAsync(() -> { return call1(4); }); CompletableFuture<String> f5 = CompletableFuture.supplyAsync(() -> { return call1(5); }); CompletableFuture<Object> ff = CompletableFuture.anyOf(f2, f3, f4, f5); f2.thenAccept(System.out::println); f3.thenAccept(System.out::println); f4.thenAccept(System.out::println); f5.thenAccept(System.out::println); ff.thenAccept(System.out::println); // 感觉默认的线程应该是3个,因为第四个总感觉执行的慢一点,ff就是谁的快接受那个线程 Thread.sleep(4000); } static String call() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "mmm"; } static String call1(int n) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "mmm" + n; } }