余额不足.

GCD线程管理-内核相关

字数统计: 1.2k阅读时长: 6 min
2019/07/04 Share

pthread_wqthread()

从上文的分析我们了解到了内核的调用链如下:

1
2
3
start_wqthread -> _pthread_wqthread ->_dispatch_worker_thread2 ->_dispatch_root_queue_drain 
->_dispatch_queue_override_invoke ->_dispatch_client_callout
-> _dispatch_call_block_and_release
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
_pthread_wqthread() 太长可绕过直接看分析
/*
 * Body run by a workqueue thread (the article's call chain reaches it from
 * start_wqthread).
 *
 * Sets the thread up unless WQ_FLAG_THREAD_REUSE is set, stores the thread's
 * priority in its TSD, then selects one of three callbacks from `flags`
 * (workloop, kevent, or plain worker), runs it, and calls
 * __workq_kernreturn() with the matching return op. If control reaches the
 * `exit` label, the thread is torn down with _pthread_wqthread_exit().
 *
 * self         - pthread structure of the current thread
 * kport        - forwarded unchanged to _pthread_wqthread_setup()
 * stacklowaddr - forwarded unchanged to _pthread_wqthread_setup()
 * keventlist   - event list handed to the kevent/workloop callback; in the
 *                workloop case the kqueue id is read from the slot directly
 *                before it
 * flags        - WQ_FLAG_* bits choosing setup and dispatch path
 * nkevents     - event count, or WORKQ_EXIT_THREAD_NKEVENT to request exit
 */
void
_pthread_wqthread(pthread_t self, mach_port_t kport, void *stacklowaddr,
void *keventlist, int flags, int nkevents)
{
// One-time setup is skipped when the thread is flagged as reused.
if ((flags & WQ_FLAG_THREAD_REUSE) == 0) {
_pthread_wqthread_setup(self, kport, stacklowaddr, flags);
}

// Work out the priority value for this run and remember on `self` whether
// the thread was flagged as running outside of QoS.
pthread_priority_t pp;
if (flags & WQ_FLAG_THREAD_OUTSIDEQOS) {
self->wqoutsideqos = 1;
pp = _pthread_priority_make_from_thread_qos(THREAD_QOS_LEGACY, 0,
_PTHREAD_PRIORITY_FALLBACK_FLAG);
} else {
self->wqoutsideqos = 0;
pp = _pthread_wqthread_priority(flags);
}

// Publish the priority in the thread's QoS-class TSD slot.
self->tsd[_PTHREAD_TSD_SLOT_PTHREAD_QOS_CLASS] = (void *)pp;

// avoid spills on the stack hard to keep used stack space minimal
if (nkevents == WORKQ_EXIT_THREAD_NKEVENT) {
// Exit request: no callback is run.
goto exit;
} else if (flags & WQ_FLAG_THREAD_WORKLOOP) {
// Workloop path: callback takes (kqueue id pointer, &events, &count).
self->fun = (void *(*)(void*))__libdispatch_workloopfunction;
self->wq_retop = WQOPS_THREAD_WORKLOOP_RETURN;
// The kqueue id lives in the element just before keventlist.
self->wq_kqid_ptr = ((kqueue_id_t *)keventlist - 1);
self->arg = keventlist;
self->wq_nevents = nkevents;
} else if (flags & WQ_FLAG_THREAD_KEVENT) {
// Kevent path: callback takes (&events, &count).
self->fun = (void *(*)(void*))__libdispatch_keventfunction;
self->wq_retop = WQOPS_THREAD_KEVENT_RETURN;
self->wq_kqid_ptr = NULL;
self->arg = keventlist;
self->wq_nevents = nkevents;
} else {
// Plain worker path: the callback receives only the priority and is
// invoked right here rather than in the kevent block below.
self->fun = (void *(*)(void*))__libdispatch_workerfunction;
self->wq_retop = WQOPS_THREAD_RETURN;
self->wq_kqid_ptr = NULL;
self->arg = (void *)(uintptr_t)pp;
self->wq_nevents = 0;
if (os_likely(__workq_newapi)) {
(*__libdispatch_workerfunction)(pp);
} else {
_pthread_wqthread_legacy_worker_wrap(pp);
}
goto just_return;
}

// Only the workloop and kevent paths reach this point.
if (nkevents > 0) {
kevent_errors_retry:
// Run the callback selected above; the cast matches the signature that
// goes with the stored return op.
if (self->wq_retop == WQOPS_THREAD_WORKLOOP_RETURN) {
((pthread_workqueue_function_workloop_t)self->fun)(self->wq_kqid_ptr, &self->arg, &self->wq_nevents);
} else {
((pthread_workqueue_function_kevent_t)self->fun)
(&self->arg, &self->wq_nevents);
}
// Pass the (possibly updated) event list and count to __workq_kernreturn().
int rc = __workq_kernreturn(self->wq_retop, self->arg, self->wq_nevents, 0);
// A positive result becomes the new event count and the callback is run
// again.
if (os_unlikely(rc > 0)) {
self->wq_nevents = rc;
goto kevent_errors_retry;
}
// A negative result is treated as fatal.
if (os_unlikely(rc < 0)) {
PTHREAD_INTERNAL_CRASH(self->err_no, "kevent (workloop) failed");
}
} else {
just_return:
// No events to hand over: return with an empty list. The plain worker
// path jumps here after its callback has run.
__workq_kernreturn(self->wq_retop, NULL, 0, 0);
}

exit:
_pthread_wqthread_exit(self);
}

我们只分析我们关心的代码,直接忽略异常代码。
首先进入WQ_FLAG_THREAD_WORKLOOP流程,将__libdispatch_workloopfunction等参数赋予self(就是pthread),然后进入kevent_errors_retry:标签,通过((pthread_workqueue_function_workloop_t)self->fun)(self->wq_kqid_ptr, &self->arg, &self->wq_nevents);调用self->fun,接着调用__workq_kernreturn(self->wq_retop, self->arg, self->wq_nevents, 0);。
到这_pthread_wqthread()的任务派发结束,派发出去的func想必就是_dispatch_worker_thread2了。需要注意的是,_dispatch_worker_thread2只接收一个pthread_priority_t参数,因此它对应的应该是最后一个else分支里的(*__libdispatch_workerfunction)(pp);调用,而不是需要三个参数的workloop分支。

然后回到libdispatch.dylib

dispatch_introspection_thread_add()

先说说dispatch_introspection_thread_add()这个方法,因为dispatch_worker_thread2()会调用它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * Records the calling thread in the introspection thread list.
 *
 * Does nothing when the thread already has a value stored under
 * dispatch_introspection_key. Otherwise a dispatch_introspection_thread_t
 * record is allocated, filled in, stored under that key, and inserted at the
 * head of _dispatch_introspection.threads while holding threads_lock.
 */
void
_dispatch_introspection_thread_add(void)
{
	// A value under the key means this thread was registered before.
	if (_dispatch_thread_getspecific(dispatch_introspection_key)) {
		return;
	}

	uintptr_t self_thread = _dispatch_thread_self();
	dispatch_introspection_thread_t dit = (void *)_dispatch_continuation_alloc();

	// NOTE(review): 0x41 is a magic isa marker; its meaning is not visible here.
	dit->dit_isa = (void *)0x41;
	dit->thread = (void *)self_thread;

	// The queue pointer is located at a fixed offset from the thread value;
	// an offset of zero means there is no such location to point at.
	if (_dispatch_introspection.thread_queue_offset) {
		dit->queue = (void *)self_thread +
				_dispatch_introspection.thread_queue_offset;
	} else {
		dit->queue = NULL;
	}

	_dispatch_thread_setspecific(dispatch_introspection_key, dit);

	// The list is shared, so the insertion happens under threads_lock.
	_dispatch_unfair_lock_lock(&_dispatch_introspection.threads_lock);
	LIST_INSERT_HEAD(&_dispatch_introspection.threads, dit, dit_list);
	_dispatch_unfair_lock_unlock(&_dispatch_introspection.threads_lock);
}

这个方法名从字面上理解是在内部添加一个thread,方法体内部首先分配了一个dispatch_introspection_thread_t对象,然后进行赋值。注意,在LIST_INSERT_HEAD()也就是顶部插入dit操作之前有个加锁操作。实际上该方法可以简单地认为是把当前线程(以及它对应的队列指针)记录到了一个全局链表里。

dispatch_worker_thread2()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/*
 * Worker callback taking a single pthread priority value.
 *
 * Derives the root queue from `pp`, registers the thread for introspection,
 * drains that root queue, then resets the voucher and emits the park trace
 * event.
 *
 * pp - pthread priority for this run; also carries the overcommit flag
 */
static void
_dispatch_worker_thread2(pthread_priority_t pp)
{
// Capture the overcommit bit before pp is masked below.
bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
dispatch_queue_global_t dq;

// Keep the overcommit flag plus every bit outside the flags mask.
pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
// Pick the root queue matching the QoS derived from pp and the overcommit bit.
dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);

_dispatch_introspection_thread_add();
_dispatch_trace_runtime_event(worker_unpark, dq, 0);

// Decrement the queue's dgq_pending counter (relaxed ordering) and assert
// that the value returned by the decrement is not negative.
int pending = os_atomic_dec2o(dq, dgq_pending, relaxed);
dispatch_assert(pending >= 0);
_dispatch_root_queue_drain(dq, dq->dq_priority,
DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
_dispatch_voucher_debug("root queue clear", NULL);
_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
_dispatch_trace_runtime_event(worker_park, NULL, 0);
}

从root_queue中取出global_queue,然后触发了_dispatch_introspection_thread_add()操作往线程链表(通过dit_list字段)添加一个线程,接着就调用了_dispatch_root_queue_drain()。

dispatch_root_queue_drain()

/*
 * Pops items off a root queue one at a time and invokes each of them until
 * the queue yields nothing more or the narrowing check asks to stop.
 *
 * dq    - root queue to drain; made the current queue for the duration
 * pri   - priority used to initialise the base priority and the narrowing
 *         deadline; its overcommit flag selects the perfmon tag
 * flags - invoke flags passed on to every item
 */
static void
_dispatch_root_queue_drain(dispatch_queue_global_t dq,
dispatch_priority_t pri, dispatch_invoke_flags_t flags)
{
// Per-thread state set here is undone in reverse order at the bottom.
_dispatch_queue_set_current(dq);
_dispatch_init_basepri(pri);
_dispatch_adopt_wlh_anon();

struct dispatch_object_s *item;
bool reset = false;
dispatch_invoke_context_s dic = { };
#if DISPATCH_COCOA_COMPAT
_dispatch_last_resort_autorelease_pool_push(&dic);
#endif // DISPATCH_COCOA_COMPAT
_dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
_dispatch_perfmon_start();
while (likely(item = _dispatch_root_queue_drain_one(dq))) {
// `reset` comes from the previous iteration, so the override reset is
// only performed when another item actually follows.
if (reset) _dispatch_wqthread_override_reset();
_dispatch_continuation_pop_inline(item, &dic, flags, dq);
reset = _dispatch_reset_basepri_override();
if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
break;
}
}

// overcommit or not. worker thread
if (pri & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
_dispatch_perfmon_end(perfmon_thread_worker_oc);
} else {
_dispatch_perfmon_end(perfmon_thread_worker_non_oc);
}

#if DISPATCH_COCOA_COMPAT
_dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
_dispatch_reset_wlh();
_dispatch_clear_basepri();
_dispatch_queue_set_current(NULL);
}

循环从队列头部中取出任务,然后执行_dispatch_continuation_pop_inline()

dispatch_continuation_pop_inline()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Invokes one popped item on behalf of queue `dqu`, bracketed by the
 * optional pthread-root-queue observer hooks.
 *
 * dou   - the popped item; either an object with a vtable or a plain
 *         continuation
 * dic   - invoke context, used only on the vtable path
 * flags - invoke flags; reduced to the propagate mask before use
 * dqu   - queue the item was popped from
 */
static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
dispatch_queue_class_t dqu)
{
dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
_dispatch_get_pthread_root_queue_observer_hooks();
if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq);
// Only the flags inside the propagate mask are passed on to the item.
flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
// Objects with a vtable are invoked through dx_invoke(); anything else is
// treated as a continuation.
if (_dispatch_object_has_vtable(dou)) {
dx_invoke(dou._dq, dic, flags);
} else {
_dispatch_continuation_invoke_inline(dou, flags, dqu);
}
if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq);
}

判断是否有vtable,从上文我们知道dispatch_async()调用dx_push()会触发dx_vtable(x)->dq_push(x, y, z),这里应该意味着队列拥有vtable成员,根据源码追溯dx_invoke()调用do_invoke(),那么do_invoke()会调用哪个函数就有说法了,这里也就不去补细节了,do_invoke()在全局并发队列的条件下会调用_dispatch_queue_override_invoke()

dispatch_queue_override_invoke()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*
 * Invokes the item wrapped by an override continuation while temporarily
 * assuming the identity of the root queue stored in dc->dc_other, then
 * restores the previous base priority and current queue.
 *
 * dc    - override continuation; dc_data holds the wrapped item and dc_other
 *         the root queue to assume
 * dic   - invoke context, used only when the wrapped item has a vtable
 * flags - invoke flags; DISPATCH_INVOKE_STEALING is added for the
 *         OVERRIDE_STEALING continuation type
 */
static void
_dispatch_queue_override_invoke(dispatch_continuation_t dc,
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
// Saved so the current queue can be restored on the way out.
dispatch_queue_t old_rq = _dispatch_queue_get_current();
dispatch_queue_global_t assumed_rq = dc->dc_other;
dispatch_priority_t old_dp;
dispatch_object_t dou;
uintptr_t dc_flags = DC_FLAG_CONSUME;

dou._do = dc->dc_data;
old_dp = _dispatch_root_queue_identity_assume(assumed_rq);
if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
flags |= DISPATCH_INVOKE_STEALING;
dc_flags |= DC_FLAG_NO_INTROSPECTION;
}
// The braces below are a code block handed to
// _dispatch_continuation_pop_forwarded as its last argument.
// NOTE(review): presumably a macro that runs the block as part of popping
// `dc`; confirm against its definition.
_dispatch_continuation_pop_forwarded(dc, dc_flags, assumed_rq, {
if (_dispatch_object_has_vtable(dou._do)) {
dx_invoke(dou._dq, dic, flags);
} else {
_dispatch_continuation_invoke_inline(dou, flags, assumed_rq);
}
});
_dispatch_reset_basepri(old_dp);
_dispatch_queue_set_current(old_rq);
}

如果按照正常的流程走下来,这里的dou应该是个任务而不再是队列了,调用流程将走进_dispatch_continuation_invoke_inline()

dispatch_continuation_invoke_inline()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
 * Runs a single continuation.
 *
 * NOTE: this is an abridged excerpt. The `...` lines stand for code that was
 * left out, including whatever declares `dc_flags` and uses `dc1`.
 *
 * dou   - the continuation to run (accessed through dou._dc)
 * flags - invoke flags handed to dispatch_invoke_with_autoreleasepool
 * dqu   - queue argument; not referenced in the visible part of the body
 */
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou,
dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu)
{
dispatch_continuation_t dc = dou._dc, dc1;
dispatch_invoke_with_autoreleasepool(flags, {
...
// Group-async continuations take a dedicated path; everything else calls
// the stored function with the stored context via _dispatch_client_callout().
if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) {
_dispatch_continuation_with_group_invoke(dc);
} else {
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
}
...
});
_dispatch_perfmon_workitem_inc();
}

因为我们分析的是dispatch_async(),所以走_dispatch_client_callout(),而callout函数就是执行了队列中的block任务。

一通胡乱的分析之后终于看明白了任务的调度执行流程,也明确了异步任务的执行顺序,但是距离理解GCD的工作流程还是差得很远。

发表日期:July 4th 2019, 8:13:17 pm

版权声明:本文采用知识共享署名-非商业性使用 4.0 国际许可协议进行许可

CATALOG
  1. 1. pthread_wqthread()
  2. 2. dispatch_introspection_thread_add()
  3. 3. dispatch_worker_thread2()
  4. 4. dispatch_root_queue_drain()
  5. 5. dispatch_continuation_pop_inline()
  6. 6. dispatch_queue_override_invoke()
  7. 7. dispatch_continuation_invoke_inline()