OpenResty用户协程
OpenResty中提供的定时器接口ngx.timer
,底层是创建了一个协程,可以异步并发的处理一些业务,在一些请求网络资源的场景下,可以利用并发来达到加速的效果。
协程特点
Lua是一个极其精简、高性能的脚本语言。它的协程是非抢占式的,不像golang中协程的调度模型(多线程抢占式),只有当前协程主动放弃执行权(yield),其他协程才有机会得到执行(consume)。OpenResty中使用时要注意这个特点。 看下这个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
server {
listen 80 default_server;
server_name _;
access_log logs/access_unknownHost.log main;
location = /uri_1 {
content_by_lua_block {
local key = ngx.var.arg_key
local cjson = require("cjson")
local t = {name='rcy', location='guangzhou', age=27}
for i = 1, 3 do
for i=1,500000 do
local str = cjson.encode(t)
local res = cjson.decode(str)
end
ngx.log(ngx.NOTICE, key .. " finished:" .. i)
end
}
}
}
/uri_1的请求会执行繁重的运算任务,在外层的每一个大循环执行一次后就会打印一条日志,然后同时发送两条请求: "http://127.0.0.1/uri_1?key=first"
"http://127.0.0.1/uri_1?key=second"
查看日志:
1
2
3
4
5
6
first finished:1
first finished:2
first finished:3
second finished:1
second finished:2
second finished:3
second请求只有在first完成后才会得到处理。有办法让多个请求交替处理吗,那就需要利用OpenResty封装好的API让协程之间执行yield和resume:
1
2
3
4
5
6
7
8
for i = 1, 3 do
for i=1,500000 do
local str = cjson.encode(t)
local res = cjson.decode(str)
end
ngx.log(ngx.NOTICE, key .. " finished:" .. i)
ngx.sleep(0.001)
end
再请求查看日志:
1
2
3
4
5
6
first finished:1
first finished:2
second finished:1
first finished:3
second finished:2
second finished:3
利用timer并发加速
虽然每个worker对应的是单线程但进程,但在IO密集型任务下可以利用并发(多协程)加速任务的完成。
考虑这样一个场景:有一个业务场景需要给100台其他服务器分别发送一个HTTP请求,协程就可以在等待网络资源时让出CPU,资源就绪后唤醒指定协程,从而利用多协程并发达到加速的效果。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
local function timerExecTask(premature, tasks, result)
if true == premature then
ngx.log(ngx.WARN, "timerExecTask premature true!")
end
local taskKey = next(tasks)
while taskKey ~= nil do
-- get function
local execFunc = tasks[taskKey].execFunc
-- get function parameters
local params = tasks[taskKey].params
tasks[taskKey] = nil
result[taskKey] = execFunc(table.unpack(params))
taskKey = next(tasks)
end
end
--[[concurrentNum:并发协程数量
--tasks:需要执行的任务,table类型,每个任务需包含以下key
execFunc:任务函数
params:函数参数
]]--
function ConcurrentExec(concurrentNum, tasks)
local nkeys = require "table.nkeys"
local tasksSum = nkeys(tasks)
local result = {}
local succTimer = 0
for _ = 1, math.min(concurrentNum, tasksSum) do
local ok, err = ngx.timer.at(0, timerExecTask, tasks, result)
if not ok then
ngx.log(ngx.ERR, "timer create fail!" .. err)
else
succTimer = succTimer + 1
end
end
if 0 == succTimer then
for taskKey, task in ipairs(tasks) do
local execFunc = task.execFunc
local params = task.params
result[taskKey] = execFunc(table.unpack(params))
end
else
while nkeys(result) ~= tasksSum do
ngx.sleep(3)
end
end
return result
end
主协程创建N
个协程执行任务,然后只需要等待任务全部被执行完然后放到result
表中,也就是判断nkeys(result) ~= tasksSum
,利用非抢占的特点,执行协程在从tasks
取任务时不会被打断。 当然task
需要是一些请求资源、yield、资源就绪、resume特点的执行流程(ngx.socket
),如果是一直占用CPU的,并发也就没有意义了,因为底层是一个单进程单线程的worker。