jling 2019-11-17
1.3.1 一般开启线程的操作如下所示
new Thread(new Runnable() { @Override public void run() { //做一些任务 } }).start();
使用线程池管理线程优点
大概的流程图如下
文字描述如下
代码如下所示
public class App extends Application{ private static App instance; private PoolThread executor; public static synchronized App getInstance() { if (null == instance) { instance = new App(); } return instance; } public App(){} @Override public void onCreate() { super.onCreate(); instance = this; //初始化线程池管理器 initThreadPool(); } /**
*/ private void initThreadPool() { // 创建一个独立的实例进行使用 executor = PoolThread.ThreadBuilder .createFixed(5) .setPriority(Thread.MAX_PRIORITY) .setCallback(new LogCallback()) .build(); } /** * 获取线程池管理器对象,统一的管理器维护所有的线程池 * @return executor对象 */ public PoolThread getExecutor(){ return executor; } } //自定义回调监听callback,可以全局设置,也可以单独设置。都行 public class LogCallback implements ThreadCallback { private final String TAG = "LogCallback"; @Override public void onError(String name, Throwable t) { Log.e(TAG, "LogCallback"+"------onError"+"-----"+name+"----"+Thread.currentThread()+"----"+t.getMessage()); } @Override public void onCompleted(String name) { Log.e(TAG, "LogCallback"+"------onCompleted"+"-----"+name+"----"+Thread.currentThread()); } @Override public void onStart(String name) { Log.e(TAG, "LogCallback"+"------onStart"+"-----"+name+"----"+Thread.currentThread()); } } ```
关于设置callback回调监听,我这里在app初始化的时候设置了全局的logCallBack,所以这里没有添加,对于每个单独的执行任务,可以添加独立callback。
PoolThread executor = App.getInstance().getExecutor(); executor.setName("最简单的线程调用方式"); executor.setDeliver(new AndroidDeliver()); executor.execute(new Runnable() { @Override public void run() { Log.e("MainActivity","最简单的线程调用方式"); } });
如下所示
PoolThread executor = App.getInstance().getExecutor(); executor.setName("异步回调"); executor.setDelay(2,TimeUnit.MILLISECONDS); // 启动异步任务 executor.async(new Callable<Login>(){ @Override public Login call() throws Exception { // 做一些操作 return null; } }, new AsyncCallback<Login>() { @Override public void onSuccess(Login user) { Log.e("AsyncCallback","成功"); } @Override public void onFailed(Throwable t) { Log.e("AsyncCallback","失败"); } @Override public void onStart(String threadName) { Log.e("AsyncCallback","开始"); } });
4.1.1 首先看看Runnable和Callable接口代码
public interface Runnable { public void run(); } public interface Callable<V> { V call() throws Exception; }
4.1.4 自定义Runnable包装类,重点看run方法代码逻辑
public final class RunnableWrapper implements Runnable { private String name; private CallbackDelegate delegate; private Runnable runnable; private Callable callable; public RunnableWrapper(ThreadConfigs configs) { this.name = configs.name; this.delegate = new CallbackDelegate(configs.callback, configs.deliver, configs.asyncCallback); } /** * 启动异步任务,普通的 * @param runnable runnable
*/ public RunnableWrapper setRunnable(Runnable runnable) { this.runnable = runnable; return this; } /** * 异步任务,回调用于接收可调用任务的结果 * @param callable callable * @return 对象 */ public RunnableWrapper setCallable(Callable callable) { this.callable = callable; return this; } /** * 自定义xxRunnable继承Runnable,实现run方法 * 详细可以看我的GitHub:https://github.com/yangchong211 */ @Override public void run() { Thread current = Thread.currentThread(); ThreadToolUtils.resetThread(current, name, delegate); //开始 delegate.onStart(name); //注意需要判断runnable,callable非空 // avoid NullPointException if (runnable != null) { runnable.run(); } else if (callable != null) { try { Object result = callable.call(); //监听成功 delegate.onSuccess(result); } catch (Exception e) { //监听异常 delegate.onError(name, e); } } //监听完成 delegate.onCompleted(name); } } ```
4.1.5 自定义Callable<T>包装类,重点看call方法代码逻辑
public final class CallableWrapper<T> implements Callable<T> { private String name; private ThreadCallback callback; private Callable<T> proxy; /** * 构造方法 * @param configs thread配置,主要参数有:线程name,延迟time,回调callback,异步callback
*/ public CallableWrapper(ThreadConfigs configs, Callable<T> proxy) { this.name = configs.name; this.proxy = proxy; this.callback = new CallbackDelegate(configs.callback, configs.deliver, configs.asyncCallback); } /** * 详细可以看我的GitHub:https://github.com/yangchong211 * 自定义Callable继承Callable<T>类,Callable 是在 JDK1.5 增加的。 * Callable 的 call() 方法可以返回值和抛出异常 * @return 泛型 * @throws Exception 异常 */ @Override public T call() { ThreadToolUtils.resetThread(Thread.currentThread(),name,callback); if (callback != null) { //开始 callback.onStart(name); } T t = null; try { t = proxy == null ? null : proxy.call(); } catch (Exception e) { e.printStackTrace(); //异常错误 if(callback!=null){ callback.onError(name,e); } }finally { //完成 if (callback != null) { callback.onCompleted(name); } } return t; } } ```
4.1 AsyncCallback类代码如下所示
/** * <pre> * @author 杨充 * blog https://www.jianshu.com/p/53017c3fc75d * time * desc 异步callback回调接口 * revise * GitHub https://github.com/yangchong211
*/ public interface AsyncCallback<T> { /** * 成功时调用 * @param t 泛型 */ void onSuccess(T t); /** * 异常时调用 * @param t 异常 */ void onFailed(Throwable t); /** * 通知用户任务开始运行 * @param threadName 正在运行线程的名字 */ void onStart(String threadName); } ```
4.2 ThreadCallback类代码如下所示
/** * <pre> * @author: yangchong * blog : https://github.com/yangchong211 * time : * desc : 一个回调接口,用于通知用户任务的状态回调委托类 * 线程的名字可以自定义 * revise:
*/ public interface ThreadCallback { /** * 当线程发生错误时,将调用此方法。 * @param threadName 正在运行线程的名字 * @param t 异常 */ void onError(String threadName, Throwable t); /** * 通知用户知道它已经完成 * @param threadName 正在运行线程的名字 */ void onCompleted(String threadName); /** * 通知用户任务开始运行 * @param threadName 正在运行线程的名字 */ void onStart(String threadName); } ```
为什么要添加配置文件,配置文件的作用主要是存储当前任务的某些配置,比如线程的名称,回调callback等等这些参数。还可以用于参数的传递!
public final class ThreadConfigs { /** * 线程的名称
*/ public String name; /** * 线程执行延迟的时间 * 通过setDelay方法设置 */ public long delay; /** * 线程执行者 * JAVA或者ANDROID */ public Executor deliver; /** * 用户任务的状态回调callback */ public ThreadCallback callback; /** * 异步callback回调callback */ public AsyncCallback asyncCallback; } ```
4.4.3 如何判断环境是java环境还是Android环境呢
public final class ThreadToolUtils { /**
*/ public static boolean isAndroid; /* * 静态代码块 * 判断是否是android环境 * Class.forName(xxx.xx.xx) 返回的是一个类对象 * 首先要明白在java里面任何class都要装载在虚拟机上才能运行。 */ static { try { Class.forName("android.os.Build"); isAndroid = true; } catch (Exception e) { isAndroid = false; } } } ```
4.5.1 继承Executor接口,并且实现execute方法
public final class PoolThread implements Executor{ /** * 启动任务 * 这个是实现接口Executor中的execute方法 * 提交任务无返回值
*/ @Override public void execute (@NonNull Runnable runnable) { //获取线程thread配置信息 ThreadConfigs configs = getLocalConfigs(); //设置runnable任务 runnable = new RunnableWrapper(configs).setRunnable(runnable); //启动任务 DelayTaskDispatcher.get().postDelay(configs.delay, pool, runnable); //重置线程Thread配置 resetLocalConfigs(); } /** * 当启动任务或者发射任务之后需要调用该方法 * 重置本地配置,置null */ private synchronized void resetLocalConfigs() { local.set(null); } /** * 注意需要用synchronized修饰,解决了多线程的安全问题 * 获取本地配置参数 * @return */ private synchronized ThreadConfigs getLocalConfigs() { ThreadConfigs configs = local.get(); if (configs == null) { configs = new ThreadConfigs(); configs.name = defName; configs.callback = defCallback; configs.deliver = defDeliver; local.set(configs); } return configs; } } ```
直接列出代码,如下所示:
public final class PoolThread implements Executor{ //省略部分代码…… public static class ThreadBuilder { final static int TYPE_CACHE = 0; final static int TYPE_FIXED = 1; final static int TYPE_SINGLE = 2; final static int TYPE_SCHEDULED = 3; int type; int size; int priority = Thread.NORM_PRIORITY; String name; ThreadCallback callback; Executor deliver; ExecutorService pool; private ThreadBuilder(int size, int type, ExecutorService pool) { this.size = Math.max(1, size); this.type = type; this.pool = pool; } /** * 通过Executors.newSingleThreadExecutor()创建线程池
*/ public static ThreadBuilder create(ExecutorService pool) { return new ThreadBuilder(1, TYPE_SINGLE, pool); } /** * 通过Executors.newCachedThreadPool()创建线程池 * 它是一个数量无限多的线程池,都是非核心线程,适合执行大量耗时小的任务 */ public static ThreadBuilder createCacheable() { return new ThreadBuilder(0, TYPE_CACHE, null); } /** * 通过Executors.newFixedThreadPool()创建线程池 * 线程数量固定的线程池,全部为核心线程,响应较快,不用担心线程会被回收。 */ public static ThreadBuilder createFixed(int size) { return new ThreadBuilder(size, TYPE_FIXED, null); } /** * 通过Executors.newScheduledThreadPool()创建线程池 * 有数量固定的核心线程,且有数量无限多的非核心线程,适合用于执行定时任务和固定周期的重复任务 */ public static ThreadBuilder createScheduled(int size) { return new ThreadBuilder(size, TYPE_SCHEDULED, null); } /** * 通过Executors.newSingleThreadPool()创建线程池 * 内部只有一个核心线程,所有任务进来都要排队按顺序执行 * 和create区别是size数量 */ public static ThreadBuilder createSingle() { return new ThreadBuilder(0, TYPE_SINGLE, null); } /** * 将默认线程名设置为“已使用”。 */ public ThreadBuilder setName (@NonNull String name) { if (name.length()>0) { this.name = name; } return this; } /** * 将默认线程优先级设置为“已使用”。 */ public ThreadBuilder setPriority (int priority) { this.priority = priority; return this; } /** * 将默认线程回调设置为“已使用”。 */ public ThreadBuilder setCallback (ThreadCallback callback) { this.callback = callback; return this; } /** * 设置默认线程交付使用 */ public ThreadBuilder setDeliver(Executor deliver) { this.deliver = deliver; return this; } /** * 创建用于某些配置的线程管理器。 * @return 对象 */ public PoolThread build () { //最大值 priority = Math.max(Thread.MIN_PRIORITY, priority); //最小值 priority = Math.min(Thread.MAX_PRIORITY, priority); size = Math.max(1, size); if (name==null || name.length()==0) { // 如果没有设置名字,那么就使用下面默认的线程名称 switch (type) { case TYPE_CACHE: name = "CACHE"; break; case TYPE_FIXED: name = "FIXED"; break; case TYPE_SINGLE: name = "SINGLE"; break; default: name = "POOL_THREAD"; break; } } if (deliver == null) { if (ThreadToolUtils.isAndroid) { deliver = AndroidDeliver.getInstance(); } else { deliver = JavaDeliver.getInstance(); } } return new PoolThread(type, size, priority, name, callback, deliver, pool); } } } ```
4.6.2 添加设置thread配置信息的方法
/** * 为当前的任务设置线程名。 * @param name 线程名字
*/ public PoolThread setName(String name) { getLocalConfigs().name = name; return this; } /** * 设置当前任务的线程回调,如果未设置,则应使用默认回调。 * @param callback 线程回调 * @return PoolThread */ public PoolThread setCallback (ThreadCallback callback) { getLocalConfigs().callback = callback; return this; } /** * 设置当前任务的延迟时间. * 只有当您的线程池创建时,它才会产生效果。 * @param time 时长 * @param unit time unit * @return PoolThread */ public PoolThread setDelay (long time, TimeUnit unit) { long delay = unit.toMillis(time); getLocalConfigs().delay = Math.max(0, delay); return this; } /** * 设置当前任务的线程传递。如果未设置,则应使用默认传递。 * @param deliver thread deliver * @return PoolThread */ public PoolThread setDeliver(Executor deliver){ getLocalConfigs().deliver = deliver; return this; } ```
通过Executors的工厂方法来创建线程池极其简便,其实它的内部还是通过new ThreadPoolExecutor(…)的方式创建线程池的,具体可以看看源码,这里省略呢……
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); ScheduledExecutorService singleThreadScheduledPool = Executors.newSingleThreadScheduledExecutor();
4.7.2.1 创建不同类型线程池代码如下所示:
/** * 创建线程池,目前支持以下四种 * @param type 类型 * @param size 数量size * @param priority 优先级
*/ private ExecutorService createPool(int type, int size, int priority) { switch (type) { case Builder.TYPE_CACHE: //它是一个数量无限多的线程池,都是非核心线程,适合执行大量耗时小的任务 return Executors.newCachedThreadPool(new DefaultFactory(priority)); case Builder.TYPE_FIXED: //线程数量固定的线程池,全部为核心线程,响应较快,不用担心线程会被回收。 return Executors.newFixedThreadPool(size, new DefaultFactory(priority)); case Builder.TYPE_SCHEDULED: //有数量固定的核心线程,且有数量无限多的非核心线程,适合用于执行定时任务和固定周期的重复任务 return Executors.newScheduledThreadPool(size, new DefaultFactory(priority)); case Builder.TYPE_SINGLE: default: //内部只有一个核心线程,所有任务进来都要排队按顺序执行 return Executors.newSingleThreadExecutor(new DefaultFactory(priority)); } } ```
通过有道词典对这个类的说明进行翻译是:根据需要创建新线程的对象。使用线程工厂可以消除对{@link Thread#Thread(Runnable)新线程}的硬连接,从而使应用程序能够使用特殊的线程子类、优先级等。
public interface ThreadFactory { /** * Constructs a new {@code Thread}. Implementations may also initialize
* * @param r a runnable to be executed by new thread instance * @return constructed thread, or {@code null} if the request to * create a thread is rejected */ Thread newThread(Runnable r); } ```
代码如下所示
public class MyThreadFactory implements ThreadFactory { private int priority; public MyThreadFactory(int priority) { this.priority = priority; } @Override public Thread newThread(@NonNull Runnable runnable) { Thread thread = new Thread(runnable); thread.setPriority(priority); return thread; } }
具体逻辑看DelayTaskExecutor中的postDelay方法
/** * 启动 * @param delay 延迟执行的时间,注意默认单位是TimeUnit.MILLISECONDS * @param pool pool线程池
*/ void postDelay(long delay, final ExecutorService pool, final Runnable task) { if (delay == 0) { //如果时间是0,那么普通开启 pool.execute(task); return; } //延时操作 dispatcher.schedule(new Runnable() { @Override public void run() { //在将来的某个时间执行给定的命令。该命令可以在新线程、池线程或调用线程中执行 pool.execute(task); } }, delay, TimeUnit.MILLISECONDS); } ```
代码如下所示
/**
*/ public void stop(){ try { // shutdown只是起到通知的作用 // 只调用shutdown方法结束线程池是不够的 pool.shutdown(); // (所有的任务都结束的时候,返回TRUE) if(!pool.awaitTermination(0, TimeUnit.MILLISECONDS)){ // 超时的时候向线程池中所有的线程发出中断(interrupted)。 pool.shutdownNow(); } } catch (InterruptedException e) { // awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。 e.printStackTrace(); } finally { pool.shutdownNow(); } } ```