From 63cb2013828c779bb704b55ee46e2f7ddecb1589 Mon Sep 17 00:00:00 2001 From: findstr Date: Sat, 8 Jun 2024 20:59:03 +0800 Subject: [PATCH] remove cluster.rpc and cluster.msg add cluster.lua for more flexible rpc --- examples/rpc.lua | 28 +++++++++++-------- lualib/core/cluster.lua | 15 +++-------- lualib/core/logger.lua | 3 ++- test/test.lua | 1 - test/testmulticast.lua | 60 ----------------------------------------- test/testnetpacket.lua | 4 +-- test/testrpc.lua | 16 +++++------ 7 files changed, 31 insertions(+), 96 deletions(-) delete mode 100644 test/testmulticast.lua diff --git a/examples/rpc.lua b/examples/rpc.lua index 0c71957..80bd14c 100644 --- a/examples/rpc.lua +++ b/examples/rpc.lua @@ -23,26 +23,28 @@ local function marshal(cmd, body) if type(cmd) == "string" then cmd = proto:tag(cmd) end + print("marshal", cmd, body) local dat, size = proto:encode(cmd, body, true) local buf, size = proto:pack(dat, size, true) return cmd, buf, size end -local function call(msg, fd) - print("callee", msg.txt, fd) - return "pong", msg -end - -local router = setmetatable({}, {__index = function(t, k) return call end}) +local callret = { + ["ping"] = "pong", + [0x01] = "pong", +} local server = cluster.new { marshal = marshal, unmarshal = unmarshal, - router = router, + callret = callret, accept = function(fd, addr) print("accept", fd, addr) end, - + call = function(msg, cmd, fd) + print("callee", msg.txt, fd) + return msg + end, close = function(fd, errno) print("close", fd, errno) end, @@ -53,7 +55,11 @@ server.listen("127.0.0.1:9999") local client = cluster.new { marshal = marshal, unmarshal = unmarshal, - router = router, + callret = callret, + call = function(msg, cmd, fd) + print("callee", msg.txt, fd) + return msg + end, close = function(fd, errno) print("close", fd, errno) end, @@ -67,9 +73,9 @@ core.start(function() for j = 1, 10000 do local txt = crypto.randomkey(5) local ack, cmd = client.ping(fd, {txt = txt}) - print("caller", fd, txt, ack.txt) + print("caller", fd, cmd, txt, ack.txt) assert(ack.txt == txt) - assert(cmd == proto:tag("pong")) + assert(cmd == "pong") core.sleep(1000) end end) diff --git a/lualib/core/cluster.lua b/lualib/core/cluster.lua index c926a38..03328cf 100644 --- a/lualib/core/cluster.lua +++ b/lualib/core/cluster.lua @@ -143,7 +143,7 @@ local function init_event(self, conf) local fdaddr = self.__fdaddr local callret = self.__callret local ctx = self.__ctx - local router = assert(conf.router, "router") + local call = assert(conf.call, "call") local close = assert(conf.close, "close") local marshal = assert(conf.marshal, "marshal") local unmarshal = assert(conf.unmarshal, "unmarshal") @@ -180,14 +180,7 @@ local function init_event(self, conf) core.fork(EVENT.data) while true do local dat - --pars - if cmd then - local fn = router[cmd] - if not fn then - np.drop(buf) - logger.error("[rpc.server] no router", cmd) - break - end + if cmd then --rpc request local body = unmarshal(cmd, buf, size) np.drop(buf) if not body then @@ -195,7 +188,7 @@ local function init_event(self, conf) session, cmd) break end - local ok, res = pcall(fn, body, fd) + local ok, res = pcall(call, body, cmd, fd) if not ok then logger.error("[rpc.server] call error", res) break @@ -208,7 +201,7 @@ local function init_event(self, conf) res = res or NIL local _, buf, sz = marshal(ackname, res) tcp_send(fd, np.ack(session, buf, sz)) - else + else -- rpc acknowledge local cmd = ackcmd[session] if not cmd then --timeout np.drop(buf) diff --git a/lualib/core/logger.lua b/lualib/core/logger.lua index 9225520..01e7697 100644 --- a/lualib/core/logger.lua +++ b/lualib/core/logger.lua @@ -2,7 +2,7 @@ local core = require "core" local env = require "core.env" local c = require "core.logger.c" -local function nop() +local function nop(...) end local logger = { @@ -54,3 +54,4 @@ core.signal("SIGUSR1", function(_) end) return logger + diff --git a/test/test.lua b/test/test.lua index a333b33..644ba76 100644 --- a/test/test.lua +++ b/test/test.lua @@ -14,7 +14,6 @@ local modules = { "testwakeup", "testwaitgroup", "testmutex", - "testmulticast", "testnetstream", "testnetpacket", "testchannel", diff --git a/test/testmulticast.lua b/test/testmulticast.lua deleted file mode 100644 index e2317ba..0000000 --- a/test/testmulticast.lua +++ /dev/null @@ -1,60 +0,0 @@ -local core = require "core" -local testaux = require "test.testaux" -local msg = require "core.cluster.msg" -local zproto = require "zproto" -local logic = zproto:parse [[ -test 0xff { - .str:string 1 -} -]] - - -local server -local accept = {} -local client = {} -local recv = {} - -server = msg.listen { - proto = logic, - addr = "127.0.0.1:8002", - accept = function(fd, addr) - accept[#accept + 1] = fd - --print("accept", addr) - end, - close = function(fd, errno) - --print("close", fd, errno) - end, - data = function(fd, cmd, obj) - local m, sz = server:multipack(cmd, obj, #accept) - for _, fd in pairs(accept) do - local ok = server:multicast(fd, m, sz) - testaux.assertneq(fd, nil, "multicast test send") - testaux.asserteq(ok, true, "multicast test send") - end - end -} -testaux.asserteq(not not server, true, "multicast test listen") - -local inst -for i = 1, 10 do - inst = msg.connect { - proto = logic, - addr = "127.0.0.1:8002", - data = function(fd, cmd, obj) - testaux.asserteq(obj.str, "testmulticast", "muticast validate data") - recv[i] = true - end, - close = function(fd, errno) - - end - } - client[i] = inst -end -inst:send("test", {str = "testmulticast"}) -core.sleep(1000) -for k, _ in pairs(client) do - testaux.asserteq(recv[k], true, "multicast recv count validate") -end -server:stop() - - diff --git a/test/testnetpacket.lua b/test/testnetpacket.lua index 2956d68..d0ddabf 100644 --- a/test/testnetpacket.lua +++ b/test/testnetpacket.lua @@ -23,7 +23,7 @@ local function buildpacket() local len = math.random(1, 30) local raw = testaux.randomdata(len) testaux.asserteq(#raw, len, "random packet length") - local pk = string.pack(">I2", #raw) .. string.pack("I2", #raw + 16) .. string.pack("