Skip to content

Commit

Permalink
Merge pull request #35 from zhongwencool/fixed-gun-trailers-unknown
Browse files Browse the repository at this point in the history
[handle gun_trailers message] return grpc_error message when stream is broken by etcd server
  • Loading branch information
zhongwencool authored May 7, 2021
2 parents 89e5db5 + 13698cc commit 69d50ac
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 12 deletions.
7 changes: 5 additions & 2 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
{"1.1.0",
{"1.2.0",
[{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.7.3">>},1},
{<<"gun">>,{pkg,<<"gun">>,<<"1.3.3">>},0}]}.
[
{pkg_hash,[
{<<"cowlib">>, <<"A7FFCD0917E6D50B4D5FB28E9E2085A0CEB3C97DEA310505F7460FF5ED764CE9">>},
{<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>}]}
{<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>}]},
{pkg_hash_ext,[
{<<"cowlib">>, <<"1E1A3D176D52DAEBBECBBCDFD27C27726076567905C2A9D7398C54DA9D225761">>},
{<<"gun">>, <<"3106CE167F9C9723F849E4FB54EA4A4D814E3996AE243A1C828B256E749041E0">>}]}
].
2 changes: 1 addition & 1 deletion src/eetcd.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{application, eetcd,
[
{description, "ETCD V3 client"},
{vsn, "0.3.3"},
{vsn, "0.3.4"},
{registered, [eetcd_sup, eetcd_conn_sup, eetcd_lease_sup]},
{mod, {eetcd_app, []}},
{applications, [kernel, stdlib, gun]},
Expand Down
12 changes: 11 additions & 1 deletion src/eetcd_election.erl
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,22 @@ observe_stream(OCtx, Msg) ->
resp_stream(#{stream_ref := Ref, http2_pid := Pid},
{gun_response, Pid, Ref, nofin, 200, _Headers}) ->
receive {gun_data, Pid, Ref, nofin, Bin} ->
{ok, Bin}
receive {gun_trailers, Pid, Ref, [{<<"grpc-status">>, <<"0">>}, {<<"grpc-message">>, <<>>}]} ->
{ok, Bin};
{gun_trailers, Pid, Ref, [{<<"grpc-status">>, GrpcStatus}, {<<"grpc-message">>, GrpcMsg}]} ->
{error, ?GRPC_ERROR(GrpcStatus, GrpcMsg)}
after 2000 -> unknown
end
after 2000 -> unknown
end;
resp_stream(#{stream_ref := Ref, http2_pid := Pid},
{gun_data, Pid, Ref, nofin, Bin}) ->
{ok, Bin};
resp_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
{gun_trailers, Pid, SRef, [{<<"grpc-status">>, GrpcStatus}, {<<"grpc-message">>, GrpcMsg}]}) -> %% grpc error
erlang:demonitor(MRef, [flush]),
gun:cancel(Pid, SRef),
{error, ?GRPC_ERROR(GrpcStatus, GrpcMsg)};
resp_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
{gun_error, Pid, SRef, Reason}) -> %% stream error
erlang:demonitor(MRef, [flush]),
Expand Down
1 change: 1 addition & 0 deletions src/eetcd_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
{gun_data, ServerPid, StreamRef, fin, Data} ->
{ok, <<Acc/binary, Data/binary>>};
%% It's OK to return trailers here because the client specifically requested them
%% Trailers are grpc_status and grpc_message headers
{gun_trailers, ServerPid, StreamRef, Trailers} ->
{ok, Acc, Trailers};
{gun_error, ServerPid, StreamRef, Reason} ->
Expand Down
9 changes: 7 additions & 2 deletions src/eetcd_watch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ watch(Name, CreateReq, Timeout) ->
%%that is, a gun_* message received on the gun connection.
%%If it is, then this function will parse the message, turn it into watch responses, and possibly take action given the responses.
%%If there's no error, this function returns {ok, WatchConn, 'Etcd.WatchResponse'()}|{more, WatchConn}
%%If there's an error, {error, {stream_error | conn_error | http2_down, term()} | timeout} is returned.
%%If there's an error, {error, {grpc_error, stream_error | conn_error | http2_down, term()} | timeout} is returned.
%%If the given message is not from the gun connection, this function returns unknown.
-spec watch_stream(watch_conn(), Message) ->
{ok, watch_conn(), router_pb:'Etcd.WatchResponse'()}
| {more, watch_conn()}
| unknown
| {error, {stream_error | conn_error | http2_down, term()}} when
| {error, {grpc_error, stream_error | conn_error | http2_down, term()}} when
Message :: term().

watch_stream(#{stream_ref := Ref, http2_pid := Pid, unprocessed := Unprocessed} = Conn,
Expand All @@ -194,6 +194,11 @@ watch_stream(#{stream_ref := Ref, http2_pid := Pid, unprocessed := Unprocessed}
Resp};
more -> {more, Conn#{unprocessed => Bin}}
end;
watch_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
{gun_trailers, Pid, SRef, [{<<"grpc-status">>, Status}, {<<"grpc-message">>, Msg}]}) ->
erlang:demonitor(MRef, [flush]),
gun:cancel(Pid, SRef),
{error, {grpc_error, ?GRPC_ERROR(Status, Msg)}};
watch_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
{gun_error, Pid, SRef, Reason}) -> %% stream error
erlang:demonitor(MRef, [flush]),
Expand Down
20 changes: 14 additions & 6 deletions test/eetcd_election_leader_example.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ resign(Pid) ->
%%%===================================================================

init([Etcd, LeaderKey, Value]) ->
logger:set_primary_config(#{level => info}),
erlang:process_flag(trap_exit, true),
{ok, #{'ID' := LeaseID}} = eetcd_lease:grant(Etcd, 8),
{ok, _} = eetcd_lease:keep_alive(Etcd, LeaseID),
Expand Down Expand Up @@ -66,9 +67,11 @@ handle_info(Msg, State) ->
#{campaign := Campaign, observe := Observe} = State,
case eetcd_election:campaign_response(Campaign, Msg) of
{ok, NewCampaign = #{campaign := Leader}} ->
%% Only get this response when you win campaign by yourself.
%% You are leader!
win_campaign_event(Leader),
{noreply, State#{campaign => NewCampaign}};
{error, Reason} -> %% you can just let it crash and restart process
{error, Reason} -> %% you can just let it crash and restart process or recampaign !!!
campaign_unexpected_error(Reason),
{noreply, State};
unknown ->
Expand All @@ -95,17 +98,22 @@ code_change(_OldVsn, State = #{}, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
win_campaign_event(_Leader) ->
win_campaign_event(Leader) ->
logger:info("win campaign event:~p", [Leader]),
"Todo".

campaign_unexpected_error(_Reason) ->
campaign_unexpected_error(Reason) ->
logger:info("campaign unexpected error:~p", [Reason]),
"Todo: try to recampaign".

leader_change_event(_Leader) ->
leader_change_event(Leader) ->
logger:info("leader change event:~p", [Leader]),
"Todo".

observe_unexpected_error(_Reason) ->
observe_unexpected_error(Reason) ->
logger:info("observe unexpect error:~p", [Reason]),
"Todo: try to reobserve after some sleep.".

handle_info_your_own_msg(_Msg, _State) ->
handle_info_your_own_msg(Msg, State) ->
logger:info("hanle info your own msg:~p ~p", [Msg, State]),
"Todo".

0 comments on commit 69d50ac

Please sign in to comment.