GCD源码分析

laorenmen 2016-10-10

背景

最近在浏览阵营本地代码的时候发现有提到主队列和主线程的区别,很早就有阅读GCD源码的冲动,这回总算找到机会了。

阅读源码之前先给个结论:主线程和主队列是两个不同的东西。

  • 主队列是绑定到主线程。

  • 主线程未绑定到主队列。

源码剖析

关键点

队列和线程的关系

GCD源码分析

请记住这一点PNG!

慢路径VS快速路径

大家可能对__builtin_expect比较熟悉,这是编译器可以用来优化执行速度的函数。

程序员在写,如果条件的时候,可能知道比较的值更可能是哪种情况,因此可以就使用fastpath

或者slowpath来告诉compile-器,让编译来帮忙优化。

  • fastpath表示条件更可能成立

  • slowpath表示条件更不可能成立

所以简单的来说,当我们遇到这个东西的时候,直接忽略,并不会影响我们对代码的理解。

#define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l))#define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l))

功能VS块

GCD支持功能和块的执行,相应的其提供了两种方法来支持功能和块的入队,简单的举个例子。所以当我们看到不带˚F和带˚F的同名函数,默认他们干的是同一回事。

块的执行底层调用的是函数的执行。

// 执行blockvoid dispatch_async(dispatch_queue_t dq, void (^work)(void));// 执行functionvoid dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func);

常用宏定义

libdispatch为了在保证性能的情况下,尽量增加代码的可读性,大量的使用了宏。

因此了解结构体之前,我们需要先知道几个宏定义,方便后续分析代码。

// dispatch结构体定义宏#define DISPATCH_DECL(name) typedef struct name##_s *name##_t// os object头部定义宏#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt)

isa // isa

ref_cnt // 引用计数,这是内部gcd内部使用的计数器

xref_cnt // 外部引用计数,这是gcd外部使用的计数器,两者都为0的时候才能dispose

// dispatch结构体头部#define DISPATCH_STRUCT_HEADER(x)

_OS_OBJECT_HEADER

do_next // 下一个do

do_targetq // 目标queue

do_ctxt // do上下文

do_finalizer // do销毁时候调用函数

do_suspend_cnt // suspend计数,用作暂停标志

// dispatch continuation的头部#define DISPATCH_CONTINUATION_HEADER(x)

_OS_OBJECT_HEADER

do_next // 下一个do

dc_func // dc封装的函数

dc_ctxt // dc封装的上下文

dc_data // dc封装的数据

dc_other // dc封装的其他信息

// dispatch queue头部#define DISPATCH_QUEUE_HEADER

dq_running; // 队列正在运行的任务数

dq_width; // 队列可以并发的数目

dq_items_tail; // 队列末尾节点

dq_items_head; // 队列开头节点

dq_serialnum // 队列序列号,每个队列都有唯一的序列号

dq_specific_q //

// dispatch vtable头部#define DISPATCH_VTABLE_HEADER(x)

do_type // do类型

do_kind // 种类,例如:semaphore/group/queue...

do_debug // debug回调

do_invoke // invoke回调,唤醒队列回调

do_probe // probe回调,important

do_dispose // dispose回调,销毁队列的方法

GCD结构体

因为本文只关心队列的实现,所以暂时省略了其他结构体,有兴趣的童鞋可以直接下源码看。

os_object_t

系统基类

typedef struct _os_object_s {

_OS_OBJECT_HEADER(

const _os_object_class_s *os_obj_isa,

os_obj_ref_cnt,

os_obj_xref_cnt); } _os_object_s;

dispatch_object_t

该结构体可以看成GCD的基类。

typedef union {

struct _os_object_s *_os_obj;

struct dispatch_object_s *_do;

struct dispatch_continuation_s *_dc;

struct dispatch_queue_s *_dq;

struct dispatch_queue_attr_s *_dqa;

struct dispatch_group_s *_dg;

struct dispatch_source_s *_ds;

struct dispatch_source_attr_s *_dsa;

struct dispatch_semaphore_s *_dsema;

struct dispatch_data_s *_ddata;

struct dispatch_io_s *_dchannel;

struct dispatch_operation_s *_doperation;

struct dispatch_disk_s *_ddisk;} dispatch_object_t __attribute__((__transparent_union__));

通过上面的结构体定义可以发现,dispatch_object_t可以是工会的结构体中任何一种类型。

dispatch_continuation_t

该结构体主要用来封装模块和功能

struct dispatch_continuation_s {

DISPATCH_CONTINUATION_HEADER(continuation);};

dispatch_queue_t

队列结构体,可能是GCD最重要的结构体了。

struct dispatch_queue_s { DISPATCH_STRUCT_HEADER(queue);

DISPATCH_QUEUE_HEADER;

char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // must be last

char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD]; // for static queues only};

dispatch_queue_attr_t

队列属性结构体。

struct dispatch_queue_attr_s {

DISPATCH_STRUCT_HEADER(queue_attr);};

常用API解析

GCD提供了非常多的功能来简化针对多核设备的代码编写,这里我们慢慢添加对常用API的源码剖析。

dispatch_get_global_queue

dispatch_queue_tdispatch_get_global_queue(long priority, unsigned long flags){

if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) { return NULL;

}

return _dispatch_get_root_queue(priority,

flags & DISPATCH_QUEUE_OVERCOMMIT);}

当我们获取全局队列来使用的时候,实质上其通过_dispatch_get_root_queue来的电子杂志过量非的预先生成的队列的。

_dispatch_get_root_queue从是结构行业释义体育_dispatch_root_queues中相应电子杂志优先的级的队列。_dispatch_root_queues区分是否过量使用,定义了4中优先级的队列,他们分别是(参考前文的图)。最后1位是1的代表overcommit.overcommit用来控制线程数能不能超越物理内核数,显然通过该接口获得的队列不会给系统创建过多的队列。

DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY = 0,

DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY,

DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY,

DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY,

DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY,

DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY,

DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY,

DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY,

最后提及一下,_dispatch_root_queues对应的线程在实现_dispatch_root_queue_contexts中,每一个方面都是一个线程池,每个线程池的最大线程数限制是255。

dispatch_get_current_queue

dispatch_queue_tdispatch_get_current_queue(void){

return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);}static inline dispatch_queue_t_dispatch_queue_get_current(void){

return (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key);}

发现可以当我们通过dispatch_get_current_queue来电子杂志当前运行的队列的时候,我们是通过TSD(线程特定数据)来确定到底当前是运行在那个队列上的,每当我们切换队列的时候,通过都是_dispatch_thread_setspecific来设置当前队列。

这里主要涉及到TSD,也有叫TLD的,是个比较有意思的技术。

dispatch_queue_create

dispatch_queue_tdispatch_queue_create(const char *label, dispatch_queue_attr_t attr){

dispatch_queue_t dq; size_t label_len;

if (!label) {

label = "";

}

label_len = strlen(label); if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {

label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);

}

// XXX switch to malloc()

dq = _dispatch_alloc(DISPATCH_VTABLE(queue), sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE -

DISPATCH_QUEUE_CACHELINE_PAD + label_len + 1);

_dispatch_queue_init(dq);

strcpy(dq->dq_label, label);

if (fastpath(!attr)) { return dq;

}

if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {

dq->dq_width = UINT32_MAX;

dq->do_targetq = _dispatch_get_root_queue(0, false);

} else {

dispatch_debug_assert(!attr, "Invalid attribute");

}

return dq;}static inline void_dispatch_queue_init(dispatch_queue_t dq){

dq->do_next = (struct dispatch_queue_s *)DISPATCH_OBJECT_LISTLESS; // Default target queue is overcommit!

dq->do_targetq = _dispatch_get_root_queue(0, true);

dq->dq_running = 0;

dq->dq_width = 1;

dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;}

当我们调用dispatch_queue_create进行队列的创建的时候,其会首先调用_dispatch_queue_init初始化一个队列,该队列默认是默认的优先级(还记得前文的图么?),并且dq_width是1,也就是串行队列,并且序列号加1。

如果是并发队列的话,会将dq_width改成UINT32_MAX,并且将目标队列设置成非过量的.overcommit如果被设置成真,那就意味着可以创建超过物理核数目的线程数。因此可以发现,自定义并发队列线程数目是不会超过物理内核数的,而串行队列一般是没有这个限制的。

前面说到序列号加1了,那序列号是干什么的呢?在源码中有这样一段注释。

// skip zero// 1 - main_q// 2 - mgr_q// 3 - _unused_// 4,5,6,7,8,9,10,11 - global queues// we use 'xadd' on Intel, so the initial value == next assigned

可以发现,序列号是1的时候表示主队列,2的时候表示经理队列,3没有使用,4到11表示全局队列,再往后就是用户自定义队列了。

那接下来我们看看序列1和序列2的队列是怎么定义的。

struct dispatch_queue_s _dispatch_main_q = { .do_vtable = DISPATCH_VTABLE(queue),#if !DISPATCH_USE_RESOLVERS

.do_targetq = &_dispatch_root_queues[

DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],#endif

.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK, .dq_label = "com.apple.main-thread", .dq_running = 1, .dq_width = 1, .dq_serialnum = 1,

};

值得说明的是,主队列的目标队列也只是一个优先级为默认的过量使用队列,其背后也是普通的线程池,没什么特别的。

struct dispatch_queue_s _dispatch_mgr_q = { .do_vtable = DISPATCH_VTABLE(queue_mgr), .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK, .do_targetq = &_dispatch_root_queues[

DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY], .dq_label = "com.apple.libdispatch-manager", .dq_width = 1, .dq_serialnum = 2,

};

该队列是用来管理GCD内部的任务的,比如对于各类资源的管理等。

dispatch_sync

voiddispatch_sync(dispatch_queue_t dq, void (^work)(void)){#if DISPATCH_COCOA_COMPAT

if (slowpath(dq == &_dispatch_main_q)) { return _dispatch_sync_slow(dq, work);

}#endif

struct Block_basic *bb = (void *)work;

dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);}

无论走哪个分支,深入查看之后,可以发现,其最终走的都是dispatch_sync_f,通过_dispatch_Block_copy或者Block_basic来实现块到功能的转换。

voiddispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func){

if (fastpath(dq->dq_width == 1)) { return dispatch_barrier_sync_f(dq, ctxt, func);

}

if (slowpath(!dq->do_targetq)) {

// the global root queues do not need strict ordering

(void)dispatch_atomic_add2o(dq, dq_running, 2);

return _dispatch_sync_f_invoke(dq, ctxt, func);

}

_dispatch_sync_f2(dq, ctxt, func);}

如果dq_width是1的话,也就是DQ是串行队列的话,必须要等待前面的任务执行完成之后才能执行该任务,因此会调用dispatch_barrier_sync_f.barrier的实现是依靠信号量机制来保证的。

voiddispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,

dispatch_function_t func){ // 1) ensure that this thread hasn't enqueued anything ahead of this call

// 2) the queue is not suspended

if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){ return _dispatch_barrier_sync_f_slow(dq, ctxt, func);

} if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) { // global queues and main queue bound to main thread always falls into

// the slow case

return _dispatch_barrier_sync_f_slow(dq, ctxt, func);

} if (slowpath(dq->do_targetq->do_targetq)) { return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);

} _dispatch_barrier_sync_f_invoke(dq, ctxt, func);}

如果当前队列中有对象或者当前队列处于暂停状态或者当前队列没有运行任何任务的时候,就走_dispatch_barrier_sync_f_slow慢通道,否则的话就直接调用_dispatch_barrier_sync_f_invoke。发现可以dispatch_sync一般来说都是在**当前线程**执行的,不进行线程的切换,这一点还是要特别注意的。走只有slow的时候,才会做线程切换。就下面看下走slow路径的英文什么样的。

static void_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func){

// It's preferred to execute synchronous blocks on the current thread

// due to thread-local side effects, garbage collection, etc. However,

// blocks submitted to the main thread MUST be run on the main thread

struct dispatch_barrier_sync_slow2_s dbss2 = {

.dbss2_dq = dq,#if DISPATCH_COCOA_COMPAT || DISPATCH_LINUX_COMPAT

.dbss2_func = func,

.dbss2_ctxt = ctxt,#endif

.dbss2_sema = _dispatch_get_thread_semaphore(),

};

struct dispatch_barrier_sync_slow_s dbss = {

.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |

DISPATCH_OBJ_SYNC_SLOW_BIT),

.dc_func = _dispatch_barrier_sync_f_slow_invoke,

.dc_ctxt = &dbss2,

};

_dispatch_queue_push(dq, (void *)&dbss);

_dispatch_thread_semaphore_wait(dbss2.dbss2_sema);

_dispatch_put_thread_semaphore(dbss2.dbss2_sema);

......

代码比较长,截取开头一部分,我们可以发现,其通过信号量来同步任务的执行,需要切换线程。

简单看下_dispatch_function_invoke,主要关注的队列和线程的关系并非一一对应的.queue是基于线程的,我们在使用队列的时候,就不应该再操作线程,防止出现意外的情况。

static inline void_dispatch_function_invoke(dispatch_queue_t dq, void *ctxt, dispatch_function_t func){

dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);

_dispatch_thread_setspecific(dispatch_queue_key, dq);

_dispatch_client_callout(ctxt, func);

_dispatch_workitem_inc();

_dispatch_thread_setspecific(dispatch_queue_key, old_dq);}

当我们切换线程的时候,我们会先更改TSD,执行块,然后再将之前的DQ设置回去。

dispatch_async

voiddispatch_async(dispatch_queue_t dq, void (^work)(void)){

dispatch_async_f(dq, _dispatch_Block_copy(work),

_dispatch_call_block_and_release);}

这个就比较显而易见了,直接就把块转成函数,然后掉xxx_f函数了。

voiddispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func){

dispatch_continuation_t dc;

// No fastpath/slowpath hint because we simply don't know

if (dq->dq_width == 1) { return dispatch_barrier_async_f(dq, ctxt, func);

}

dc = fastpath(_dispatch_continuation_alloc_cacheonly());

if (!dc) { return _dispatch_async_f_slow(dq, ctxt, func);

}

dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;

dc->dc_func = func;

dc->dc_ctxt = ctxt;

// No fastpath/slowpath hint because we simply don't know

if (dq->do_targetq) { return _dispatch_async_f2(dq, dc);

}

_dispatch_queue_push(dq, dc);}

看过前面几个例子之后就比较清晰了,如果DQ是串行队列的话,就走dispatch_barrier_async_f,的否则就话教育需要创建³³一个dispatch_continuation对象来存放功能,然后塞到队列后面。其实这两个路径都是干了一件事情,就是创建DC对象,然后塞到队列后面,唯一的区别是

// barrierdc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);// not barrierdc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;

其实到这里我们可以知道,GCD的队列等待阻塞就是通过DISPATCH_OBJ_BARRIER_BIT这个位标识来实现的,全部的标识有如下几种:

#define DISPATCH_OBJ_ASYNC_BIT 0x1 // 异步#define DISPATCH_OBJ_BARRIER_BIT 0x2 // 阻塞#define DISPATCH_OBJ_GROUP_BIT 0x4 // 组#define DISPATCH_OBJ_SYNC_SLOW_BIT 0x8 // 同步慢

理解这个对于阅读源码作用还是比较大的。

接下去是比较重要的部分,也是隐藏的比较深的部分,我们现在的确把DC给放进队列里面了,那什么时候改DC才会被执行呢?正常的实现应该是上一个任务完成之后主动触发下一个任务。我们看下几个函数

// 同步任务执行static void_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt, dispatch_function_t func){

_dispatch_function_invoke(dq, ctxt, func);

if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {

_dispatch_wakeup(dq);

}}// 异步任务执行static void_dispatch_async_f_redirect_invoke(void *_ctxt){

struct dispatch_continuation_s *dc = _ctxt; struct dispatch_continuation_s *other_dc = dc->dc_other; dispatch_queue_t old_dq, dq = dc->dc_data, rq;

old_dq = _dispatch_thread_getspecific(dispatch_queue_key);

_dispatch_thread_setspecific(dispatch_queue_key, dq);

_dispatch_continuation_pop(other_dc);

_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

rq = dq->do_targetq;

while (slowpath(rq->do_targetq) && rq != old_dq) { if (dispatch_atomic_sub2o(rq, dq_running, 2) == 0) {

_dispatch_wakeup(rq);

}

rq = rq->do_targetq;

}

if (dispatch_atomic_sub2o(dq, dq_running, 2) == 0) {

_dispatch_wakeup(dq);

}

_dispatch_release(dq);}

可以发现无论是同步的任务执行,还是异步的任务执行,其最终都会调用_dispatch_wakeup,该函数的作用就是触发下一个任务。

dispatch_queue_t_dispatch_wakeup(dispatch_object_t dou){

dispatch_queue_t tq;

if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) { return NULL;

}

if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) { return NULL;

}

// _dispatch_source_invoke() relies on this testing the whole suspend count

// word, not just the lock bit. In other words, no point taking the lock

// if the source is suspended or canceled.

if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,

DISPATCH_OBJECT_SUSPEND_LOCK)) {#if DISPATCH_COCOA_COMPAT || DISPATCH_LINUX_COMPAT

if (dou._dq == &_dispatch_main_q) { return _dispatch_queue_wakeup_main();

}#endif

return NULL;

}

dispatch_atomic_acquire_barrier();

_dispatch_retain(dou._do);

tq = dou._do->do_targetq;

_dispatch_queue_push(tq, dou._do);

return tq; // libdispatch does not need this, but the Instrument DTrace

// probe does}

一开始看半天没看出来咋整的,有个非常不起眼的宏dx_probe,他的定义是(x)->do_vtable->do_probe(x),搜了一下do_probe,可以发现,在INIT.C中有相关的定义,我们讨论到现在的基本都是root_queue ,所以只看root_queue。

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_root, queue,

.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,

.do_kind = "global-queue",

.do_debug = dispatch_queue_debug,

.do_probe = _dispatch_queue_probe_root,);bool_dispatch_queue_probe_root(dispatch_queue_t dq){

_dispatch_queue_wakeup_global2(dq, 1); return false;}static inline void_dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n){

struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

if (!dq->dq_items_tail) { return;

}#if HAVE_PTHREAD_WORKQUEUES

if (#if DISPATCH_USE_PTHREAD_POOL

(qc->dgq_kworkqueue != (void*)(~0ul)) &&#endif

!dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n)) {

_dispatch_debug("work thread request still pending on global queue: "

"%p", dq); return;

}#endif // HAVE_PTHREAD_WORKQUEUES

return _dispatch_queue_wakeup_global_slow(dq, n);}

_dispatch_queue_wakeup_global_slow的代码这里就不贴了,在这个函数中会发送生产者消息让下一个任务执行,同时也会在必要的时候创建线程。这里是比较隐晦的,但原理和我们猜的也差不多。

结论

看这个代码断断续续也看了几天,终于看出了一点端倪。感谢网友博客提供的关键信息说明,少走了很多弯路。特此记录,以备后用。

参数文献

  • 深入理解GCD

  • GCD源码的分析

  • libdispatch

  • 队列不绑定到任何特定线程

相关推荐

TiDBPingCAP / 0评论 2020-07-29