Improve Interdc communication (#421)
* read log entries in range with continuation, limit log response entries, add log response timeout
* add test cases, add comments
* check that meck-mocked modules are called in tests
* try to print logs to the console in Travis CI
* add timetrap to multiple_dcs_node_failure_SUITE
* fix linter errors
* fix logging vnode: read_from_to was returning operations from all DCs, but it should only consider local
  operations and local operation ids, which it does now.

Co-authored-by: Peter Zeller <p_zeller@cs.uni-kl.de>
dajenet and peterzeller committed Aug 31, 2020
1 parent 4b813d4 commit daaf815
Showing 8 changed files with 477 additions and 92 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
@@ -13,6 +13,8 @@ script:
- rebar3 as test coveralls send
- make dialyzer
- make lint
after_failure:
- find logs -name '*.log' -exec echo -e "\e[41m#####################################" \; -exec echo {} \; -exec echo -e "#####################################\e[0m" \; -exec cat {} \;
notifications:
email: bieniusa@cs.uni-kl.de
sudo: required
4 changes: 4 additions & 0 deletions include/antidote.hrl
@@ -64,6 +64,10 @@
-define(SAFE_TIME, true).
%% Version of log records being used
-define(LOG_RECORD_VERSION, 0).
%% Max number of entries per log get_entries request.
-define(LOG_REQUEST_MAX_ENTRIES, 1000).
%% Log request timeout in milliseconds.
-define(LOG_REQUEST_TIMEOUT, 60000).
%% Bounded counter manager parameters.
%% Period during which transfer requests from the same DC to the same key are ignored.
-define(GRACE_PERIOD, 1000000). % in Microseconds
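A minimal sketch of how the two new macros are meant to be used (the helper names clamp_range/2 and response_deadline/0 are illustrative only; the arithmetic mirrors inter_dc_query_response.erl and inter_dc_sub_buf.erl further down):

%% Illustrative helpers, not part of the codebase; only the macros are real.
clamp_range(From, To) ->
    %% A get_entries request is answered with at most ?LOG_REQUEST_MAX_ENTRIES entries.
    {From, erlang:min(To, From + ?LOG_REQUEST_MAX_ENTRIES)}.

response_deadline() ->
    %% Absolute time (in milliseconds) after which the requester may re-issue the query.
    erlang:system_time(millisecond) + ?LOG_REQUEST_TIMEOUT.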
5 changes: 3 additions & 2 deletions include/inter_dc_repl.hrl
@@ -64,6 +64,7 @@
pdcid :: pdcid(),
last_observed_opid :: non_neg_integer() | init,
queue :: queue:queue(),
logging_enabled :: boolean()
logging_enabled :: boolean(),
log_reader_timeout :: integer()
}).
-type inter_dc_sub_buf() :: #inter_dc_sub_buf{}.
-type inter_dc_sub_buf() :: #inter_dc_sub_buf{}.
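The new log_reader_timeout field holds an absolute deadline in milliseconds: new_state/1 in inter_dc_sub_buf.erl initialises it to 0 (no log query outstanding) and refreshes it whenever a query is sent. A hypothetical predicate showing the check the buffer performs on it (the real comparison is inlined in inter_dc_sub_buf:process/2 below):

%% Hypothetical helper; not part of the header or this diff.
query_expired(#inter_dc_sub_buf{log_reader_timeout = Deadline}) ->
    erlang:system_time(millisecond) > Deadline.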
15 changes: 8 additions & 7 deletions rebar.config
@@ -78,13 +78,14 @@
{extended_start_script, true}
]}.

{profiles,[
{lint, [
{plugins, [{rebar3_lint, {git, "https://github.com/project-fifo/rebar3_lint.git", {tag, "v0.1.10"}}}]}
]},
{test, [
{erl_opts, [warnings_as_errors, debug_info, no_inline_list_funcs]},
{plugins, [{coveralls, {git, "https://github.com/markusn/coveralls-erl", {branch, "master"}}}]}]}
{profiles, [
{lint, [
{plugins, [{rebar3_lint, {git, "https://github.com/project-fifo/rebar3_lint.git", {tag, "v0.1.10"}}}]}
]},
{test, [
{erl_opts, [warnings_as_errors, debug_info, no_inline_list_funcs]},
{plugins, [{coveralls, {git, "https://github.com/markusn/coveralls-erl", {branch, "master"}}}]},
{deps, [meck]}]}
]}.

% configuration of style rules
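meck is added to the test profile because the new unit tests in inter_dc_sub_buf.erl stub out collaborating modules. Condensed from those tests, the pattern looks roughly like this (meck:new/1, meck:expect/3, meck:num_calls/3 and meck:unload/1 are the standard meck API; ?assertEqual comes from EUnit; the module and values are taken from the tests further down):

%% Condensed sketch of the mocking pattern used in the new tests.
meck:new(inter_dc_query),
meck:expect(inter_dc_query, perform_request, fun(_, _, _, _) -> ok end),
%% ... exercise the code under test ...
?assertEqual(1, meck:num_calls(inter_dc_query, perform_request, '_')),
meck:unload(inter_dc_query).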
42 changes: 18 additions & 24 deletions src/inter_dc_query_response.erl
@@ -75,7 +75,8 @@ init([Num]) ->

handle_cast({get_entries, BinaryQuery, QueryState}, State) ->
{read_log, Partition, From, To} = binary_to_term(BinaryQuery),
Entries = get_entries_internal(Partition, From, To),
LimitedTo = erlang:min(To, From + ?LOG_REQUEST_MAX_ENTRIES), %% Limit number of returned entries
Entries = get_entries_internal(Partition, From, LimitedTo),
BinaryResp = term_to_binary({{dc_utilities:get_my_dc_id(), Partition}, Entries}),
BinaryPartition = inter_dc_txn:partition_to_bin(Partition),
FullResponse = <<BinaryPartition/binary, BinaryResp/binary>>,
Expand All @@ -100,34 +101,27 @@ handle_info(_Info, State) ->

-spec get_entries_internal(partition_id(), log_opid(), log_opid()) -> [interdc_txn()].
get_entries_internal(Partition, From, To) ->
Node = case lists:member(Partition, dc_utilities:get_my_partitions()) of
true -> node();
false ->
log_utilities:get_my_node(Partition)
end,
Logs = log_read_range(Partition, Node, From, To),
Asm = log_txn_assembler:new_state(),
{OpLists, _} = log_txn_assembler:process_all(Logs, Asm),
Txns = lists:map(fun(TxnOps) -> inter_dc_txn:from_ops(TxnOps, Partition, none) end, OpLists),
%% This is done in order to ensure that we only send the transactions we committed.
%% We can remove this once the read_log_range is reimplemented.
lists:filter(fun inter_dc_txn:is_local/1, Txns).
Node = case lists:member(Partition, dc_utilities:get_my_partitions()) of
true -> node();
false ->
log_utilities:get_my_node(Partition)
end,
Logs = log_read_range(Partition, Node, From, To),
Asm = log_txn_assembler:new_state(),
{OpLists, _} = log_txn_assembler:process_all(Logs, Asm),
%% Transforming operation lists to transactions and set PrevLogOpId
ProcessedOps = lists:map(fun(Ops) -> [FirstOp|_] = Ops, {Ops, #op_number{local = FirstOp#log_record.op_number#op_number.local - 1}} end, OpLists),
Txns = lists:map(fun({TxnOps, PrevLogOpId}) -> inter_dc_txn:from_ops(TxnOps, Partition, PrevLogOpId) end, ProcessedOps),
%% This is done in order to ensure that we only send the transactions we committed.
%% We can remove this once the read_log_range is reimplemented.
lists:filter(fun inter_dc_txn:is_local/1, Txns).

%% TODO: re-implement this method efficiently once the log provides efficient access by partition and DC (Santiago, here!)
%% TODO: also fix the method to provide complete snapshots if the log was trimmed
-spec log_read_range(partition_id(), node(), log_opid(), log_opid()) -> [log_record()].
log_read_range(Partition, Node, From, To) ->
{ok, RawOpList} = logging_vnode:read({Partition, Node}, [Partition]),
OpList = lists:map(fun({_Partition, Op}) -> Op end, RawOpList),
filter_operations(OpList, From, To).

-spec filter_operations([log_record()], log_opid(), log_opid()) -> [log_record()].
filter_operations(Ops, Min, Max) ->
F = fun(Op) ->
Num = Op#log_record.op_number#op_number.local,
(Num >= Min) and (Max >= Num)
end,
lists:filter(F, Ops).
{ok, RawOpList} = logging_vnode:read_from_to({Partition, Node}, [Partition], From, To),
lists:map(fun({_Partition, Op}) -> Op end, RawOpList).

%% @private
terminate(_Reason, _State) ->
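The PrevLogOpId attached to each transaction above is simply the local operation id immediately preceding the transaction's first operation. A small worked example (the wrapper function is illustrative; the expression is the one used in get_entries_internal/3):

%% Illustrative wrapper around the expression used in get_entries_internal/3.
prev_log_opid([FirstOp | _]) ->
    #op_number{local = FirstOp#log_record.op_number#op_number.local - 1}.
%% A transaction whose operations carry local ids 5..8 is therefore shipped with
%% prev_log_opid = #op_number{local = 4}, which lets the receiving buffer detect gaps.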
153 changes: 134 additions & 19 deletions src/inter_dc_sub_buf.erl
@@ -37,6 +37,10 @@
%% Expected time to wait until the logging_vnode is started
-define(LOG_STARTUP_WAIT, 1000).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

%% API
-export([
new_state/1,
Expand All @@ -53,7 +57,8 @@ new_state(PDCID) ->
pdcid = PDCID,
last_observed_opid = init,
queue = queue:new(),
logging_enabled = EnableLogging
logging_enabled = EnableLogging,
log_reader_timeout = 0
}.

-spec process({txn, interdc_txn()} | {log_reader_resp, [interdc_txn()]}, inter_dc_sub_buf()) -> inter_dc_sub_buf().
Expand All @@ -77,23 +82,24 @@ process({txn, Txn}, State = #inter_dc_sub_buf{last_observed_opid = init, pdcid =
State
end;
process({txn, Txn}, State = #inter_dc_sub_buf{state_name = normal}) -> process_queue(push(Txn, State));
process({txn, Txn}, State = #inter_dc_sub_buf{state_name = buffering}) ->
?LOG_INFO("Buffering txn in ~p", [State#inter_dc_sub_buf.pdcid]),
push(Txn, State);

process({log_reader_resp, Txns}, State = #inter_dc_sub_buf{queue = Queue, state_name = buffering}) ->
ok = lists:foreach(fun deliver/1, Txns),
NewLast = case queue:peek(Queue) of
empty -> State#inter_dc_sub_buf.last_observed_opid;
{value, Txn} -> Txn#interdc_txn.prev_log_opid#op_number.local
end,
NewState = State#inter_dc_sub_buf{last_observed_opid = NewLast},
process_queue(NewState);
process({txn, Txn}, State = #inter_dc_sub_buf{state_name = buffering, log_reader_timeout = Timeout}) ->
%% Buffer incoming transactions while waiting for the log reader response.
%% If the response timeout has been exceeded, fall back to the normal state to query for the response again.
?LOG_INFO("Buffering txn in ~p", [State#inter_dc_sub_buf.pdcid]),
Time = erlang:system_time(millisecond),
if
Timeout < Time ->
?LOG_WARNING("Got timeout for log_reader_resp in ~p", [State#inter_dc_sub_buf.pdcid]),
process_queue(push(Txn, State#inter_dc_sub_buf{state_name = normal}));
true ->
push(Txn, State)
end;

process({log_reader_resp, Txns}, State = #inter_dc_sub_buf{state_name = normal}) ->
%% This case must not happen
?LOG_CRITICAL("Received unexpected log_reader_resp messages in state normal Message ~p. State ~p", [Txns, State]),
State.
process({log_reader_resp, Txns}, State = #inter_dc_sub_buf{queue = Queue}) ->
%% Add log response to buffer and process.
NewQueue = queue:join(queue:from_list(Txns), Queue),
NewState = State#inter_dc_sub_buf{queue = NewQueue},
process_queue(NewState).


%%%% Methods ----------------------------------------------------------------+
Expand All @@ -119,7 +125,8 @@ process_queue(State = #inter_dc_sub_buf{queue = Queue, last_observed_opid = Last
try
case query(State#inter_dc_sub_buf.pdcid, State#inter_dc_sub_buf.last_observed_opid + 1, TxnLast) of
ok ->
State#inter_dc_sub_buf{state_name = buffering};
%% Enter buffering state while waiting for response and set timeout
State#inter_dc_sub_buf{state_name = buffering, log_reader_timeout = erlang:system_time(millisecond) + ?LOG_REQUEST_TIMEOUT};
_ ->
?LOG_WARNING("Failed to send log query to DC, will retry on next ping message"),
State#inter_dc_sub_buf{state_name = normal}
Expand All @@ -138,7 +145,7 @@ process_queue(State = #inter_dc_sub_buf{queue = Queue, last_observed_opid = Last

%% If the transaction has an old value, drop it.
lt ->
?LOG_WARNING("Dropping duplicate message ~w, last time was ~w", [Txn, Last]),
?LOG_WARNING("Dropping duplicate message ~w, last time was ~w", [(TxnLast + 1), Last]),
process_queue(State#inter_dc_sub_buf{queue = queue:drop(Queue)})
end
end.
Expand All @@ -162,3 +169,111 @@ query({DCID, Partition}, From, To) ->
cmp(A, B) when A > B -> gt;
cmp(A, B) when B > A -> lt;
cmp(_, _) -> eq.

-ifdef(TEST).

process_init() ->
meck_reset(),
State = new_state({0, 0}),
Txn = make_txn(0),
NewState = process({txn, Txn}, State),
?assertEqual(normal, NewState#inter_dc_sub_buf.state_name),
check_meck_calls(1, 1, 1, 0).

process_old() ->
meck_reset(),
State = new_state({0, 0}),
Txn = make_txn(-1),
NewState = process({txn, Txn}, State#inter_dc_sub_buf{state_name = normal, last_observed_opid=0}),
?assertEqual(normal, NewState#inter_dc_sub_buf.state_name),
check_meck_calls(0, 0, 0, 0).

process_missing_txn() ->
meck_reset(),
State = new_state({0, 0}),
Txn = make_txn(1),
NewState = process({txn, Txn}, State#inter_dc_sub_buf{state_name = normal, last_observed_opid=0}),
?assertEqual(1, meck:num_calls(inter_dc_query, perform_request, '_')),
?assertEqual(buffering, NewState#inter_dc_sub_buf.state_name),
check_meck_calls(0, 0, 0, 1).

process_buffering() ->
meck_reset(),
State = new_state({0, 0}),
Txn = make_txn(1),
NewState = process({txn, Txn}, State#inter_dc_sub_buf{state_name = buffering, log_reader_timeout = erlang:system_time(millisecond) + 3000, last_observed_opid=0}),
?assertEqual(buffering, NewState#inter_dc_sub_buf.state_name),
check_meck_calls(0, 0, 0, 0),
NewState2 = process({txn, Txn}, State#inter_dc_sub_buf{state_name = buffering, log_reader_timeout = erlang:system_time(millisecond) - 1000, last_observed_opid=0}),
?assertEqual(buffering, NewState2#inter_dc_sub_buf.state_name),
check_meck_calls(0, 0, 0, 1).

process_resp() ->
meck_reset(),
State = new_state({0, 0}),
Txn = make_txn(1),
BufState = process({txn, Txn}, State#inter_dc_sub_buf{state_name = normal, last_observed_opid=0}),
?assertEqual(buffering, BufState#inter_dc_sub_buf.state_name),
?assertEqual(1, queue:len(BufState#inter_dc_sub_buf.queue)),
Txn2 = make_txn(0),
NormalState = process({log_reader_resp, [Txn2]}, BufState),
?assertEqual(normal, NormalState#inter_dc_sub_buf.state_name),
?assertEqual(0, queue:len(NormalState#inter_dc_sub_buf.queue)),
check_meck_calls(0, 0, 2, 1).

make_txn(Last) ->
#interdc_txn{
dcid = 0,
partition = 0,
prev_log_opid = #op_number{node = {node(), 0}, global = 0, local = Last},
log_records = [#log_record{
op_number = #op_number{node = {node(), 0}, global = 0, local = Last + 1},
log_operation = #log_operation{op_type = commit}
}]
}.

meck_reset() ->
meck:reset(dc_utilities),
meck:reset(logging_vnode),
meck:reset(inter_dc_dep_vnode),
meck:reset(inter_dc_query).

check_meck_calls(Dc_utilities, Logging_vnode, Inter_dc_dep_vnode, Inter_dc_query) ->
?assertEqual(Dc_utilities, meck:num_calls(dc_utilities, partition_to_indexnode, '_')),
?assertEqual(Logging_vnode, meck:num_calls(logging_vnode, request_op_id, '_')),
?assertEqual(Inter_dc_dep_vnode, meck:num_calls(inter_dc_dep_vnode, handle_transaction, '_')),
?assertEqual(Inter_dc_query, meck:num_calls(inter_dc_query, perform_request, '_')).

test_init() ->
application:set_env(antidote, enable_logging, true),
meck:new(dc_utilities),
meck:new(logging_vnode),
meck:new(inter_dc_dep_vnode),
meck:new(inter_dc_query),
meck:expect(logging_vnode, request_op_id, fun(_, _, _) -> {ok, 0} end),
meck:expect(dc_utilities, partition_to_indexnode, fun(_) -> {0, node()} end),
meck:expect(inter_dc_query, perform_request, fun(_, _, _, _) -> ok end),
meck:expect(inter_dc_dep_vnode, handle_transaction, fun(_) -> ok end),
logger:add_handler_filter(default, ?MODULE, {fun(_, _) -> stop end, nostate}),
ok.

test_cleanup(_) ->
meck:unload(dc_utilities),
meck:unload(logging_vnode),
meck:unload(inter_dc_dep_vnode),
meck:unload(inter_dc_query),
logger:remove_handler_filter(default, ?MODULE).

meck_test_() -> {
setup,
fun test_init/0,
fun test_cleanup/1,
[
fun process_init/0,
fun process_old/0,
fun process_missing_txn/0,
fun process_buffering/0,
fun process_resp/0
]}.

-endif.
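Two points about the reworked buffer are worth spelling out. While buffering, a transaction that arrives after log_reader_timeout has passed drops the buffer back to the normal state, so process_queue/1 issues the log query again. And a log_reader_resp is no longer delivered directly: the returned transactions are joined in front of the buffered queue and delivered by process_queue/1 in log order. A module-free worked example of that queue handling (atoms stand in for transactions; queue:from_list/1, queue:join/2 and queue:to_list/1 are the standard queue API):

%% Worked example of the queue handling in the new log_reader_resp clause.
Resp = queue:from_list([t1, t2]),   %% transactions returned by the log reader
Buffered = queue:from_list([t3]),   %% transaction buffered while waiting
queue:to_list(queue:join(Resp, Buffered)).
%% => [t1, t2, t3]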