typhoonpython 2020-01-10
多线程是java并发的基础,我们先来学习一下吧:
首先,让我们来起一个多线程,看看
public class HelloWorld {
public static void main(String[] args) throws Exception {
// lambda 写法
Thread t = new Thread(() -> {
System.out.println("thread start");
try {
Thread.sleep(100); // 模拟IO操作,让线程等100毫秒
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("thread end");
}
});
t.start();
System.out.println("main end");
}//thread start
// main end
// thread end mianend与thread start 打印顺序并非一定的,这个是并发,不一定谁会先执行
}线程一般会存在几个状态,New 新建的线程对象,Runnable 正在运行中,Block 被阻塞,Waitting 等待中, Timeed Waittding 被sleep计时等待, Terminated 执行完毕
public class HelloWorld {
public static void main(String[] args) throws Exception {
// lambda 写法
Thread t = new Thread(() -> {
System.out.println("thread start");
try {
Thread.sleep(100); // 模拟IO操作,让线程等100毫秒
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("thread end");
}
});
System.out.println(t.getState()); // NEW 未执行线程
t.start();
System.out.println(t.getState()); // RUNNABLE 正在运行的线程
System.out.println("main end");
Thread.sleep(10);
System.out.println(t.getState()); // TIMED_WAITING 被sleep阻塞住的线程
Thread.sleep(100);
System.out.println(t.getState()); // TERMINATED 线程退出
}
}在学习python的时候我们知道一个线程可以等待另外一个线程,那java中也是一样的,都使用join
public class HelloWorld {
public static void main(String[] args) throws Exception {
// lambda 写法
Thread t = new Thread(() -> {
System.out.println("thread start");
try {
Thread.sleep(100); // 模拟IO操作,让线程等100毫秒
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("thread end");
}
});
t.start();
t.join(); // 等待 t 结束再往下走
System.out.println("main end");
} // thread start
// thread end
// main end 这个main end会最终执行
}注意join中也是可以传参数的,传入的int值,意思是等待多少毫秒
进程如何中断呢,有两种方法
方法一
class MyThread extends Thread{
@Override
public void run(){
// 判断进程是否被终端
while(! isInterrupted()){
System.out.println("i am alive");
}
System.out.println("end");
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
// lambda 写法
Thread t = new MyThread();
t.start();
Thread.sleep(1000);
t.interrupt(); // 中断进程
System.out.println("main end");
}
}还有一种是设置标识位
当我们使用public时,线程间是可以访问变量的,java虚拟机是将变量保存在主内存上,当某一个线程访问变量时,会复制一份走,然后保存在自己的工作空间,如果发生改写,虚拟机会在某个时间之后将修改后的变量值改写主内存,但是这个时间是不确定的,因此我们需要使用volatile来声明这个变量,这样就会做到下述两点:
class MyThread extends Thread {
public volatile boolean isExit = false;
@Override
public void run() {
// 判断标志位
while (!this.isExit) {
System.out.println("i am alive");
}
System.out.println("end");
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
// lambda 写法
MyThread t = new MyThread();
t.start();
Thread.sleep(1);
t.isExit = true; // 中断进程
System.out.println("main end");
}
}我们在使用的如果不设置volatile中断时间感觉也很快,这个是因为JVM的回写主内存非常快,所以不要爆侥幸信息,一定要记得声明volatile
接下来看一下守护线程,守护线程就是当所有非守护线程结束时,他也会结束,我记得在pyton中又叫做傀儡线程,设置守护需要在启动线程之前。
class MyThread extends Thread {
@Override
public void run() {
// 判断进程是否被终端
try {
Thread.sleep(1000000);
// 当线程被推出时 会抛出 InterruptedException 这个错误
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
// lambda 写法
MyThread t = new MyThread();
t.setDaemon(true);
t.start();
Thread.sleep(10);
System.out.println("main end");
}
}t这个线程是主线程的守护线程,会在主线程退出时退出。
说了这个,那么多线程对于修改数据,是不是安全的呢?我们来看一个例子
class Num{
public static int num = 0;
}
class AddThread extends Thread {
@Override
public void run() {
int i = 0;
for(;i<100000;i++){
Num.num += 1;
}
System.out.println("AddThread end");
}
}
class DecThread extends Thread{
@Override
public void run(){
int i = 0;
for(;i<100000;i++){
Num.num -=1;
}
System.out.println("DecThread end");
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
// lambda 写法
Thread t = new AddThread();
Thread t1 = new DecThread();
System.out.println(Num.num); // 0
t.start();
t1.start();
t.join();
t1.join();
System.out.println(Num.num); // -803
}
}这个结果并非我们预期的0,因此需要咋办,这个是因为所有的线程调度都是由操作系统做的,如果add正在主内存中取值就被挂起,然后dec去主内存中取值并作减法,然后退回给主系统,之后add才调用,这个时候就会造成数据混乱,那么怎么办呢?加锁,也就是说,某个线程在操作数据时,另外一个线程是不能去碰这个数据
class Num {
public static int num = 0;
public static final Object lock = new Object();
}
class AddThread extends Thread {
@Override
public void run() {
int i = 0;
for (; i < 100000; i++) {
// synchronized 这个就锁住了Num.lock
synchronized (Num.lock) {
Num.num += 1;
}// 释放锁
}
System.out.println("AddThread end");
}
}
class DecThread extends Thread {
@Override
public void run() {
int i = 0;
for (; i < 100000; i++) {
synchronized (Num.lock) {
Num.num -= 1;
}
}
System.out.println("DecThread end");
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
// lambda 写法
Thread t = new AddThread();
Thread t1 = new DecThread();
System.out.println(Num.num); // 0
t.start();
t1.start();
t.join();
t1.join();
System.out.println(Num.num); // 0
}
}另外对于一些复制操作(基础类型除了long,double,还有引用类型)是原子操作,不需要加锁
我们来看一下如何进行加锁
class Num {
public static int num = 0;
public static final Object lock = new Object();
private int nn = 0;
// 同步方法,这条语句相当于 synchronized(this){this.nn += 1;} 就是把整个实例都锁住了
public synchronized void add() {
this.nn += 1;
}
public synchronized void dec() {
this.nn -= 1;
}
public void printNn() {
System.out.println(this.nn);
}
}
class AddThread extends Thread {
private Num n = null;
public AddThread(Num n) {
super();
this.n = n;
}
@Override
public void run() {
int i = 0;
for (; i < 100000; i++) {
// synchronized 这个就锁住了Num.lock
synchronized (Num.lock) {
Num.num += 1;
}// 释放锁
this.n.add();
}
System.out.println("AddThread end");
}
}
class DecThread extends Thread {
private Num n = null;
public DecThread(Num n) {
super();
this.n = n;
}
@Override
public void run() {
int i = 0;
for (; i < 100000; i++) {
synchronized (Num.lock) {
Num.num -= 1;
}
this.n.dec();
}
System.out.println("DecThread end");
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
Num n = new Num();
// lambda 写法
Thread t = new AddThread(n);
Thread t1 = new DecThread(n);
System.out.println(Num.num); // 0
t.start();
t1.start();
t.join();
t1.join();
System.out.println(Num.num); // 0
n.printNn(); // 0
}
}java中的锁是可重入锁,什么意思,就是可以重复获取,获取一次锁计数+1,释放一次锁计数-1,最终释放完毕所有锁就归零
因此如果我们将锁住的信息定义为final标识符,那么他就不能重复获取锁,这样一旦锁的顺序不对,就会产生死锁,如下面的例子
class Num {
public static int num = 0;
public static final Object lock = new Object();
public static final Object lock2 = new Object();
}
class AddThread extends Thread {
@Override
public void run() {
int i = 0;
for (; i < 100000; i++) {
// synchronized 这个就锁住了Num.lock
synchronized (Num.lock2) {
Num.num += 1;
synchronized (Num.lock) {
Num.num -= 1;
}
}// 释放锁
}
System.out.println("AddThread end");
}
}
class DecThread extends Thread {
@Override
public void run() {
int i = 0;
for (; i < 100000; i++) {
synchronized (Num.lock) {
Num.num -= 1;
synchronized (Num.lock2) {
Num.num += 1;
}
}
}
System.out.println("DecThread end");
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
Thread t = new AddThread();
Thread t1 = new DecThread();
System.out.println(Num.num); // 0
t.start();
t1.start();
t.join();
t1.join();
System.out.println(Num.num); // 0
}
}t线程先锁住lock然后在去锁住lock2,t1线程先锁住lock2然后在去锁住lock,因为lock都是不可变的,有final标识符,这样t跟t1就产生了冲突,谁也无法获取对方的未释放的锁,产生了死锁,程序进入等待,因此一定要注意加锁的顺序
那么接下来我们需要思考对于队列来讲,如何进行多线程协同呢,也就是一些放任务,一些取任务,我们可能想到取任务时使用poll,但是我们要求必须取到任务,
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
class Task {
Queue<String> q = new LinkedList<>();
public synchronized void add(String s) {
q.add(s);
}
public synchronized String get() throws InterruptedException {
while (q.isEmpty()) {
System.out.println("wait me");
}
return q.remove();
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
Task t = new Task();
List<Thread> tt = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread ts = new Thread(() -> {
while (true) {
String s = null;
try {
s = t.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s);
}
});
ts.start();
tt.add(ts);
}
Thread add = new Thread() {
@Override
public void run() {
while (true) {
for (int i = 0; i < 100; i++) {
t.add("task" + i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
};
add.start();
}
}我看上述代码,这个就是我们之前看的加锁,但是看输出结果确实无限的wait me,这是因为在获取任务时,因为为空就会陷入while的无限循环中,这个这个方法锁住的是this,这个实例,add也就无法进行添加任务,形成了死循环,那么这个就需要wait跟notifyall这两个方法,他们必须在synchorized中使用,wait就是释放当前锁,并休眠,notifyall就是通知所有休眠的进行起来抢锁并执行,上述代码修改为下面这样
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
class Task {
Queue<String> q = new LinkedList<>();
public synchronized void add(String s) {
q.add(s);
this.notifyAll();
}
public synchronized String get() throws InterruptedException {
while (q.isEmpty()) {
this.wait();
System.out.println("wait me");
}
return q.remove();
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
Task t = new Task();
List<Thread> tt = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread ts = new Thread(() -> {
while (true) {
String s = null;
try {
s = t.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s);
}
});
ts.start();
tt.add(ts);
}
Thread add = new Thread() {
@Override
public void run() {
while (true) {
for (int i = 0; i < 100; i++) {
t.add("task" + i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
};
add.start();
}
}增加两行代码,就实现了我们的功能。
synchronized锁在锁比较多的情况下容易造成死锁,而且在想要获取锁的时候必须等待,没有任何尝试机制,接下来我们看一下另外一种锁的ReentrantLock
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Task {
Queue<String> q = new LinkedList<>();
private final Lock lock = new ReentrantLock(); // 实例化锁
private final Condition condition = lock.newCondition(); // 通过该锁,实例化一个与之匹配的condition
public void add(String s) throws InterruptedException {
// 这个就是尝试获取锁,如果1s获取不到就不再等待,跳过这里面的代码块
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
q.add(s);
condition.signalAll(); // 等同于notifyall
} finally {
lock.unlock();
}
}
}
public String get() throws InterruptedException {
// 普通的锁,会等待
lock.lock();
try {
while (q.isEmpty()) {
condition.await(); // 等同于wait
}
return q.remove();
} finally {
lock.unlock();
}
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
Task t = new Task();
List<Thread> tt = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread ts = new Thread(() -> {
while (true) {
String s = null;
try {
s = t.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s);
}
});
ts.start();
tt.add(ts);
}
Thread add = new Thread() {
@Override
public void run() {
while (true) {
for (int i = 0; i < 100; i++) {
try {
t.add("task" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
};
add.start();
}
}加下来看一下读写锁,上面的例子其实所有的读操作也会被锁,但对于读操作来讲,不影响我们的数据,我们希望在进行写的时候锁住,不写的时候,读没有限制,因此可以使用读写锁
import java.util.*;
import java.util.concurrent.locks.*;
class Task {
Queue<String> q = new LinkedList<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 实例化读写锁
private final Lock rlock = lock.readLock(); // 实例化读锁
private final Lock wlock = lock.writeLock(); // 实例化写锁
private int[] n = new int[100];
public void add(int i) throws InterruptedException {
// 写入锁
wlock.lock();
try {
n[i] += 1;
} finally {
wlock.unlock();
}
}
public int[] get() throws InterruptedException {
// 读锁
rlock.lock();
try {
return Arrays.copyOf(n, n.length);
} finally {
rlock.unlock();
}
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
Task t = new Task();
List<Thread> tt = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread ts = new Thread(() -> {
while (true) {
int[] s = null;
try {
s = t.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s);
}
});
ts.start();
tt.add(ts);
}
Thread add = new Thread() {
@Override
public void run() {
while (true) {
for (int i = 0; i < 100; i++) {
try {
t.add(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
};
add.start();
}
}上面的读写锁是一种悲观锁,比如说,如果换成队列,也就是说,写的时候,读锁必须全部释放完毕,否则无法进行下一步操作,可以使用之前队列的例子试试,下面看一下乐观锁
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.*;
class Task {
private final StampedLock lock = new StampedLock(); // 实例化锁
private Double s = 0.00;
public void add(Double d) {
// 写入锁
long stamp = lock.writeLock();
try {
s += d;
} finally {
lock.unlockWrite(stamp);
}
}
public Double get() {
// 获取乐观锁
long stamp = lock.tryOptimisticRead();
Double d = s; // double 赋值非原子级操作
System.out.println("du");
// 检查当前所是否是最新的
if (!lock.validate(stamp)) {
System.out.println("not new");
// 如果不是则获取悲观锁
stamp = lock.readLock();
try {
d = s;
} finally {
lock.unlockRead(stamp);
}
}
return d;
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
Task t = new Task();
List<Thread> tt = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread ts = new Thread(() -> {
while (true) {
Double s;
s = t.get();
System.out.println(s);
}
});
ts.start();
tt.add(ts);
}
Thread add = new Thread() {
@Override
public void run() {
while (true) {
for (int i = 0; i < 100; i++) {
t.add(1.0000001);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
};
add.start();
}
}说完线程,那接下来我们看一下线程池
import java.util.ArrayList;
import java.util.concurrent.*;
class Task implements Callable<String> {
public String call() throws Exception {
Thread.sleep(10000);
return "mmm";
}
}
class Task1 implements Runnable {
private int i;
public Task1(int d) {
this.i = d;
}
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end" + this.i);
}
}
class MyThread extends Thread {
private Future<String> f;
public MyThread(Future<String> f) {
super();
this.f = f;
}
@Override
public void run() {
try {
System.out.println(f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public class HelloWorld {
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(4); // 固定大小的线程池
// <T> Future<T> submit(Callable<T> task); 这个方法是往线程池中塞任务,看到接受的是Callable类型,返回的是Future类型,那就按这个借口来定义
Future<String> f = es.submit(new Task()); // 获取一个未来产生的Future对象
try {
String m = f.get(1, TimeUnit.SECONDS); // 获取结果只等待指定时间,如果没有等到,会报出下面的错误
System.out.println(m);
} catch (Exception e) {
System.out.println(e); // java.util.concurrent.TimeoutException
} finally {
System.out.println("not recv");
}
System.out.println(f.isDone()); // 判断是否完成
System.out.println(f.get()); // 等待获取结果,会一直阻塞
es.shutdown(); // 关闭线程池
ExecutorService es1 = Executors.newCachedThreadPool(); // 动态变化的线程池
var f1 = new ArrayList<Future<String>>();
for (int i = 0; i < 10; i++) {
Future<String> ff = es1.submit(new Task()); // 在这里尽量不要用定值的线程池,如果超出线程池大小,会报错
f1.add(ff);
} // 十个 mmm 同时打印
for (Future<String> ffff : f1) {
var mt = new MyThread(ffff);
mt.start();
mt.join();
}
Thread.sleep(10100); // 记得晚点关闭线程池
es1.shutdown();
ExecutorService es2 = Executors.newFixedThreadPool(2); // 固定大小的线程池
for (int i = 0; i < 6; i++) {
es2.submit(new Task1(i));
} // 两个两个的打印
Thread.sleep(10000);
es2.shutdown();
// 定时反复执行任务的线程池
ScheduledExecutorService es3 = Executors.newScheduledThreadPool(4);
es3.schedule(new Task1(1), 4, TimeUnit.SECONDS); // 4 s后执行这个任务
es3.scheduleAtFixedRate(new Task1(2), 2, 1, TimeUnit.SECONDS); // 两秒后,每隔1秒执行一次任务
es3.scheduleWithFixedDelay(new Task1(3), 2, 1, TimeUnit.SECONDS); // 两秒后,上一次任务结束,隔1秒执行一次任务
Thread.sleep(10000);
es3.shutdown();
}
}使用feature总是得用get等待,或者轮询看看是否isDone,来看一下自动调用回掉对象
import java.util.concurrent.*;
public class HelloWorld {
public static void main(String[] args) throws Exception {
// 这个是串行
CompletableFuture<String> f = CompletableFuture.supplyAsync(HelloWorld::call); // 第一个任务
// 上面任务完成后执行这个任务,s是上一个的返回值
CompletableFuture<String> f1 = f.thenApplyAsync((s) -> {
System.out.println(s); // mmm
return HelloWorld.call();
});
// f1成功之后打印
f1.thenAccept((res) -> {
System.out.println(res); // mmm
});
// f1失败打印
f1.exceptionally(e -> {
e.printStackTrace();
return null;
});
Thread.sleep(4000);
System.out.println("开始并行");
// 接下来看一下并行的
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
return call1(2);
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
return call1(3);
});
CompletableFuture<String> f4 = CompletableFuture.supplyAsync(() -> {
return call1(4);
});
CompletableFuture<String> f5 = CompletableFuture.supplyAsync(() -> {
return call1(5);
});
CompletableFuture<Object> ff = CompletableFuture.anyOf(f2, f3, f4, f5);
f2.thenAccept(System.out::println);
f3.thenAccept(System.out::println);
f4.thenAccept(System.out::println);
f5.thenAccept(System.out::println);
ff.thenAccept(System.out::println); // 感觉默认的线程应该是3个,因为第四个总感觉执行的慢一点,ff就是谁的快接受那个线程
Thread.sleep(4000);
}
static String call() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "mmm";
}
static String call1(int n) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "mmm" + n;
}
}