jling 2019-11-17
1.3.1 一般开启线程的操作如下所示
new Thread(new Runnable() {
@Override
public void run() {
//做一些任务
}
}).start();使用线程池管理线程优点
大概的流程图如下

文字描述如下
代码如下所示
public class App extends Application{
private static App instance;
private PoolThread executor;
public static synchronized App getInstance() {
if (null == instance) {
instance = new App();
}
return instance;
}
public App(){}
@Override
public void onCreate() {
super.onCreate();
instance = this;
//初始化线程池管理器
initThreadPool();
}
/***/
private void initThreadPool() {
// 创建一个独立的实例进行使用
executor = PoolThread.ThreadBuilder
.createFixed(5)
.setPriority(Thread.MAX_PRIORITY)
.setCallback(new LogCallback())
.build();
}
/**
* 获取线程池管理器对象,统一的管理器维护所有的线程池
* @return executor对象
*/
public PoolThread getExecutor(){
return executor;
}
}
//自定义回调监听callback,可以全局设置,也可以单独设置。都行
public class LogCallback implements ThreadCallback {
private final String TAG = "LogCallback";
@Override
public void onError(String name, Throwable t) {
Log.e(TAG, "LogCallback"+"------onError"+"-----"+name+"----"+Thread.currentThread()+"----"+t.getMessage());
}
@Override
public void onCompleted(String name) {
Log.e(TAG, "LogCallback"+"------onCompleted"+"-----"+name+"----"+Thread.currentThread());
}
@Override
public void onStart(String name) {
Log.e(TAG, "LogCallback"+"------onStart"+"-----"+name+"----"+Thread.currentThread());
}
}
```关于设置callback回调监听,我这里在app初始化的时候设置了全局的logCallBack,所以这里没有添加,对于每个单独的执行任务,可以添加独立callback。
PoolThread executor = App.getInstance().getExecutor();
executor.setName("最简单的线程调用方式");
executor.setDeliver(new AndroidDeliver());
executor.execute(new Runnable() {
@Override
public void run() {
Log.e("MainActivity","最简单的线程调用方式");
}
});如下所示
PoolThread executor = App.getInstance().getExecutor();
executor.setName("异步回调");
executor.setDelay(2,TimeUnit.MILLISECONDS);
// 启动异步任务
executor.async(new Callable<Login>(){
@Override
public Login call() throws Exception {
// 做一些操作
return null;
}
}, new AsyncCallback<Login>() {
@Override
public void onSuccess(Login user) {
Log.e("AsyncCallback","成功");
}
@Override
public void onFailed(Throwable t) {
Log.e("AsyncCallback","失败");
}
@Override
public void onStart(String threadName) {
Log.e("AsyncCallback","开始");
}
});4.1.1 首先看看Runnable和Callable接口代码
public interface Runnable {
public void run();
}
public interface Callable<V> {
V call() throws Exception;
}4.1.4 自定义Runnable包装类,重点看run方法代码逻辑
public final class RunnableWrapper implements Runnable {
private String name;
private CallbackDelegate delegate;
private Runnable runnable;
private Callable callable;
public RunnableWrapper(ThreadConfigs configs) {
this.name = configs.name;
this.delegate = new CallbackDelegate(configs.callback, configs.deliver, configs.asyncCallback);
}
/**
* 启动异步任务,普通的
* @param runnable runnable*/
public RunnableWrapper setRunnable(Runnable runnable) {
this.runnable = runnable;
return this;
}
/**
* 异步任务,回调用于接收可调用任务的结果
* @param callable callable
* @return 对象
*/
public RunnableWrapper setCallable(Callable callable) {
this.callable = callable;
return this;
}
/**
* 自定义xxRunnable继承Runnable,实现run方法
* 详细可以看我的GitHub:https://github.com/yangchong211
*/
@Override
public void run() {
Thread current = Thread.currentThread();
ThreadToolUtils.resetThread(current, name, delegate);
//开始
delegate.onStart(name);
//注意需要判断runnable,callable非空
// avoid NullPointException
if (runnable != null) {
runnable.run();
} else if (callable != null) {
try {
Object result = callable.call();
//监听成功
delegate.onSuccess(result);
} catch (Exception e) {
//监听异常
delegate.onError(name, e);
}
}
//监听完成
delegate.onCompleted(name);
}
}
```4.1.5 自定义Callable<T>包装类,重点看call方法代码逻辑
public final class CallableWrapper<T> implements Callable<T> {
private String name;
private ThreadCallback callback;
private Callable<T> proxy;
/**
* 构造方法
* @param configs thread配置,主要参数有:线程name,延迟time,回调callback,异步callback*/
public CallableWrapper(ThreadConfigs configs, Callable<T> proxy) {
this.name = configs.name;
this.proxy = proxy;
this.callback = new CallbackDelegate(configs.callback, configs.deliver, configs.asyncCallback);
}
/**
* 详细可以看我的GitHub:https://github.com/yangchong211
* 自定义Callable继承Callable<T>类,Callable 是在 JDK1.5 增加的。
* Callable 的 call() 方法可以返回值和抛出异常
* @return 泛型
* @throws Exception 异常
*/
@Override
public T call() {
ThreadToolUtils.resetThread(Thread.currentThread(),name,callback);
if (callback != null) {
//开始
callback.onStart(name);
}
T t = null;
try {
t = proxy == null ? null : proxy.call();
} catch (Exception e) {
e.printStackTrace();
//异常错误
if(callback!=null){
callback.onError(name,e);
}
}finally {
//完成
if (callback != null) {
callback.onCompleted(name);
}
}
return t;
}
}
```4.1 AsyncCallback类代码如下所示
/** * <pre> * @author 杨充 * blog https://www.jianshu.com/p/53017c3fc75d * time * desc 异步callback回调接口 * revise * GitHub https://github.com/yangchong211
*/
public interface AsyncCallback<T> {
/**
* 成功时调用
* @param t 泛型
*/
void onSuccess(T t);
/**
* 异常时调用
* @param t 异常
*/
void onFailed(Throwable t);
/**
* 通知用户任务开始运行
* @param threadName 正在运行线程的名字
*/
void onStart(String threadName);
}
```4.2 ThreadCallback类代码如下所示
/** * <pre> * @author: yangchong * blog : https://github.com/yangchong211 * time : * desc : 一个回调接口,用于通知用户任务的状态回调委托类 * 线程的名字可以自定义 * revise:
*/
public interface ThreadCallback {
/**
* 当线程发生错误时,将调用此方法。
* @param threadName 正在运行线程的名字
* @param t 异常
*/
void onError(String threadName, Throwable t);
/**
* 通知用户知道它已经完成
* @param threadName 正在运行线程的名字
*/
void onCompleted(String threadName);
/**
* 通知用户任务开始运行
* @param threadName 正在运行线程的名字
*/
void onStart(String threadName);
}
```为什么要添加配置文件,配置文件的作用主要是存储当前任务的某些配置,比如线程的名称,回调callback等等这些参数。还可以用于参数的传递!
public final class ThreadConfigs {
/**
* 线程的名称*/
public String name;
/**
* 线程执行延迟的时间
* 通过setDelay方法设置
*/
public long delay;
/**
* 线程执行者
* JAVA或者ANDROID
*/
public Executor deliver;
/**
* 用户任务的状态回调callback
*/
public ThreadCallback callback;
/**
* 异步callback回调callback
*/
public AsyncCallback asyncCallback;
}
```

4.4.3 如何判断环境是java环境还是Android环境呢
public final class ThreadToolUtils {
/***/
public static boolean isAndroid;
/*
* 静态代码块
* 判断是否是android环境
* Class.forName(xxx.xx.xx) 返回的是一个类对象
* 首先要明白在java里面任何class都要装载在虚拟机上才能运行。
*/
static {
try {
Class.forName("android.os.Build");
isAndroid = true;
} catch (Exception e) {
isAndroid = false;
}
}
}
```4.5.1 继承Executor接口,并且实现execute方法
public final class PoolThread implements Executor{
/**
* 启动任务
* 这个是实现接口Executor中的execute方法
* 提交任务无返回值*/
@Override
public void execute (@NonNull Runnable runnable) {
//获取线程thread配置信息
ThreadConfigs configs = getLocalConfigs();
//设置runnable任务
runnable = new RunnableWrapper(configs).setRunnable(runnable);
//启动任务
DelayTaskDispatcher.get().postDelay(configs.delay, pool, runnable);
//重置线程Thread配置
resetLocalConfigs();
}
/**
* 当启动任务或者发射任务之后需要调用该方法
* 重置本地配置,置null
*/
private synchronized void resetLocalConfigs() {
local.set(null);
}
/**
* 注意需要用synchronized修饰,解决了多线程的安全问题
* 获取本地配置参数
* @return
*/
private synchronized ThreadConfigs getLocalConfigs() {
ThreadConfigs configs = local.get();
if (configs == null) {
configs = new ThreadConfigs();
configs.name = defName;
configs.callback = defCallback;
configs.deliver = defDeliver;
local.set(configs);
}
return configs;
}
}
```直接列出代码,如下所示:
public final class PoolThread implements Executor{
//省略部分代码……
public static class ThreadBuilder {
final static int TYPE_CACHE = 0;
final static int TYPE_FIXED = 1;
final static int TYPE_SINGLE = 2;
final static int TYPE_SCHEDULED = 3;
int type;
int size;
int priority = Thread.NORM_PRIORITY;
String name;
ThreadCallback callback;
Executor deliver;
ExecutorService pool;
private ThreadBuilder(int size, int type, ExecutorService pool) {
this.size = Math.max(1, size);
this.type = type;
this.pool = pool;
}
/**
* 通过Executors.newSingleThreadExecutor()创建线程池*/
public static ThreadBuilder create(ExecutorService pool) {
return new ThreadBuilder(1, TYPE_SINGLE, pool);
}
/**
* 通过Executors.newCachedThreadPool()创建线程池
* 它是一个数量无限多的线程池,都是非核心线程,适合执行大量耗时小的任务
*/
public static ThreadBuilder createCacheable() {
return new ThreadBuilder(0, TYPE_CACHE, null);
}
/**
* 通过Executors.newFixedThreadPool()创建线程池
* 线程数量固定的线程池,全部为核心线程,响应较快,不用担心线程会被回收。
*/
public static ThreadBuilder createFixed(int size) {
return new ThreadBuilder(size, TYPE_FIXED, null);
}
/**
* 通过Executors.newScheduledThreadPool()创建线程池
* 有数量固定的核心线程,且有数量无限多的非核心线程,适合用于执行定时任务和固定周期的重复任务
*/
public static ThreadBuilder createScheduled(int size) {
return new ThreadBuilder(size, TYPE_SCHEDULED, null);
}
/**
* 通过Executors.newSingleThreadPool()创建线程池
* 内部只有一个核心线程,所有任务进来都要排队按顺序执行
* 和create区别是size数量
*/
public static ThreadBuilder createSingle() {
return new ThreadBuilder(0, TYPE_SINGLE, null);
}
/**
* 将默认线程名设置为“已使用”。
*/
public ThreadBuilder setName (@NonNull String name) {
if (name.length()>0) {
this.name = name;
}
return this;
}
/**
* 将默认线程优先级设置为“已使用”。
*/
public ThreadBuilder setPriority (int priority) {
this.priority = priority;
return this;
}
/**
* 将默认线程回调设置为“已使用”。
*/
public ThreadBuilder setCallback (ThreadCallback callback) {
this.callback = callback;
return this;
}
/**
* 设置默认线程交付使用
*/
public ThreadBuilder setDeliver(Executor deliver) {
this.deliver = deliver;
return this;
}
/**
* 创建用于某些配置的线程管理器。
* @return 对象
*/
public PoolThread build () {
//最大值
priority = Math.max(Thread.MIN_PRIORITY, priority);
//最小值
priority = Math.min(Thread.MAX_PRIORITY, priority);
size = Math.max(1, size);
if (name==null || name.length()==0) {
// 如果没有设置名字,那么就使用下面默认的线程名称
switch (type) {
case TYPE_CACHE:
name = "CACHE";
break;
case TYPE_FIXED:
name = "FIXED";
break;
case TYPE_SINGLE:
name = "SINGLE";
break;
default:
name = "POOL_THREAD";
break;
}
}
if (deliver == null) {
if (ThreadToolUtils.isAndroid) {
deliver = AndroidDeliver.getInstance();
} else {
deliver = JavaDeliver.getInstance();
}
}
return new PoolThread(type, size, priority, name, callback, deliver, pool);
}
}
}
```4.6.2 添加设置thread配置信息的方法
/** * 为当前的任务设置线程名。 * @param name 线程名字
*/
public PoolThread setName(String name) {
getLocalConfigs().name = name;
return this;
}
/**
* 设置当前任务的线程回调,如果未设置,则应使用默认回调。
* @param callback 线程回调
* @return PoolThread
*/
public PoolThread setCallback (ThreadCallback callback) {
getLocalConfigs().callback = callback;
return this;
}
/**
* 设置当前任务的延迟时间.
* 只有当您的线程池创建时,它才会产生效果。
* @param time 时长
* @param unit time unit
* @return PoolThread
*/
public PoolThread setDelay (long time, TimeUnit unit) {
long delay = unit.toMillis(time);
getLocalConfigs().delay = Math.max(0, delay);
return this;
}
/**
* 设置当前任务的线程传递。如果未设置,则应使用默认传递。
* @param deliver thread deliver
* @return PoolThread
*/
public PoolThread setDeliver(Executor deliver){
getLocalConfigs().deliver = deliver;
return this;
}
```通过Executors的工厂方法来创建线程池极其简便,其实它的内部还是通过new ThreadPoolExecutor(…)的方式创建线程池的,具体可以看看源码,这里省略呢……
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); ScheduledExecutorService singleThreadScheduledPool = Executors.newSingleThreadScheduledExecutor();
4.7.2.1 创建不同类型线程池代码如下所示:
/** * 创建线程池,目前支持以下四种 * @param type 类型 * @param size 数量size * @param priority 优先级
*/
private ExecutorService createPool(int type, int size, int priority) {
switch (type) {
case Builder.TYPE_CACHE:
//它是一个数量无限多的线程池,都是非核心线程,适合执行大量耗时小的任务
return Executors.newCachedThreadPool(new DefaultFactory(priority));
case Builder.TYPE_FIXED:
//线程数量固定的线程池,全部为核心线程,响应较快,不用担心线程会被回收。
return Executors.newFixedThreadPool(size, new DefaultFactory(priority));
case Builder.TYPE_SCHEDULED:
//有数量固定的核心线程,且有数量无限多的非核心线程,适合用于执行定时任务和固定周期的重复任务
return Executors.newScheduledThreadPool(size, new DefaultFactory(priority));
case Builder.TYPE_SINGLE:
default:
//内部只有一个核心线程,所有任务进来都要排队按顺序执行
return Executors.newSingleThreadExecutor(new DefaultFactory(priority));
}
}
```通过有道词典对这个类的说明进行翻译是:根据需要创建新线程的对象。使用线程工厂可以消除对{@link Thread#Thread(Runnable)新线程}的硬连接,从而使应用程序能够使用特殊的线程子类、优先级等。
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
```代码如下所示
public class MyThreadFactory implements ThreadFactory {
private int priority;
public MyThreadFactory(int priority) {
this.priority = priority;
}
@Override
public Thread newThread(@NonNull Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setPriority(priority);
return thread;
}
}具体逻辑看DelayTaskExecutor中的postDelay方法
/** * 启动 * @param delay 延迟执行的时间,注意默认单位是TimeUnit.MILLISECONDS * @param pool pool线程池
*/
void postDelay(long delay, final ExecutorService pool, final Runnable task) {
if (delay == 0) {
//如果时间是0,那么普通开启
pool.execute(task);
return;
}
//延时操作
dispatcher.schedule(new Runnable() {
@Override
public void run() {
//在将来的某个时间执行给定的命令。该命令可以在新线程、池线程或调用线程中执行
pool.execute(task);
}
}, delay, TimeUnit.MILLISECONDS);
}
```代码如下所示
/**
*/
public void stop(){
try {
// shutdown只是起到通知的作用
// 只调用shutdown方法结束线程池是不够的
pool.shutdown();
// (所有的任务都结束的时候,返回TRUE)
if(!pool.awaitTermination(0, TimeUnit.MILLISECONDS)){
// 超时的时候向线程池中所有的线程发出中断(interrupted)。
pool.shutdownNow();
}
} catch (InterruptedException e) {
// awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。
e.printStackTrace();
} finally {
pool.shutdownNow();
}
}
```