线程与消息循环-MessageLoop

1. 前言

每一个需要与用户进行交互的app都需要至少一个消息循环,来处理用户事件的输入和各种I/O事件。iOS和Andriod都有各自系统的MessageLoop,iOS的叫NSRunLoop,Android的叫Looper。消息循环有利于提交系统的影响速度,在现代操作系统中得到了广泛的应用。

2. 消息循环

消息循环的工作原理大同小异,原理上就是一个while循环+消息队列不断接收任务。具体可以分为下面几个部分的内容:

  1. 事件源接收与分发
  2. 延时处理
  3. 线程等待唤醒处理

2.1 事件源接收与分发

MessageLoop内部一般会有一个消息队列,等待要执行的任务push进来。下面通过两张图简单说明下:

  • iOS的NSRunLoop
    NSRunLoop

  • Android的Looper
    Looper

2.2 延时处理

延时任务在消息处理中很常见,消息循环一般要针对延时的任务进行等待处理。这个时候就需要两个东西:

  1. 延时队列
  2. 时钟计数

时钟计数需要一个系统固定的时间作为所有任务计时的起点,一般我们会采用系统的启动时间boottime,延时队列用来存储还未到时间执行的任务。消息循环每次loop的时候,会检查当前时钟有没有达到延时队列里任务的时间点,有的话就取出来执行,没有就等待下一次loop。假如有某些任务执行时间比较长,会导致延时队列里的任务已经过了要执行的时间点。为了保证效率,这个时候消息循环会统一对过期的任务进行调用。所以延时任务只能保证在>=delay的时间点执行
timeline

2.3 线程等待唤醒处理

消息循环本质是一个等待队列,那么这个时候经典的生产者消费者模型就派上用场了。“生产者消费者”模型可以用信号量,管道等方法来实现。在实际的项目中,一般会采用pthread的pthread_cond_wait,pthread_cond_signal,pthread_cond_broadcast等接口来实现。如果考虑到linux和window平台的跨平台场景,那可以直接用STL的condition_variable。(window对pthread的支持不是很好)。

3. 跨平台的MessageLoop实现

介绍完消息循环的工作原理后,我们来看一下在mailapp协议层的设计中,跨平台的messageloop是怎么实现的。跨平台的MessageLoop最开始是参照google的chromium框架实现的。google的chromium库庞大复杂,跨平台的MessageLoop只是抽取了其它一部分功能进行实现,去掉了平台相关的UI线程消息循环实现。下面通过一张图介绍一下MessageLoop的架构设计:

按照前面的消息循环原理,我们需要实现以下几个功能:

  • 线程存储空间
  • 优先级队列
  • 计时时钟
  • 线程等待唤醒

3.1 线程存储空间

每条线程对应着一条messageloop,所以需要创建线程特有的存储空间来存放messageloop对象。
关于线程特有存储空间可以参照 Using Thread Local Storage.aspx)。
threadlocal

实现线程特定存储有两种方式,c++11中的thread_local,pthread的pthread_key_t。c++11的thread_local关键字在mac系统还没支持,所以我们选取了pthread的相关接口来实现。这样我们就可以直接通过messageloop的static接口得到当前线程的messageloop对象,如Lite::MessageLoop::current(),从面进行消息派发。

3.2 优先级队列

优先级队列没有什么特殊要求,直接用std::priority_queue即可。
一般至少需要 TaskQueue, DelayTaskQueue两条队列。

3.3 计时时钟

计时时钟需要选取固定的参考时间点作为起点,一般可以选择设备的boottime,这里为了方便,直接选用Greenwich Time作为参考点,直接调用gettimeofday,时间精度可以达到微妙。

3.4 线程等待唤醒

STL对condition_variable的支持比较完善,这里对线程的等待唤醒处理可以直接用STL来处理。流程图如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for (;;) {
bool did_work = delegate->DoWork();
if (!keep_running_)
break;

did_work |= delegate->DoDelayedWork(&delayed_work_time_);
if (!keep_running_)
break;

if (did_work)
continue;
// idle now
did_work = delegate->DoIdleWork();
if (!keep_running_)
break;

if (did_work)
continue;

if (delayed_work_time_.is_null()) {
std::unique_lock<std::mutex> lock(mutex_);
event_.wait(lock);
} else {
TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
if (delay > TimeDelta()) {
std::unique_lock<std::mutex> lock(mutex_);
event_.wait_for(lock, std::chrono::milliseconds(delay.InMilliseconds()));
} else {
delayed_work_time_ = TimeTicks();
}
}
}

4. 总结

跨平台messageloop在客户端开发中得到了广泛的应用,很好地处理了客户端网络高并发前提下的数据处理流程。
现在已经已经抽取成独立的公共组件Dispatch-Lite,使用接口参照gcd的接口设计方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* init dispatch first */
dispatch_init();

/* dispatch on preallocate logic thread */
dispatch_logic_async([=]{
//your code here
});

/* dispatch on specific thread */
dispatch_thread_async(thread, [=]{
//your code here
});

/* dispatch on current thread after 500ms */
dispatch_current_after(500, [=]{
//your code here
});

有兴趣的同学可前往下载。欢迎拍砖~