You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
koreader/frontend/ui/message/streammessagequeueserver.lua

87 lines
2.8 KiB
Lua

local ffi = require("ffi")
local logger = require("logger")
local MessageQueue = require("ui/message/messagequeue")
local _ = require("ffi/zeromq_h")
local czmq = ffi.load("libs/libczmq.so.1")
local C = ffi.C
local StreamMessageQueueServer = MessageQueue:extend{
host = nil,
port = nil,
}
function StreamMessageQueueServer:start()
self.context = czmq.zctx_new()
self.socket = czmq.zsocket_new(self.context, C.ZMQ_STREAM)
self.poller = czmq.zpoller_new(self.socket, nil)
local endpoint = string.format("tcp://%s:%d", self.host, self.port)
logger.dbg("StreamMessageQueueServer: Binding to endpoint", endpoint)
local rc = czmq.zsocket_bind(self.socket, endpoint)
-- If success, rc is port number
if rc == -1 then
logger.err("StreamMessageQueueServer: Cannot bind to ", endpoint)
end
end
function StreamMessageQueueServer:stop()
if self.poller ~= nil then
czmq.zpoller_destroy(ffi.new('zpoller_t *[1]', self.poller))
end
if self.socket ~= nil then
czmq.zsocket_destroy(self.context, self.socket)
end
if self.context ~= nil then
czmq.zctx_destroy(ffi.new('zctx_t *[1]', self.context))
end
end
function StreamMessageQueueServer:handleZframe(frame)
local size = czmq.zframe_size(frame)
local data = nil
if size > 0 then
local frame_data = czmq.zframe_data(frame)
if frame_data ~= nil then
data = ffi.string(frame_data, size)
end
end
czmq.zframe_destroy(ffi.new('zframe_t *[1]', frame))
return data
end
function StreamMessageQueueServer:waitEvent()
local request, id
while czmq.zpoller_wait(self.poller, 0) ~= nil do
-- See about ZMQ_STREAM and these 2 frames at http://hintjens.com/blog:42
local id_frame = czmq.zframe_recv(self.socket)
if id_frame ~= nil then
id = id_frame
end
local frame = czmq.zframe_recv(self.socket)
if frame ~= nil then
local data = self:handleZframe(frame)
if data then
logger.dbg("StreamMessageQueueServer: Received data: ", data)
request = data
end
end
end
if self.receiveCallback and request ~= nil then
self.receiveCallback(request, id)
end
end
function StreamMessageQueueServer:send(data, id_frame)
czmq.zframe_send(ffi.new('zframe_t *[1]', id_frame), self.socket, C.ZFRAME_MORE + C.ZFRAME_REUSE)
czmq.zmq_send(self.socket, ffi.cast("unsigned char*", data), #data, C.ZFRAME_MORE)
-- Note: We can't use czmq.zstr_send(self.socket, data), which would stop on the first
-- null byte in data (Lua strings can have null bytes inside).
-- Close connection
czmq.zframe_send(ffi.new('zframe_t *[1]', id_frame), self.socket, C.ZFRAME_MORE)
czmq.zmq_send(self.socket, nil, 0, 0)
end
return StreamMessageQueueServer