pthread_wqthread() 从上文的分析我们了解到了内核的调用链如下:
1 2 3 start_wqthread -> _pthread_wqthread ->_dispatch_worker_thread2 ->_dispatch_root_queue_drain ->_dispatch_queue_override_invoke ->_dispatch_client_callout -> _dispatch_call_block_and_release
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 _pthread_wqthread() 太长可绕过直接看分析 void _pthread_wqthread(pthread_t self, mach_port_t kport, void *stacklowaddr, void *keventlist, int flags, int nkevents) { if ((flags & WQ_FLAG_THREAD_REUSE) == 0 ) { _pthread_wqthread_setup(self, kport, stacklowaddr, flags); } pthread_priority_t pp; if (flags & WQ_FLAG_THREAD_OUTSIDEQOS) { self->wqoutsideqos = 1 ; pp = _pthread_priority_make_from_thread_qos(THREAD_QOS_LEGACY, 0 , _PTHREAD_PRIORITY_FALLBACK_FLAG); } else { self->wqoutsideqos = 0 ; pp = _pthread_wqthread_priority(flags); } self->tsd[_PTHREAD_TSD_SLOT_PTHREAD_QOS_CLASS] = (void *)pp; if (nkevents == WORKQ_EXIT_THREAD_NKEVENT) { goto exit ; } else if (flags & WQ_FLAG_THREAD_WORKLOOP) { self->fun = (void *(*)(void *))__libdispatch_workloopfunction; self->wq_retop = WQOPS_THREAD_WORKLOOP_RETURN; self->wq_kqid_ptr = ((kqueue_id_t *)keventlist - 1 ); self->arg = keventlist; self->wq_nevents = nkevents; } else if (flags & WQ_FLAG_THREAD_KEVENT) { self->fun = (void *(*)(void *))__libdispatch_keventfunction; self->wq_retop = WQOPS_THREAD_KEVENT_RETURN; self->wq_kqid_ptr = NULL ; self->arg = keventlist; self->wq_nevents = nkevents; } else { self->fun = (void *(*)(void *))__libdispatch_workerfunction; self->wq_retop = WQOPS_THREAD_RETURN; self->wq_kqid_ptr = NULL ; self->arg = (void *)(uintptr_t )pp; self->wq_nevents = 0 ; if (os_likely(__workq_newapi)) { (*__libdispatch_workerfunction)(pp); } else { _pthread_wqthread_legacy_worker_wrap(pp); } goto just_return; } if (nkevents > 0 ) { kevent_errors_retry: if (self->wq_retop == WQOPS_THREAD_WORKLOOP_RETURN) { ((pthread_workqueue_function_workloop_t )self->fun)(self->wq_kqid_ptr, &self->arg, &self->wq_nevents); } else { ((pthread_workqueue_function_kevent_t )self->fun) (&self->arg, &self->wq_nevents); } int rc = __workq_kernreturn(self->wq_retop, self->arg, self->wq_nevents, 0 ); if (os_unlikely(rc > 0 )) { self->wq_nevents = rc; goto kevent_errors_retry; } if (os_unlikely(rc < 0 )) { PTHREAD_INTERNAL_CRASH(self->err_no, "kevent (workloop) failed" ); } } else { just_return: __workq_kernreturn(self->wq_retop, NULL , 0 , 0 ); } exit : _pthread_wqthread_exit(self); }
我们只分析我们关心的代码,直接忽略异常代码。 首先进入WQ_FLAG_THREAD_WORKLOOP
流程,将__libdispatch_workloopfunction
等参数赋予self(就是pthread)
,然后进入kevent_errors_retry:
,调用self->fun()
即((pthread_workqueue_function_workloop_t)self->fun)(self->wq_kqid_ptr, &self->arg, &self->wq_nevents);
,然后调用__workq_kernreturn(self->wq_retop, self->arg, self->wq_nevents, 0);
。 到这_pthread_wqthread()
的任务派发结束,派发出去的func
想必就是_dispatch_worker_thread2
了。
然后回到libdispatch.dylib
dispatch_introspection_thread_add() 先说说dispatch_introspection_thread_add()
这个方法,因为dispatch_worker_thread2()
有调用他
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void _dispatch_introspection_thread_add(void ) { if (_dispatch_thread_getspecific(dispatch_introspection_key)) { return ; } uintptr_t thread = _dispatch_thread_self(); dispatch_introspection_thread_t dit = (void *)_dispatch_continuation_alloc(); dit->dit_isa = (void *)0x41 ; dit->thread = (void *)thread; dit->queue = !_dispatch_introspection.thread_queue_offset ? NULL : (void *)thread + _dispatch_introspection.thread_queue_offset; _dispatch_thread_setspecific(dispatch_introspection_key, dit); _dispatch_unfair_lock_lock(&_dispatch_introspection.threads_lock); LIST_INSERT_HEAD(&_dispatch_introspection.threads, dit, dit_list); _dispatch_unfair_lock_unlock(&_dispatch_introspection.threads_lock); }
这个方法名从字面上理解是在内部添加一个thread,方法体内部首先初始化了一个dispatch_introspection_thread_t
对象,然后进行赋值。注意,在LIST_INSERT_HEAD()
也就是顶部插入dit操作之前有个加锁操作。实际上该方法可以简单的认为是将队列与线程塞进了一个字典里。
dispatch_worker_thread2() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 static void _dispatch_worker_thread2(pthread_priority_t pp) { bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG; dispatch_queue_global_t dq; pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK; _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t )pp); dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit); _dispatch_introspection_thread_add(); _dispatch_trace_runtime_event(worker_unpark, dq, 0 ); int pending = os_atomic_dec2o(dq, dgq_pending, relaxed); dispatch_assert(pending >= 0 ); _dispatch_root_queue_drain(dq, dq->dq_priority, DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN); _dispatch_voucher_debug("root queue clear" , NULL ); _dispatch_reset_voucher(NULL , DISPATCH_THREAD_PARK); _dispatch_trace_runtime_event(worker_park, NULL , 0 ); } ``` 从`root_queue`中取出`global_queue`,然后触发了`_dispatch_introspection_thread_add()`操作往`dit_list`添加一个线程,接着就调用了`_dispatch_root_queue_drain()`。 ## dispatch_root_queue_drain() ``` c static void _dispatch_root_queue_drain(dispatch_queue_global_t dq, dispatch_priority_t pri, dispatch_invoke_flags_t flags) { _dispatch_queue_set_current(dq); _dispatch_init_basepri(pri); _dispatch_adopt_wlh_anon(); struct dispatch_object_s *item ; bool reset = false ; dispatch_invoke_context_s dic = { }; #if DISPATCH_COCOA_COMPAT _dispatch_last_resort_autorelease_pool_push(&dic); #endif _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri); _dispatch_perfmon_start(); while (likely(item = _dispatch_root_queue_drain_one(dq))) { if (reset) _dispatch_wqthread_override_reset(); _dispatch_continuation_pop_inline(item, &dic, flags, dq); reset = _dispatch_reset_basepri_override(); if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) { break ; } } if (pri & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) { _dispatch_perfmon_end(perfmon_thread_worker_oc); } else { _dispatch_perfmon_end(perfmon_thread_worker_non_oc); } #if DISPATCH_COCOA_COMPAT _dispatch_last_resort_autorelease_pool_pop(&dic); #endif _dispatch_reset_wlh(); _dispatch_clear_basepri(); _dispatch_queue_set_current(NULL ); }
循环从队列头部中取出任务,然后执行_dispatch_continuation_pop_inline()
dispatch_continuation_pop_inline() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 static inline void _dispatch_continuation_pop_inline(dispatch_object_t dou, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu) { dispatch_pthread_root_queue_observer_hooks_t observer_hooks = _dispatch_get_pthread_root_queue_observer_hooks(); if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq); flags &= _DISPATCH_INVOKE_PROPAGATE_MASK; if (_dispatch_object_has_vtable(dou)) { dx_invoke(dou._dq, dic, flags); } else { _dispatch_continuation_invoke_inline(dou, flags, dqu); } if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq); }
判断是否有vtable
,从上文我们知道dispatch_async()
调用dx_push()
会触发dx_vtable(x)->dq_push(x, y, z)
,这里应该意味着队列拥有vtable
成员,根据源码追溯dx_invoke()
调用do_invoke()
,那么do_invoke()
会调用哪个函数就有说法了,这里也就不去补细节了,do_invoke()
在全局并发队列的条件下会调用_dispatch_queue_override_invoke()
dispatch_queue_override_invoke() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static void _dispatch_queue_override_invoke(dispatch_continuation_t dc, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags) { dispatch_queue_t old_rq = _dispatch_queue_get_current(); dispatch_queue_global_t assumed_rq = dc->dc_other; dispatch_priority_t old_dp; dispatch_object_t dou; uintptr_t dc_flags = DC_FLAG_CONSUME; dou._do = dc->dc_data; old_dp = _dispatch_root_queue_identity_assume(assumed_rq); if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) { flags |= DISPATCH_INVOKE_STEALING; dc_flags |= DC_FLAG_NO_INTROSPECTION; } _dispatch_continuation_pop_forwarded(dc, dc_flags, assumed_rq, { if (_dispatch_object_has_vtable(dou._do)) { dx_invoke(dou._dq, dic, flags); } else { _dispatch_continuation_invoke_inline(dou, flags, assumed_rq); } }); _dispatch_reset_basepri(old_dp); _dispatch_queue_set_current(old_rq); }
如果按照正常的流程走下来,这里的dou
应该是个任务而不再是队列了,调用流程将走进_dispatch_continuation_invoke_inline()
dispatch_continuation_invoke_inline() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static inline void _dispatch_continuation_invoke_inline(dispatch_object_t dou, dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu) { dispatch_continuation_t dc = dou._dc, dc1; dispatch_invoke_with_autoreleasepool(flags, { ... if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) { _dispatch_continuation_with_group_invoke(dc); } else { _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); _dispatch_trace_item_complete(dc); } ... }); _dispatch_perfmon_workitem_inc(); }
因为我们分析的是dispatch_async()
,所以走_dispatch_client_callout()
,而callout函数就是执行了了队列中的block任务。
一通胡乱的分析之后终于看明白了任务的调度执行流程,也明确了异步任务的执行顺序,但是距离理解GCD的工作流程还是差得很远。