文章

OpenResty用户协程

OpenResty中提供的定时器接口ngx.timer,底层是创建了一个协程,可以异步并发的处理一些业务,在一些请求网络资源的场景下,可以利用并发来达到加速的效果。

协程特点

Lua是一个极其精简、高性能的脚本语言。它的协程是非抢占式的,不像golang中协程的调度模型(多线程抢占式),只有当前协程主动放弃执行权(yield),其他协程才有机会得到执行(consume)。OpenResty中使用时要注意这个特点。 看下这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
server {
    listen       80  default_server;
    server_name  _;
    access_log   logs/access_unknownHost.log  main;

    location = /uri_1 {
        content_by_lua_block {
            local key = ngx.var.arg_key
            local cjson = require("cjson")
            local t = {name='rcy', location='guangzhou', age=27}
            for i = 1, 3 do
                for i=1,500000 do
                    local str = cjson.encode(t)
                    local res = cjson.decode(str)
                end
                ngx.log(ngx.NOTICE, key .. " finished:" .. i)
            end
        }
    }
}

/uri_1的请求会执行繁重的运算任务,在外层的每一个大循环执行一次后就会打印一条日志,然后同时发送两条请求: "http://127.0.0.1/uri_1?key=first" "http://127.0.0.1/uri_1?key=second"

查看日志:

1
2
3
4
5
6
 first finished:1
 first finished:2
 first finished:3
 second finished:1
 second finished:2
 second finished:3

second请求只有在first完成后才会得到处理。有办法让多个请求交替处理吗,那就需要利用OpenResty封装好的API让协程之间执行yield和resume:

1
2
3
4
5
6
7
8
for i = 1, 3 do
    for i=1,500000 do
        local str = cjson.encode(t)
        local res = cjson.decode(str)
    end
    ngx.log(ngx.NOTICE, key .. " finished:" .. i)
    ngx.sleep(0.001)
end

再请求查看日志:

1
2
3
4
5
6
 first finished:1
 first finished:2
 second finished:1
 first finished:3
 second finished:2
 second finished:3

利用timer并发加速

虽然每个worker对应的是单线程但进程,但在IO密集型任务下可以利用并发(多协程)加速任务的完成。

考虑这样一个场景:有一个业务场景需要给100台其他服务器分别发送一个HTTP请求,协程就可以在等待网络资源时让出CPU,资源就绪后唤醒指定协程,从而利用多协程并发达到加速的效果。

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
local function timerExecTask(premature, tasks, result)
    if true == premature then
        ngx.log(ngx.WARN, "timerExecTask premature true!")
    end

    local taskKey = next(tasks)

    while taskKey ~= nil do
        -- get function
        local execFunc = tasks[taskKey].execFunc
        -- get function parameters
        local params = tasks[taskKey].params
        tasks[taskKey] = nil
        result[taskKey] = execFunc(table.unpack(params))
        taskKey = next(tasks)
    end
end

--[[concurrentNum:并发协程数量
--tasks:需要执行的任务,table类型,每个任务需包含以下key
execFunc:任务函数
params:函数参数
]]--
function ConcurrentExec(concurrentNum, tasks)
    local nkeys = require "table.nkeys"
    local tasksSum = nkeys(tasks)

    local result = {}
    local succTimer = 0
    for _ = 1, math.min(concurrentNum, tasksSum) do
        local ok, err = ngx.timer.at(0, timerExecTask, tasks, result)
        if not ok then
            ngx.log(ngx.ERR, "timer create fail!" .. err)
        else
            succTimer = succTimer + 1
        end
    end

    if 0 == succTimer then
        for taskKey, task in ipairs(tasks) do 
            local execFunc = task.execFunc
            local params = task.params
            result[taskKey] = execFunc(table.unpack(params))
        end
    else
        while nkeys(result) ~= tasksSum do
            ngx.sleep(3)
        end
    end
    return result
end

主协程创建N个协程执行任务,然后只需要等待任务全部被执行完然后放到result表中,也就是判断nkeys(result) ~= tasksSum,利用非抢占的特点,执行协程在从tasks取任务时不会被打断。 当然task需要是一些请求资源、yield、资源就绪、resume特点的执行流程(ngx.socket),如果是一直占用CPU的,并发也就没有意义了,因为底层是一个单进程单线程的worker。

本文由作者按照 CC BY 4.0 进行授权