Lua Coroutine
Lua Coroutine is a kind of collaborative multithreading, equivalent to a thread. A pair yield-resume switches control from one thread to another.
However, unlike “real” multithreading, coroutines are non preemptive. A coroutine only suspends its execution by explicitly calling a yield
function.
Coroutine Grammar
Method | Description |
---|---|
coroutine.create (f) | Creates a new coroutine, with f must be a function, Returns this new coroutine, an object with type “thread”. |
coroutine.resume (co [, val1, ···]) | Starts or continues the execution of coroutine |
coroutine.yield (…) | Suspends the execution of the calling coroutine. Any arguments to yield are passed as extra results to resume. |
coroutine.running () | Returns the running coroutine plus a boolean, true when the running coroutine is the main one. |
coroutine.status (co) | Returns the status of coroutine co, as a string: “running”, “suspended”,”normal” “dead” |
coroutine.wrap (f) | Creates a new coroutine, Returns a function that resumes the coroutine each time it is called. |
coroutine.isyieldable () | Returns true when the running coroutine can yield. |
Multithreading Example
Let us assume a typical multithreading situation: We want to download several remote files through HTTP. Of course, to download several remote files, we must know how to download one remote file. In this example, we will use the LuaSocket
library. To download a file, we must:
- open a connection to its site
- send a request to the file
- receive the file (in blocks)
- close the connection.
In Lua, we can write this task as follows.
local socket = require "socket"
local host = "www.w3.org"
local file = "/TR/REC-html32.html"
local c = assert(socket.connect(host, 80))
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
c:close()
To rewrite the program with coroutines, let us first rewrite the previous download code as a function:
function download (host, file)
local c = assert(socket.connect(host, 80))
local count = 0 -- counts number of bytes read
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status = receive(c)
count = count + string.len(s)
if status == "closed" then break end
end
c:close()
print(file, count)
end
function receive (connection)
return connection:receive(2^10)
end
For the concurrent implementation, this function must receive data without blocking. Instead, if there is not enough data available, it yields. The new code is like this:
function receive (connection)
connection:timeout(0) -- do not block
local s, status = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)
end
return s, status
end
The call to timeout(0) makes any operation over the connection a non-blocking operation. When the operation status is “timeout”, it means that the operation returned without completion. In this case, the thread yields. The non-false argument passed to yield signals to the dispatcher that the thread is still performing its task. (Later we will see another version where the dispatcher needs the timed-out connection.) Notice that, even in case of a timeout, the connection returns what it read until the timeout, so receive always returns s to its caller. The next function ensures that each download runs in an individual thread:
threads = {} -- list of all live threads
function get (host, file)
-- create coroutine
local co = coroutine.create(function ()
download(host, file)
end)
-- insert it in the list
table.insert(threads, co)
end
The table threads keeps a list of all live threads, for the dispatcher. The dispatcher is simple. It is mainly a loop that goes through all threads, calling one by one. It must also remove from the list the threads that finish their tasks. It stops the loop when there are no more threads to run:
function dispatcher ()
while true do
local n = table.getn(threads)
if n == 0 then break end -- no more threads to run
for i=1,n do
local status, res = coroutine.resume(threads[i])
if not res then -- thread finished its task?
table.remove(threads, i)
break
end
end
end
end
Finally, the main program creates the threads it needs and calls the dispatcher. For instance, to download four documents from the W3C site, the main program could be like this:
host = "www.w3.org"
get(host, "/TR/html401/html40.txt")
get(host,"/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host,"/TR/REC-html32.html")
get(host,
"/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
dispatcher() -- main loop
My machine takes six seconds to download those four files using coroutines. With the sequential implementation, it takes more than twice that time (15 seconds). Despite the speedup, this last implementation is far from optimal. Everything goes fine while at least one thread has something to read. However, when no thread has data to read, the dispatcher does a busy wait, going from thread to thread only to check that they still have no data. As a result, this coroutine implementation uses almost 30 times more CPU than the sequential solution.
To avoid this behavior, we can use the select function from LuaSocket. It allows a program to block while waiting for a status change in a group of sockets. The changes in our implementation are small. We only have to change the dispatcher. The new version is like this:
function dispatcher ()
while true do
local n = table.getn(threads)
if n == 0 then break end -- no more threads to run
local connections = {}
for i=1,n do
local status, res = coroutine.resume(threads[i])
if not res then -- thread finished its task?
table.remove(threads, i)
break
else -- timeout
table.insert(connections, res)
end
end
if table.getn(connections) == n then
socket.select(connections)
end
end
end
Along the inner loop, this new dispatcher collects the timed-out connections in table connections. Remember that receive passes such connections to yield; thus resume returns them. When all connections time out, the dispatcher calls select to wait for any of those connections to change status. This final implementation runs as fast as the first implementation with coroutines. Moreover, as it does no busy waits, it uses just a little more CPU than the sequential implementation.