java入门篇13 -- 多线程

typhoonpython 2020-01-10

多线程是java并发的基础,我们先来学习一下吧:

首先,让我们来起一个多线程,看看

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // lambda 写法
        Thread t = new Thread(() -> {
            System.out.println("thread start");
            try {
                Thread.sleep(100);  // 模拟IO操作,让线程等100毫秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("thread end");
            }
        });
        t.start();
        System.out.println("main end");
    }//thread start
    // main end
    // thread end  mianend与thread start 打印顺序并非一定的,这个是并发,不一定谁会先执行
}

线程一般会存在几个状态,New 新建的线程对象,Runnable 正在运行中,Block 被阻塞,Waitting 等待中, Timeed Waittding 被sleep计时等待, Terminated 执行完毕

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // lambda 写法
        Thread t = new Thread(() -> {
            System.out.println("thread start");
            try {
                Thread.sleep(100);  // 模拟IO操作,让线程等100毫秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("thread end");
            }
        });
        System.out.println(t.getState());  // NEW 未执行线程
        t.start();
        System.out.println(t.getState());  // RUNNABLE 正在运行的线程
        System.out.println("main end");
        Thread.sleep(10);
        System.out.println(t.getState());  // TIMED_WAITING  被sleep阻塞住的线程
        Thread.sleep(100);
        System.out.println(t.getState());  // TERMINATED  线程退出
    }
}

在学习python的时候我们知道一个线程可以等待另外一个线程,那java中也是一样的,都使用join

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // lambda 写法
        Thread t = new Thread(() -> {
            System.out.println("thread start");
            try {
                Thread.sleep(100);  // 模拟IO操作,让线程等100毫秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("thread end");
            }
        });
        t.start();
        t.join();  // 等待 t 结束再往下走
        System.out.println("main end");
    }  // thread start
    // thread end
    // main end 这个main end会最终执行
}

注意join中也是可以传参数的,传入的int值,意思是等待多少毫秒

进程如何中断呢,有两种方法

方法一

class MyThread extends Thread{
    @Override
    public void run(){
        // 判断进程是否被终端
        while(! isInterrupted()){
            System.out.println("i am alive");
        }
        System.out.println("end");
    }
}
public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // lambda 写法
        Thread t = new MyThread();
        t.start();
        Thread.sleep(1000);
        t.interrupt();  // 中断进程
        System.out.println("main end");
    }
}

还有一种是设置标识位

当我们使用public时,线程间是可以访问变量的,java虚拟机是将变量保存在主内存上,当某一个线程访问变量时,会复制一份走,然后保存在自己的工作空间,如果发生改写,虚拟机会在某个时间之后将修改后的变量值改写主内存,但是这个时间是不确定的,因此我们需要使用volatile来声明这个变量,这样就会做到下述两点:

  • 每次访问变量,都会去主内存中取复制一份
  • 每次修改变量,会立即写会主内存
class MyThread extends Thread {
    public volatile boolean isExit = false;

    @Override
    public void run() {
        // 判断标志位
        while (!this.isExit) {
            System.out.println("i am alive");
        }
        System.out.println("end");
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // lambda 写法
        MyThread t = new MyThread();
        t.start();
        Thread.sleep(1);
        t.isExit = true;  // 中断进程
        System.out.println("main end");
    }
}

我们在使用的如果不设置volatile中断时间感觉也很快,这个是因为JVM的回写主内存非常快,所以不要爆侥幸信息,一定要记得声明volatile

接下来看一下守护线程,守护线程就是当所有非守护线程结束时,他也会结束,我记得在pyton中又叫做傀儡线程,设置守护需要在启动线程之前。

class MyThread extends Thread {
    @Override
    public void run() {
        // 判断进程是否被终端
        try {
            Thread.sleep(1000000);
            // 当线程被推出时 会抛出 InterruptedException 这个错误
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end");
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // lambda 写法
        MyThread t = new MyThread();
        t.setDaemon(true);
        t.start();
        Thread.sleep(10);
        System.out.println("main end");
    }
}

t这个线程是主线程的守护线程,会在主线程退出时退出。

说了这个,那么多线程对于修改数据,是不是安全的呢?我们来看一个例子

class Num{
    public static int num = 0;
}
class AddThread extends Thread {
    @Override
    public void run() {
        int i = 0;
        for(;i<100000;i++){
            Num.num += 1;
        }
        System.out.println("AddThread end");
    }
}
class DecThread extends Thread{
    @Override
    public void run(){
        int i = 0;
        for(;i<100000;i++){
            Num.num -=1;
        }
        System.out.println("DecThread end");
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // lambda 写法
        Thread t = new AddThread();
        Thread t1 = new DecThread();
        System.out.println(Num.num);  // 0
        t.start();
        t1.start();
        t.join();
        t1.join();
        System.out.println(Num.num);  // -803
    }
}

这个结果并非我们预期的0,因此需要咋办,这个是因为所有的线程调度都是由操作系统做的,如果add正在主内存中取值就被挂起,然后dec去主内存中取值并作减法,然后退回给主系统,之后add才调用,这个时候就会造成数据混乱,那么怎么办呢?加锁,也就是说,某个线程在操作数据时,另外一个线程是不能去碰这个数据

class Num {
    public static int num = 0;
    public static final Object lock = new Object();
}

class AddThread extends Thread {
    @Override
    public void run() {
        int i = 0;
        for (; i < 100000; i++) {
            // synchronized 这个就锁住了Num.lock
            synchronized (Num.lock) {
                Num.num += 1;
            }// 释放锁
        }
        System.out.println("AddThread end");
    }
}

class DecThread extends Thread {
    @Override
    public void run() {
        int i = 0;
        for (; i < 100000; i++) {
            synchronized (Num.lock) {
                Num.num -= 1;
            }
        }
        System.out.println("DecThread end");
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // lambda 写法
        Thread t = new AddThread();
        Thread t1 = new DecThread();
        System.out.println(Num.num);  // 0
        t.start();
        t1.start();
        t.join();
        t1.join();
        System.out.println(Num.num);  // 0
    }
}

另外对于一些复制操作(基础类型除了long,double,还有引用类型)是原子操作,不需要加锁

我们来看一下如何进行加锁

class Num {
    public static int num = 0;
    public static final Object lock = new Object();
    private int nn = 0;

    // 同步方法,这条语句相当于 synchronized(this){this.nn += 1;} 就是把整个实例都锁住了
    public synchronized void add() {
        this.nn += 1;
    }

    public synchronized void dec() {
        this.nn -= 1;
    }

    public void printNn() {
        System.out.println(this.nn);
    }
}

class AddThread extends Thread {
    private Num n = null;

    public AddThread(Num n) {
        super();
        this.n = n;
    }

    @Override
    public void run() {
        int i = 0;
        for (; i < 100000; i++) {
            // synchronized 这个就锁住了Num.lock
            synchronized (Num.lock) {
                Num.num += 1;
            }// 释放锁
            this.n.add();
        }
        System.out.println("AddThread end");
    }
}

class DecThread extends Thread {
    private Num n = null;

    public DecThread(Num n) {
        super();
        this.n = n;
    }

    @Override
    public void run() {
        int i = 0;
        for (; i < 100000; i++) {
            synchronized (Num.lock) {
                Num.num -= 1;
            }
            this.n.dec();
        }
        System.out.println("DecThread end");
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        Num n = new Num();
        // lambda 写法
        Thread t = new AddThread(n);
        Thread t1 = new DecThread(n);
        System.out.println(Num.num);  // 0
        t.start();
        t1.start();
        t.join();
        t1.join();
        System.out.println(Num.num);  // 0
        n.printNn();  // 0
    }
}

java中的锁是可重入锁,什么意思,就是可以重复获取,获取一次锁计数+1,释放一次锁计数-1,最终释放完毕所有锁就归零

因此如果我们将锁住的信息定义为final标识符,那么他就不能重复获取锁,这样一旦锁的顺序不对,就会产生死锁,如下面的例子

class Num {
    public static int num = 0;
    public static final Object lock = new Object();
    public static final Object lock2 = new Object();

}

class AddThread extends Thread {

    @Override
    public void run() {
        int i = 0;
        for (; i < 100000; i++) {
            // synchronized 这个就锁住了Num.lock
            synchronized (Num.lock2) {
                Num.num += 1;
                synchronized (Num.lock) {
                    Num.num -= 1;
                }
            }// 释放锁
        }
        System.out.println("AddThread end");
    }
}

class DecThread extends Thread {

    @Override
    public void run() {
        int i = 0;
        for (; i < 100000; i++) {
            synchronized (Num.lock) {
                Num.num -= 1;
                synchronized (Num.lock2) {
                    Num.num += 1;
                }
            }
        }
        System.out.println("DecThread end");
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        Thread t = new AddThread();
        Thread t1 = new DecThread();
        System.out.println(Num.num);  // 0
        t.start();
        t1.start();
        t.join();
        t1.join();
        System.out.println(Num.num);  // 0
    }
}

t线程先锁住lock然后在去锁住lock2,t1线程先锁住lock2然后在去锁住lock,因为lock都是不可变的,有final标识符,这样t跟t1就产生了冲突,谁也无法获取对方的未释放的锁,产生了死锁,程序进入等待,因此一定要注意加锁的顺序

那么接下来我们需要思考对于队列来讲,如何进行多线程协同呢,也就是一些放任务,一些取任务,我们可能想到取任务时使用poll,但是我们要求必须取到任务,

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class Task {
    Queue<String> q = new LinkedList<>();

    public synchronized void add(String s) {
        q.add(s);
    }

    public synchronized String get() throws InterruptedException {
        while (q.isEmpty()) {
            System.out.println("wait me");

        }
        return q.remove();
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        Task t = new Task();
        List<Thread> tt = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Thread ts = new Thread(() -> {
                while (true) {
                    String s = null;
                    try {
                        s = t.get();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(s);
                }
            });
            ts.start();
            tt.add(ts);
        }
        Thread add = new Thread() {
            @Override
            public void run() {
                while (true) {
                    for (int i = 0; i < 100; i++) {
                        t.add("task" + i);
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        };
        add.start();
    }
}

我看上述代码,这个就是我们之前看的加锁,但是看输出结果确实无限的wait me,这是因为在获取任务时,因为为空就会陷入while的无限循环中,这个这个方法锁住的是this,这个实例,add也就无法进行添加任务,形成了死循环,那么这个就需要wait跟notifyall这两个方法,他们必须在synchorized中使用,wait就是释放当前锁,并休眠,notifyall就是通知所有休眠的进行起来抢锁并执行,上述代码修改为下面这样

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class Task {
    Queue<String> q = new LinkedList<>();

    public synchronized void add(String s) {
        q.add(s);
        this.notifyAll();
    }

    public synchronized String get() throws InterruptedException {
        while (q.isEmpty()) {
            this.wait();
            System.out.println("wait me");

        }
        return q.remove();
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        Task t = new Task();
        List<Thread> tt = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Thread ts = new Thread(() -> {
                while (true) {
                    String s = null;
                    try {
                        s = t.get();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(s);
                }
            });
            ts.start();
            tt.add(ts);
        }
        Thread add = new Thread() {
            @Override
            public void run() {
                while (true) {
                    for (int i = 0; i < 100; i++) {
                        t.add("task" + i);
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        };
        add.start();
    }
}

增加两行代码,就实现了我们的功能。

synchronized锁在锁比较多的情况下容易造成死锁,而且在想要获取锁的时候必须等待,没有任何尝试机制,接下来我们看一下另外一种锁的ReentrantLock

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Task {
    Queue<String> q = new LinkedList<>();
    private final Lock lock = new ReentrantLock();  // 实例化锁
    private final Condition condition = lock.newCondition();  // 通过该锁,实例化一个与之匹配的condition

    public void add(String s) throws InterruptedException {
        // 这个就是尝试获取锁,如果1s获取不到就不再等待,跳过这里面的代码块
        if (lock.tryLock(1, TimeUnit.SECONDS)) {
            try {
                q.add(s);
                condition.signalAll();  // 等同于notifyall
            } finally {
                lock.unlock();
            }
        }

    }

    public String get() throws InterruptedException {
        // 普通的锁,会等待
        lock.lock();
        try {
            while (q.isEmpty()) {
                condition.await();  // 等同于wait
            }
            return q.remove();
        } finally {
            lock.unlock();
        }
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        Task t = new Task();
        List<Thread> tt = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Thread ts = new Thread(() -> {
                while (true) {
                    String s = null;
                    try {
                        s = t.get();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(s);
                }
            });
            ts.start();
            tt.add(ts);
        }
        Thread add = new Thread() {
            @Override
            public void run() {
                while (true) {
                    for (int i = 0; i < 100; i++) {
                        try {
                            t.add("task" + i);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        };
        add.start();
    }
}

加下来看一下读写锁,上面的例子其实所有的读操作也会被锁,但对于读操作来讲,不影响我们的数据,我们希望在进行写的时候锁住,不写的时候,读没有限制,因此可以使用读写锁

import java.util.*;
import java.util.concurrent.locks.*;

class Task {
    Queue<String> q = new LinkedList<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();  // 实例化读写锁
    private final Lock rlock = lock.readLock();  // 实例化读锁
    private final Lock wlock = lock.writeLock(); // 实例化写锁
    private int[] n = new int[100];

    public void add(int i) throws InterruptedException {
        // 写入锁
        wlock.lock();
        try {
            n[i] += 1;
        } finally {
            wlock.unlock();
        }

    }

    public int[] get() throws InterruptedException {
        // 读锁
        rlock.lock();
        try {

            return Arrays.copyOf(n, n.length);
        } finally {
            rlock.unlock();
        }
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        Task t = new Task();
        List<Thread> tt = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Thread ts = new Thread(() -> {
                while (true) {
                    int[] s = null;
                    try {
                        s = t.get();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(s);
                }
            });
            ts.start();
            tt.add(ts);
        }
        Thread add = new Thread() {
            @Override
            public void run() {
                while (true) {
                    for (int i = 0; i < 100; i++) {
                        try {
                            t.add(i);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        };
        add.start();
    }
}

上面的读写锁是一种悲观锁,比如说,如果换成队列,也就是说,写的时候,读锁必须全部释放完毕,否则无法进行下一步操作,可以使用之前队列的例子试试,下面看一下乐观锁

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.*;

class Task {
    private final StampedLock lock = new StampedLock();  // 实例化锁
    private Double s = 0.00;

    public void add(Double d) {
        // 写入锁
        long stamp = lock.writeLock();
        try {
            s += d;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    public Double get() {
        // 获取乐观锁
        long stamp = lock.tryOptimisticRead();
        Double d = s;  // double 赋值非原子级操作
        System.out.println("du");
        // 检查当前所是否是最新的
        if (!lock.validate(stamp)) {
            System.out.println("not new");
            // 如果不是则获取悲观锁
            stamp = lock.readLock();
            try {
                d = s;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return d;
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        Task t = new Task();
        List<Thread> tt = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Thread ts = new Thread(() -> {
                while (true) {
                    Double s;
                    s = t.get();
                    System.out.println(s);
                }
            });
            ts.start();
            tt.add(ts);
        }
        Thread add = new Thread() {
            @Override
            public void run() {
                while (true) {
                    for (int i = 0; i < 100; i++) {
                        t.add(1.0000001);
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        };
        add.start();
    }
}

说完线程,那接下来我们看一下线程池

import java.util.ArrayList;
import java.util.concurrent.*;

class Task implements Callable<String> {
    public String call() throws Exception {
        Thread.sleep(10000);
        return "mmm";
    }
}

class Task1 implements Runnable {
    private int i;

    public Task1(int d) {
        this.i = d;
    }

    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end" + this.i);
    }
}

class MyThread extends Thread {
    private Future<String> f;

    public MyThread(Future<String> f) {
        super();
        this.f = f;
    }

    @Override
    public void run() {
        try {
            System.out.println(f.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(4);  // 固定大小的线程池
        // <T> Future<T> submit(Callable<T> task); 这个方法是往线程池中塞任务,看到接受的是Callable类型,返回的是Future类型,那就按这个借口来定义

        Future<String> f = es.submit(new Task());  // 获取一个未来产生的Future对象
        try {
            String m = f.get(1, TimeUnit.SECONDS);  // 获取结果只等待指定时间,如果没有等到,会报出下面的错误
            System.out.println(m);
        } catch (Exception e) {
            System.out.println(e);  // java.util.concurrent.TimeoutException
        } finally {
            System.out.println("not recv");
        }
        System.out.println(f.isDone());  // 判断是否完成
        System.out.println(f.get());  // 等待获取结果,会一直阻塞
        es.shutdown();  // 关闭线程池

        ExecutorService es1 = Executors.newCachedThreadPool();  // 动态变化的线程池

        var f1 = new ArrayList<Future<String>>();
        for (int i = 0; i < 10; i++) {
            Future<String> ff = es1.submit(new Task());  // 在这里尽量不要用定值的线程池,如果超出线程池大小,会报错
            f1.add(ff);
        }  // 十个 mmm 同时打印
        for (Future<String> ffff : f1) {
            var mt = new MyThread(ffff);
            mt.start();
            mt.join();
        }
        Thread.sleep(10100);  // 记得晚点关闭线程池
        es1.shutdown();
        ExecutorService es2 = Executors.newFixedThreadPool(2);  // 固定大小的线程池
        for (int i = 0; i < 6; i++) {
            es2.submit(new Task1(i));
        }  // 两个两个的打印
        Thread.sleep(10000);
        es2.shutdown();

        // 定时反复执行任务的线程池
        ScheduledExecutorService es3 = Executors.newScheduledThreadPool(4);
        es3.schedule(new Task1(1), 4, TimeUnit.SECONDS);  // 4 s后执行这个任务
        es3.scheduleAtFixedRate(new Task1(2), 2, 1, TimeUnit.SECONDS);  // 两秒后,每隔1秒执行一次任务
        es3.scheduleWithFixedDelay(new Task1(3), 2, 1, TimeUnit.SECONDS);  // 两秒后,上一次任务结束,隔1秒执行一次任务
        Thread.sleep(10000);
        es3.shutdown();

    }
}

使用feature总是得用get等待,或者轮询看看是否isDone,来看一下自动调用回掉对象

import java.util.concurrent.*;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // 这个是串行
        CompletableFuture<String> f = CompletableFuture.supplyAsync(HelloWorld::call);  // 第一个任务
        // 上面任务完成后执行这个任务,s是上一个的返回值
        CompletableFuture<String> f1 = f.thenApplyAsync((s) -> {
            System.out.println(s);  // mmm
            return HelloWorld.call();
        });
        // f1成功之后打印
        f1.thenAccept((res) -> {
            System.out.println(res);  // mmm
        });
        // f1失败打印
        f1.exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
        Thread.sleep(4000);
        System.out.println("开始并行");
        // 接下来看一下并行的
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            return call1(2);
        });
        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
            return call1(3);
        });
        CompletableFuture<String> f4 = CompletableFuture.supplyAsync(() -> {
            return call1(4);
        });
        CompletableFuture<String> f5 = CompletableFuture.supplyAsync(() -> {
            return call1(5);
        });
        CompletableFuture<Object> ff = CompletableFuture.anyOf(f2, f3, f4, f5);
        f2.thenAccept(System.out::println);
        f3.thenAccept(System.out::println);
        f4.thenAccept(System.out::println);
        f5.thenAccept(System.out::println);
        ff.thenAccept(System.out::println);  // 感觉默认的线程应该是3个,因为第四个总感觉执行的慢一点,ff就是谁的快接受那个线程

        Thread.sleep(4000);

    }

    static String call() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "mmm";
    }

    static String call1(int n) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "mmm" + n;
    }
}

相关推荐