Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peeking over federation via MSC2444 #1391

Merged
merged 92 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
b9342d9
a very very WIP first cut of peeking via MSC2753.
ara4n Aug 30, 2020
d7bdf71
make PeekingDeviceSet private
ara4n Aug 30, 2020
cfa0be5
merge master
ara4n Aug 31, 2020
9b79f9a
add server_name param
ara4n Aug 31, 2020
d343b8f
blind stab at adding a `peek` section to /sync
ara4n Aug 31, 2020
c4e5f60
make it build
ara4n Aug 31, 2020
d1e4d66
make it launch
ara4n Aug 31, 2020
f006b37
add peeking to getResponseWithPDUsForCompleteSync
ara4n Aug 31, 2020
6c3a896
cancel any peeks when we join a room
ara4n Aug 31, 2020
7b38d48
spell out how to runoutside of docker if you want speed
ara4n Aug 31, 2020
e589984
fix SQL
ara4n Aug 31, 2020
0bb2c2c
remove unnecessary txn for SelectPeeks
ara4n Aug 31, 2020
28219c6
Merge branch 'master' into matthew/peeking
ara4n Sep 1, 2020
86e9736
fix s/join/peek/ cargocult fail
ara4n Sep 1, 2020
d0d5f70
Merge branch 'master' into matthew/peeking
ara4n Sep 1, 2020
bfecc8e
HACK: Track goroutine IDs to determine when we write by the wrong thread
kegsay Sep 1, 2020
7bf2a27
Track partition offsets and only log unsafe for non-selects
kegsay Sep 1, 2020
fcdb90c
Put redactions in the writer goroutine
kegsay Sep 1, 2020
6410b70
Update filters on writer goroutine
kegsay Sep 1, 2020
ed4b3a5
Merge branch 'kegan/redact-txn' into matthew/peeking
ara4n Sep 1, 2020
3cebd8d
Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking
ara4n Sep 1, 2020
5d7f688
wrap peek storage in goid hack
ara4n Sep 1, 2020
6424117
use exclusive writer, and MarkPeeksAsOld more efficiently
ara4n Sep 1, 2020
85bce11
don't log ascii in binary at sql trace...
ara4n Sep 1, 2020
75b91ac
strip out empty roomd deltas
ara4n Sep 1, 2020
b6cc441
re-add txn to SelectPeeks
ara4n Sep 2, 2020
f6af656
re-add accidentally deleted field
ara4n Sep 2, 2020
8712ea3
Merge branch 'master' into matthew/peeking
ara4n Sep 3, 2020
eda84cd
reject peeks for non-worldreadable rooms
ara4n Sep 3, 2020
da3742c
move perform_peek
ara4n Sep 3, 2020
3c5e079
fix package
ara4n Sep 3, 2020
994cc18
correctly refactor perform_peek
ara4n Sep 3, 2020
c1f1fcd
WIP of implementing MSC2444
ara4n Sep 4, 2020
4ca2cf4
typo
ara4n Sep 4, 2020
98cf898
Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matth…
ara4n Sep 11, 2020
f236e82
Merge branch 'master' into matthew/peeking-over-fed
ara4n Sep 11, 2020
baee97b
(almost) make it build
ara4n Sep 11, 2020
4ef6a3c
clean up bad merge
ara4n Sep 11, 2020
65e59a1
support SendEventWithState with optional event
ara4n Sep 11, 2020
a5c0521
fix build & lint
ara4n Sep 11, 2020
df29509
fix build & lint
ara4n Sep 11, 2020
410ac72
reinstate federated peeks in the roomserver (doh)
ara4n Sep 11, 2020
f8bb448
fix sql thinko
ara4n Sep 11, 2020
fff1845
todo for authenticating state returned by /peek
ara4n Sep 12, 2020
c6a2604
support returning current state from QueryStateAndAuthChain
ara4n Sep 12, 2020
803647b
handle SS /peek
ara4n Sep 12, 2020
0ae0d11
reimplement SS /peek to prod the RS to tell the FS about the peek
ara4n Sep 12, 2020
4e96e62
rename RemotePeeks as OutboundPeeks
ara4n Sep 12, 2020
59e2be7
rename remote_peeks_table as outbound_peeks_table
ara4n Sep 12, 2020
36e32f1
add perform_handle_remote_peek.go
ara4n Sep 12, 2020
0dc422c
flesh out federation doc
ara4n Sep 12, 2020
71732f2
add inbound peeks table and hook it up
ara4n Sep 12, 2020
8f203fe
rename ambiguous RemotePeek as InboundPeek
ara4n Sep 12, 2020
3caae79
rename FSAPI's PerformPeek as PerformOutboundPeek
ara4n Sep 12, 2020
a160c07
setup inbound peeks db correctly
ara4n Sep 13, 2020
32f898d
fix api.SendEventWithState with no event
ara4n Sep 13, 2020
41b9b66
Merge branch 'master' into matthew/peeking-over-fed
ara4n Sep 19, 2020
20e2cb4
track latestevent on /peek
ara4n Sep 22, 2020
3202c7e
go fmt
ara4n Sep 22, 2020
0ab4bc9
document the peek send stream race better
ara4n Sep 22, 2020
75c3f2d
merge master
ara4n Sep 26, 2020
2b4353e
fix SendEventWithRewrite not to bail if handed a non-state event
ara4n Sep 26, 2020
927a62a
add fixme
ara4n Sep 26, 2020
1e6e23d
switch SS /peek to use SendEventWithRewrite
ara4n Sep 26, 2020
fd90849
fix comment
ara4n Sep 26, 2020
ed9e3fc
use reverse topo ordering to find latest extrem
ara4n Sep 26, 2020
efd8656
support postgres for federated peeking
ara4n Sep 27, 2020
ebeff8d
go fmt
ara4n Sep 27, 2020
4262095
back out bogus go.mod change
ara4n Sep 27, 2020
797085f
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Oct 20, 2020
814c220
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Oct 20, 2020
be5a4e6
Fix performOutboundPeekUsingServer
neilalexander Oct 20, 2020
90017d0
Fix getAuthChain -> GetAuthChain
neilalexander Oct 20, 2020
0fd9e96
Merge branch 'master' into matthew/peeking-over-fed
kegsay Oct 22, 2020
a2a5c7e
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 2, 2020
3ba3530
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 2, 2020
7fc3852
Fix build issues
neilalexander Dec 2, 2020
d25345d
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 3, 2020
c2f7c80
Fix build again
neilalexander Dec 3, 2020
fe1d2f8
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 3, 2020
45f0fdd
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 10, 2020
d47ab1f
Fix getAuthChain -> GetAuthChain
neilalexander Dec 10, 2020
0fe0e23
Don't repeat outbound peeks for the same room ID to the same servers
neilalexander Dec 10, 2020
e0a35c0
Fix lint
neilalexander Dec 10, 2020
8508af3
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 18, 2020
3985d03
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Jan 13, 2021
f2dec90
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Jan 18, 2021
609743b
Merge branch 'master' into matthew/peeking-over-fed
kegsay Jan 19, 2021
c71bf5d
Merge branch 'master' into matthew/peeking-over-fed
kegsay Jan 20, 2021
4491e53
Don't omitempty to appease sytest
kegsay Jan 22, 2021
552f583
Merge branch 'master' into matthew/peeking-over-fed
kegsay Jan 22, 2021
dd3c6ff
Merge branch 'master' into matthew/peeking-over-fed
kegsay Jan 22, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions docs/peeking.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
## Peeking

Peeking is implemented as per [MSC2753](https://github.com/matrix-org/matrix-doc/pull/2753).
Local peeking is implemented as per [MSC2753](https://github.com/matrix-org/matrix-doc/pull/2753).

Implementationwise, this means:
* Users call `/peek` and `/unpeek` on the clientapi from a given device.
* The clientapi delegates these via HTTP to the roomserver, which coordinates peeking in general for a given room
* The roomserver writes an NewPeek event into the kafka log headed to the syncserver
* The syncserver tracks the existence of the local peek in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peek` section of the /sync response.
* The syncserver tracks the existence of the local peek in the syncapi_peeks table in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peek` section of the /sync response.

Questions (given this is [my](https://github.com/ara4n) first time hacking on Dendrite):
* The whole clientapi -> roomserver -> syncapi flow to initiate a peek seems very indirect. Is there a reason not to just let syncapi itself host the implementation of `/peek`?
Peeking over federation is implemented as per [MSC2444](https://github.com/matrix-org/matrix-doc/pull/2444).

In future, peeking over federation will be added as per [MSC2444](https://github.com/matrix-org/matrix-doc/pull/2444).
* The `roomserver` will kick the `federationsender` much as it does for a federated `/join` in order to trigger a federated `/peek`
* The `federationsender` tracks the existence of the remote peek in question
For requests to peek our rooms ("inbound peeks"):
* Remote servers call `/peek` on federationapi
* The federationapi queries the federationsender to check if this is renewing an inbound peek or not.
* If not, it hits the PerformInboundPeek on the roomserver to ask it for the current state of the room.
* The roomserver atomically (in theory) adds a NewInboundPeek to its kafka stream to tell the federationserver to start peeking.
* The federationsender receives the event, tracks the inbound peek in the federationsender_inbound_peeks table, and starts sending events to the peeking server.
* The federationsender evicts stale inbound peeks which haven't been renewed.

For peeking into other server's rooms ("outbound peeks"):
* The `roomserver` will kick the `federationsender` much as it does for a federated `/join` in order to trigger a federated outbound `/peek`
* The `federationsender` tracks the existence of the outbound peek in in its federationsender_outbound_peeks table.
* The `federationsender` regularly renews the remote peek as long as there are still peeking devices syncing for it.
* TBD: how do we tell if there are no devices currently syncing for a given peeked room? The syncserver needs to tell the roomserver
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we care? We time out the peek anyway, and really just because there are no devices currently syncing doesn't mean we should stop peeking. Network interruptions are a reasonable reason, we should still be honouring the renewal period.

somehow who then needs to warn the federationsender.
102 changes: 102 additions & 0 deletions federationapi/routing/peek.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2020 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routing

import (
"net/http"

"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)

// Peek implements the SS /peek API, handling inbound peeks
func Peek(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
cfg *config.FederationAPI,
rsAPI api.RoomserverInternalAPI,
roomID, peekID string,
remoteVersions []gomatrixserverlib.RoomVersion,
) util.JSONResponse {
// TODO: check if we're just refreshing an existing peek by querying the federationsender

verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.InternalServerError(),
}
}

// Check that the room that the peeking server is trying to peek is actually
// one of the room versions that they listed in their supported ?ver= in
// the peek URL.
remoteSupportsVersion := false
for _, v := range remoteVersions {
if v == verRes.RoomVersion {
remoteSupportsVersion = true
break
}
}
// If it isn't, stop trying to peek the room.
if !remoteSupportsVersion {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.IncompatibleRoomVersion(verRes.RoomVersion),
}
}

// TODO: Check history visibility

// tell the peeking server to renew every hour
renewalInterval := int64(60 * 60 * 1000 * 1000)

var response api.PerformInboundPeekResponse
err := rsAPI.PerformInboundPeek(
httpReq.Context(),
&api.PerformInboundPeekRequest{
RoomID: roomID,
PeekID: peekID,
ServerName: request.Origin(),
RenewalInterval: renewalInterval,
},
&response,
)
if err != nil {
resErr := util.ErrorResponse(err)
return resErr
}

if !response.RoomExists {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would've failed earlier as QueryRoomVersionForRoom would return a failure.

return util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
}

respPeek := gomatrixserverlib.RespPeek{
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
RoomVersion: response.RoomVersion,
LatestEvent: response.LatestEvent.Unwrap(),
RenewalInterval: renewalInterval,
}

return util.JSONResponse{
Code: http.StatusOK,
JSON: respPeek,
}
}
38 changes: 34 additions & 4 deletions federationapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,37 @@ func Setup(
},
)).Methods(http.MethodGet)

v1fedmux.Handle("/make_join/{roomID}/{eventID}", httputil.MakeFedAPI(
v1fedmux.Handle("/peek/{roomID}/{peekID}", httputil.MakeFedAPI(
"federation_peek", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
if roomserverAPI.IsServerBannedFromRoom(httpReq.Context(), rsAPI, vars["roomID"], request.Origin()) {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("Forbidden by server ACLs"),
}
}
roomID := vars["roomID"]
peekID := vars["peekID"]
queryVars := httpReq.URL.Query()
remoteVersions := []gomatrixserverlib.RoomVersion{}
if vers, ok := queryVars["ver"]; ok {
// The remote side supplied a ?ver= so use that to build up the list
// of supported room versions
for _, v := range vers {
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersion(v))
}
} else {
// The remote side didn't supply a ?ver= so just assume that they only
// support room version 1
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersionV1)
}
return Peek(
httpReq, request, cfg, rsAPI, roomID, peekID, remoteVersions,
)
},
)).Methods(http.MethodPut, http.MethodDelete)

v1fedmux.Handle("/make_join/{roomID}/{userID}", httputil.MakeFedAPI(
"federation_make_join", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
if roomserverAPI.IsServerBannedFromRoom(httpReq.Context(), rsAPI, vars["roomID"], request.Origin()) {
Expand All @@ -239,11 +269,11 @@ func Setup(
}
}
roomID := vars["roomID"]
eventID := vars["eventID"]
userID := vars["userID"]
queryVars := httpReq.URL.Query()
remoteVersions := []gomatrixserverlib.RoomVersion{}
if vers, ok := queryVars["ver"]; ok {
// The remote side supplied a ?=ver so use that to build up the list
// The remote side supplied a ?ver= so use that to build up the list
// of supported room versions
for _, v := range vers {
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersion(v))
Expand All @@ -255,7 +285,7 @@ func Setup(
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersionV1)
}
return MakeJoin(
httpReq, request, cfg, rsAPI, roomID, eventID, remoteVersions,
httpReq, request, cfg, rsAPI, roomID, userID, remoteVersions,
)
},
)).Methods(http.MethodGet)
Expand Down
16 changes: 16 additions & 0 deletions federationsender/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ type FederationSenderInternalAPI interface {
request *PerformJoinRequest,
response *PerformJoinResponse,
)
// Handle an instruction to peek a room on a remote server.
PerformOutboundPeek(
ctx context.Context,
request *PerformOutboundPeekRequest,
response *PerformOutboundPeekResponse,
) error
// Handle an instruction to make_leave & send_leave with a remote server.
PerformLeave(
ctx context.Context,
Expand Down Expand Up @@ -111,6 +117,16 @@ type PerformJoinResponse struct {
LastError *gomatrix.HTTPError
}

type PerformOutboundPeekRequest struct {
RoomID string `json:"room_id"`
// The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
ServerNames types.ServerNames `json:"server_names"`
}

type PerformOutboundPeekResponse struct {
LastError *gomatrix.HTTPError
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we be getting gomatrixserverlib.RespPeek somewhere?

}

type PerformLeaveRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Expand Down
40 changes: 39 additions & 1 deletion federationsender/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
return nil
}
case api.OutputTypeNewInboundPeek:
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{
"event": output.NewInboundPeek,
log.ErrorKey: err,
}).Panicf("roomserver output log: remote peek event failure")
return nil
}
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
Expand All @@ -121,6 +129,23 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}

// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)
// causing the federationsender to start sending messages to the peeking server
func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error {

// FIXME: there's a race here - we should start /sending new peeked events
// atomically after the orp.LatestEventID to ensure there are no gaps between
// the peek beginning and the send stream beginning.
//
// We probably need to track orp.LatestEventID on the inbound peek, but it's
// unclear how we then use that to prevent the race when we start the send
// stream.
//
// This is making the tests flakey.

return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
}

// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
Expand Down Expand Up @@ -164,14 +189,18 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
return err
}

// TODO: do housekeeping to evict unrenewed peeking hosts

// TODO: implement query to let the fedapi check whether a given peek is live or not

// Send the event.
return s.queues.SendEvent(
ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
)
}

// joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event.
// the room at the event (including peeking ones)
// It is important to use the state at the event for sending messages because:
// 1) We shouldn't send messages to servers that weren't in the room.
// 2) If a server is kicked from the rooms it should still be told about the
Expand Down Expand Up @@ -222,6 +251,15 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
joined[joinedHost.ServerName] = true
}

// handle peeking hosts
inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.Event.RoomID())
if err != nil {
return nil, err
}
for _, inboundPeek := range inboundPeeks {
joined[inboundPeek.ServerName] = true
}

var result []gomatrixserverlib.ServerName
for serverName, include := range joined {
if include {
Expand Down
Loading