Skip to content

Commit

Permalink
add distributed tracing via log
Browse files Browse the repository at this point in the history
  • Loading branch information
findstr committed Jul 1, 2023
1 parent 2e165d1 commit 1f2843c
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 37 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ SRC_FILE = \
silly_env.c \
silly_malloc.c \
silly_log.c \
silly_trace.c \
silly_monitor.c \

SRC = $(addprefix $(SRC_PATH)/, $(SRC_FILE))
Expand Down
46 changes: 46 additions & 0 deletions lualib-src/lualib-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "silly.h"
#include "compiler.h"
#include "silly_trace.h"
#include "silly_log.h"
#include "silly_run.h"
#include "silly_worker.h"
Expand Down Expand Up @@ -432,7 +433,47 @@ lsendsize(lua_State *L)
return 1;
}

static int
ltracespan(lua_State *L)
{
silly_trace_span_t span;
span = (silly_trace_span_t)luaL_checkinteger(L, 1);
silly_trace_span(span);
return 0;
}

static int
ltracenew(lua_State *L)
{
silly_trace_id_t traceid;
traceid = silly_trace_new();
lua_pushinteger(L, (lua_Integer)traceid);
return 1;
}

static int
ltraceset(lua_State *L)
{
silly_trace_id_t traceid;
if lua_isnoneornil(L, 1) {
traceid = TRACE_WORKER_ID;
} else {
traceid = (silly_trace_id_t)luaL_checkinteger(L, 1);

}
traceid = silly_trace_set(traceid);
lua_pushinteger(L, (lua_Integer)traceid);
return 1;
}

static int
ltraceget(lua_State *L)
{
silly_trace_id_t traceid;
traceid = silly_trace_get();
lua_pushinteger(L, (lua_Integer)traceid);
return 1;
}

int
luaopen_sys_core_c(lua_State *L)
Expand All @@ -446,6 +487,11 @@ luaopen_sys_core_c(lua_State *L)
{"tostring", ltostring},
{"getpid", lgetpid},
{"exit", lexit},
//trace
{"trace_span", ltracespan},
{"trace_new", ltracenew},
{"trace_set", ltraceset},
{"trace_get", ltraceget},
//socket
{"tcp_connect", ltcpconnect},
{"tcp_listen", ltcplisten},
Expand Down
8 changes: 7 additions & 1 deletion lualib-src/lualib-netpacket.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "silly.h"
#include "silly_log.h"
#include "silly_trace.h"
#include "silly_malloc.h"

#define DEFAULT_QUEUE_SIZE 2048
Expand Down Expand Up @@ -335,6 +336,7 @@ lmsgpack(lua_State *L)
struct rpc_cookie {
cmd_t cmd;
session_t session;
silly_trace_id_t traceid;
};

static int
Expand All @@ -357,7 +359,8 @@ lrpcpop(lua_State *L)
lua_pushinteger(L, size);
lua_pushinteger(L, rpc->cmd);
lua_pushinteger(L, rpc->session);
return 5;
lua_pushinteger(L, (lua_Integer)rpc->traceid);
return 6;
}

static int
Expand All @@ -370,13 +373,15 @@ lrpcpack(lua_State *L)
struct rpc_cookie *rpc;
int stk = 1;
session_t session;
silly_trace_id_t traceid;
str = getbuffer(L, &stk, &size);
if (size > (USHRT_MAX - sizeof(struct rpc_cookie))) {
luaL_error(L, "netpacket.pack data large then:%d\n",
USHRT_MAX - sizeof(struct rpc_cookie));
}
cmd = luaL_checkinteger(L, stk);
session = luaL_checkinteger(L, stk+1);
traceid = luaL_checkinteger(L, stk+2);
body = size + sizeof(struct rpc_cookie);
p = silly_malloc(2 + body);
p[0] = (body >> 8) & 0xff;
Expand All @@ -386,6 +391,7 @@ lrpcpack(lua_State *L)
rpc = (struct rpc_cookie *)&p[2 + size];
rpc->cmd = cmd;
rpc->session = session;
rpc->traceid = traceid;
lua_pushlightuserdata(L, p);
lua_pushinteger(L, 2 + body);
return 2;
Expand Down
40 changes: 20 additions & 20 deletions lualib/cluster/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ local function server_listen(self)
end

function EVENT.data()
local fd, buf, size, cmd, session = np.rpcpop(queue)
local fd, buf, size, cmd, session, traceid = np.rpcpop(queue)
if not fd then
return
end
local otrace = core.trace(traceid)
core.fork(EVENT.data)
while true do
local dat
Expand Down Expand Up @@ -83,14 +84,15 @@ local function server_listen(self)
end
local bodydat, sz = proto:encode(ret, res, true)
bodydat, sz = proto:pack(bodydat, sz, true)
tcp_send(fd, np.rpcpack(bodydat, sz, ret, session))
tcp_send(fd, np.rpcpack(bodydat, sz, ret, session, traceid))
--next
fd, buf, size, cmd, session = np.rpcpop(queue)
fd, buf, size, cmd, session, traceid = np.rpcpop(queue)
if not fd then
return
end
core.trace(traceid)
end

core.trace(otrace)
end
local callback = function(type, fd, message, ...)
np.message(queue, message)
Expand Down Expand Up @@ -183,7 +185,7 @@ local function doconnect(self)
end

function EVENT.data()
local fd, d, sz, cmd, session = np.rpcpop(queue)
local fd, d, sz, cmd, session, _ = np.rpcpop(queue)
if not fd then
return
end
Expand All @@ -210,7 +212,7 @@ local function doconnect(self)
self.ackcmd[session] = cmd
core.wakeup(co, body)
--next
fd, d, sz, cmd, session = np.rpcpop(queue)
fd, d, sz, cmd, session, _ = np.rpcpop(queue)
if not fd then
break
end
Expand Down Expand Up @@ -277,35 +279,33 @@ local function waitfor(self, session)
return body, cmd
end

function client.send(self, cmd, body)
local function send_request(self, cmd, body)
local ok = checkconnect(self)
if not ok then
return ok, "closed"
return false, "closed"
end
local proto = self.__proto
if type(cmd) == "string" then
cmd = proto:tag(cmd)
end
local session = core.genid()
local traceid = core.tracepropagate()
local body, sz = proto:encode(cmd, body, true)
body, sz = proto:pack(body, sz, true)
return tcp_send(self.fd, np.rpcpack(body, sz, cmd, session))
local ok = tcp_send(self.fd, np.rpcpack(body, sz, cmd, session, traceid))
if not ok then
return false, "send fail"
end
return true, session
end

client.send = send_request

function client.call(self, cmd, body)
local ok = checkconnect(self)
local ok, session = send_request(self, cmd, body)
if not ok then
return nil, "closed"
end
local proto = self.__proto
if type(cmd) == "string" then
cmd = proto:tag(cmd)
return false, session
end
body = body or NIL
local session = core.genid()
local body, sz = proto:encode(cmd, body, true)
body, sz = proto:pack(body, sz, true)
tcp_send(self.fd, np.rpcpack(body, sz, cmd, session))
return waitfor(self, session)
end

Expand Down
27 changes: 26 additions & 1 deletion lualib/sys/core.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ local weakmt = {__mode="kv"}
local log_info = logger.info
local log_error = logger.error
local readctrl = assert(c.readctrl)
local trace_new = assert(c.trace_new)
local trace_set = assert(c.trace_set)
local trace_get = assert(c.trace_get)
local trace_span = assert(c.trace_span)

core.genid = c.genid
core.getpid = c.getpid
core.version = c.version()
Expand All @@ -34,6 +39,7 @@ end
--coroutine
--state migrate(RUN (WAIT->READY)/SLEEP RUN)
local task_status = setmetatable({}, weakmt)
local task_traceid = setmetatable({}, weakmt)
local task_running = nil
local cocreate = coroutine.create
local corunning = coroutine.running
Expand All @@ -45,7 +51,9 @@ local function task_resume(t, ...)
local save = task_running
task_status[t] = "RUN"
task_running = t
local traceid = trace_set(task_traceid[t])
local ok, err = coresume(t, ...)
trace_set(traceid)
task_running = save
if not ok then
task_status[t] = nil
Expand All @@ -68,6 +76,21 @@ local function core_pcall(f, ...)
return xpcall(f, errmsg, ...)
end

core.tracespan = trace_span
core.tracepropagate = trace_new
function core.tracenew()
local traceid = task_traceid[task_running]
if traceid then
return traceid
end
return trace_new()
end

function core.trace(id)
task_traceid[task_running] = id
return (trace_set(id))
end

function core.error(errmsg)
log_error(errmsg)
log_error(traceback())
Expand All @@ -94,7 +117,9 @@ local function task_create(f)
while true do
local ret
f = nil
copool[#copool + 1] = corunning()
local co = corunning()
task_traceid[co] = nil
copool[#copool + 1] = co
ret, f = coyield("EXIT")
if ret ~= "STARTUP" then
log_error("[sys.core] task create", ret)
Expand Down
2 changes: 2 additions & 0 deletions silly-src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <lauxlib.h>
#include <string.h>
#include "silly.h"
#include "silly_trace.h"
#include "silly_log.h"
#include "silly_env.h"
#include "silly_timer.h"
Expand Down Expand Up @@ -222,6 +223,7 @@ int main(int argc, char *argv[])
printf("USAGE:%s <config file> --parameters\n", argv[0]);
return -1;
}
silly_trace_init();
silly_env_init();
silly_timer_init();
config.selfname = selfname(argv[0]);
Expand Down
5 changes: 5 additions & 0 deletions silly-src/silly_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@

#define LOG_BUF_SIZE (4*1024)

#define TRACE_WORKER_ID (0)
#define TRACE_TIMER_ID (1)
#define TRACE_SOCKET_ID (2)
#define TRACE_MONITOR_ID (3)

#endif
Loading

0 comments on commit 1f2843c

Please sign in to comment.