Skip to content

Commit

Permalink
remove cluster.rpc and cluster.msg
Browse files Browse the repository at this point in the history
add cluster.lua for more flexible rpc
  • Loading branch information
findstr committed Jul 15, 2024
1 parent 228f784 commit 997e710
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 103 deletions.
28 changes: 17 additions & 11 deletions examples/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,28 @@ local function marshal(cmd, body)
if type(cmd) == "string" then
cmd = proto:tag(cmd)
end
print("marshal", cmd, body)
local dat, size = proto:encode(cmd, body, true)
local buf, size = proto:pack(dat, size, true)
return cmd, buf, size
end

local function call(msg, fd)
print("callee", msg.txt, fd)
return "pong", msg
end

local router = setmetatable({}, {__index = function(t, k) return call end})
local callret = {
["ping"] = "pong",
[0x01] = "pong",
}

local server = cluster.new {
marshal = marshal,
unmarshal = unmarshal,
router = router,
callret = callret,
accept = function(fd, addr)
print("accept", fd, addr)
end,

call = function(msg, cmd, fd)
print("callee", msg.txt, fd)
return msg
end,
close = function(fd, errno)
print("close", fd, errno)
end,
Expand All @@ -53,7 +55,11 @@ server.listen("127.0.0.1:9999")
local client = cluster.new {
marshal = marshal,
unmarshal = unmarshal,
router = router,
callret = callret,
call = function(msg, cmd, fd)
print("callee", msg.txt, fd)
return msg
end,
close = function(fd, errno)
print("close", fd, errno)
end,
Expand All @@ -67,9 +73,9 @@ core.start(function()
for j = 1, 10000 do
local txt = crypto.randomkey(5)
local ack, cmd = client.ping(fd, {txt = txt})
print("caller", fd, txt, ack.txt)
print("caller", fd, cmd, txt, ack.txt)
assert(ack.txt == txt)
assert(cmd == proto:tag("pong"))
assert(cmd == "pong")
core.sleep(1000)
end
end)
Expand Down
15 changes: 4 additions & 11 deletions lualib/core/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ local function init_event(self, conf)
local fdaddr = self.__fdaddr
local callret = self.__callret
local ctx = self.__ctx
local router = assert(conf.router, "router")
local call = assert(conf.call, "call")
local close = assert(conf.close, "close")
local marshal = assert(conf.marshal, "marshal")
local unmarshal = assert(conf.unmarshal, "unmarshal")
Expand Down Expand Up @@ -180,22 +180,15 @@ local function init_event(self, conf)
core.fork(EVENT.data)
while true do
local dat
--pars
if cmd then
local fn = router[cmd]
if not fn then
np.drop(buf)
logger.error("[rpc.server] no router", cmd)
break
end
if cmd then --rpc request
local body = unmarshal(cmd, buf, size)
np.drop(buf)
if not body then
logger.error("[rpc.server] decode fail",
session, cmd)
break
end
local ok, res = pcall(fn, body, fd)
local ok, res = pcall(call, body, cmd, fd)
if not ok then
logger.error("[rpc.server] call error", res)
break
Expand All @@ -208,7 +201,7 @@ local function init_event(self, conf)
res = res or NIL
local _, buf, sz = marshal(ackname, res)
tcp_send(fd, np.ack(session, buf, sz))
else
else -- rpc acknowledge
local cmd = ackcmd[session]
if not cmd then --timeout
np.drop(buf)
Expand Down
3 changes: 2 additions & 1 deletion lualib/core/logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ local core = require "core"
local env = require "core.env"
local c = require "core.logger.c"

local function nop()
local function nop(...)
end

local logger = {
Expand Down Expand Up @@ -54,3 +54,4 @@ core.signal("SIGUSR1", function(_)
end)

return logger

1 change: 0 additions & 1 deletion test/test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ local modules = {
"testwakeup",
"testwaitgroup",
"testmutex",
"testmulticast",
"testnetstream",
"testnetpacket",
"testchannel",
Expand Down
60 changes: 0 additions & 60 deletions test/testmulticast.lua

This file was deleted.

18 changes: 9 additions & 9 deletions test/testnetpacket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ local function buildpacket()
local len = math.random(1, 30)
local raw = testaux.randomdata(len)
testaux.asserteq(#raw, len, "random packet length")
local pk = string.pack(">I2", #raw) .. string.pack("<c" .. #raw, raw)
local pk = string.pack(">I2", #raw + 16) .. string.pack("<c" .. #raw, raw) .. string.pack("<I8I8", 0, 0)
return raw, pk
end

Expand All @@ -35,6 +35,7 @@ end
local function randompush(sid, pk)
local i = 1
local len = #pk + 1
local buf = {}
while i < len do
local last = len - i
if last > 2 then
Expand All @@ -43,12 +44,14 @@ local function randompush(sid, pk)
end
local x = pk:sub(i, i + last - 1)
i = i + last;
buf[#buf + 1] = x
justpush(sid, x)
end
assert(table.concat(buf), pk)
end

local function pushbroken(sid, pk)
local pk2 = pk:sub(1, #pk - 1)
local pk2 = pk:sub(1, #pk - 17)
randompush(sid, pk2)
end

Expand Down Expand Up @@ -100,13 +103,6 @@ local function testclear()
local fd, data = popdata()
testaux.asserteq(fd, nil, "netpacket broken test fd")
testaux.asserteq(data, nil, "netpacket broken test data")
randompush(sid, pk)
local fd, data = popdata()
testaux.asserteq(fd, sid, "netpacket broken test fd")
testaux.assertneq(data, raw, "netpacket broken test data")
local fd, data = popdata()
testaux.asserteq(fd, nil, "netpacket broken test fd")
testaux.asserteq(data, nil, "netpacket broken test data")
np.clear(BUFF, sid)
randompush(sid, pk)
local fd, data = popdata()
Expand Down Expand Up @@ -145,6 +141,10 @@ local function testexpand()
end

collectgarbage("collect")
local seedx, seedy = math.randomseed()
print("seed", seedx, seedy)
math.randomseed(1721030230,139887399596712)

BUFF = np.create()
testhashconflict_part1()
testpacket(justpush)
Expand Down
16 changes: 6 additions & 10 deletions test/testrpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ end
local case = case_one
local accept_fd
local accept_addr
local router = setmetatable({}, {__index = function(t, k)
local fn = function(msg, fd)
return case(msg, k, fd)
end
t[k] = fn
return fn
end})

local callret = {
["foo"] = "bar",
Expand All @@ -72,12 +65,13 @@ local server = cluster.new {
marshal = marshal,
unmarshal = unmarshal,
callret = callret,
router = router,
accept = function(fd, addr)
accept_fd = fd
accept_addr = addr
end,

call = function(msg, cmd, fd)
return case(msg, cmd, fd)
end,
close = function(fd, errno)
end,
}
Expand All @@ -89,7 +83,9 @@ local client = cluster.new {
marshal = marshal,
unmarshal = unmarshal,
callret = callret,
router = router,
call = function(msg, cmd, fd)
return case(msg, cmd, fd)
end,
close = function(fd, errno)
end,
}
Expand Down

0 comments on commit 997e710

Please sign in to comment.