Skip to content

Commit

Permalink
Merge branch 'main' into tulir/bridgev2-deadlock-detect
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Sep 13, 2024
2 parents 80d5daa + e12ecbe commit 5384d69
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 121 deletions.
7 changes: 7 additions & 0 deletions bridgev2/bridgeconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Config struct {
Homeserver HomeserverConfig `yaml:"homeserver"`
AppService AppserviceConfig `yaml:"appservice"`
Matrix MatrixConfig `yaml:"matrix"`
Analytics AnalyticsConfig `yaml:"analytics"`
Provisioning ProvisioningConfig `yaml:"provisioning"`
PublicMedia PublicMediaConfig `yaml:"public_media"`
DirectMedia DirectMediaConfig `yaml:"direct_media"`
Expand Down Expand Up @@ -78,6 +79,12 @@ type MatrixConfig struct {
UploadFileThreshold int64 `yaml:"upload_file_threshold"`
}

type AnalyticsConfig struct {
Token string `yaml:"token"`
URL string `yaml:"url"`
UserID string `yaml:"user_id"`
}

type ProvisioningConfig struct {
Prefix string `yaml:"prefix"`
SharedSecret string `yaml:"shared_secret"`
Expand Down
5 changes: 5 additions & 0 deletions bridgev2/bridgeconfig/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func doUpgrade(helper up.Helper) {
helper.Copy(up.Bool, "matrix", "federate_rooms")
helper.Copy(up.Int, "matrix", "upload_file_threshold")

helper.Copy(up.Str|up.Null, "analytics", "token")
helper.Copy(up.Str|up.Null, "analytics", "url")
helper.Copy(up.Str|up.Null, "analytics", "user_id")

helper.Copy(up.Str, "provisioning", "prefix")
if secret, ok := helper.Get(up.Str, "provisioning", "shared_secret"); !ok || secret == "generate" {
sharedSecret := random.String(64)
Expand Down Expand Up @@ -176,6 +180,7 @@ var SpacedBlocks = [][]string{
{"appservice", "as_token"},
{"appservice", "username_template"},
{"matrix"},
{"analytics"},
{"provisioning"},
{"public_media"},
{"direct_media"},
Expand Down
9 changes: 9 additions & 0 deletions bridgev2/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package bridgev2
import (
"errors"
"fmt"
"net/http"

"maunium.net/go/mautrix"
)
Expand Down Expand Up @@ -80,6 +81,14 @@ func (re RespError) Is(err error) bool {
return errors.Is(err, mautrix.RespError(re))
}

func (re RespError) Write(w http.ResponseWriter) {
mautrix.RespError(re).Write(w)
}

func (re RespError) WithMessage(msg string, args ...any) RespError {
return RespError(mautrix.RespError(re).WithMessage(msg, args...))
}

func (re RespError) AppendMessage(append string, args ...any) RespError {
re.Err += fmt.Sprintf(append, args...)
return re
Expand Down
62 changes: 62 additions & 0 deletions bridgev2/matrix/analytics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package matrix

import (
"bytes"
"encoding/json"
"fmt"
"net/http"

"maunium.net/go/mautrix/id"
)

func (br *Connector) trackSync(userID id.UserID, event string, properties map[string]any) error {
var buf bytes.Buffer
var analyticsUserID string
if br.Config.Analytics.UserID != "" {
analyticsUserID = br.Config.Analytics.UserID
} else {
analyticsUserID = userID.String()
}
err := json.NewEncoder(&buf).Encode(map[string]any{
"userId": analyticsUserID,
"event": event,
"properties": properties,
})
if err != nil {
return err
}

req, err := http.NewRequest(http.MethodPost, br.Config.Analytics.URL, &buf)
if err != nil {
return err
}
req.SetBasicAuth(br.Config.Analytics.Token, "")
resp, err := br.AS.HTTPClient.Do(req)
if err != nil {
return err
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code %d", resp.StatusCode)
}
return nil
}

func (br *Connector) TrackAnalytics(userID id.UserID, event string, props map[string]any) {
if br.Config.Analytics.Token == "" || br.Config.Analytics.URL == "" {
return
}

if props == nil {
props = map[string]any{}
}
props["bridge"] = br.Bridge.Network.GetName().BeeperBridgeType
go func() {
err := br.trackSync(userID, event, props)
if err != nil {
br.Log.Err(err).Str("component", "analytics").Str("event", event).Msg("Error tracking event")
} else {
br.Log.Debug().Str("component", "analytics").Str("event", event).Msg("Tracked event")
}
}()
}
3 changes: 3 additions & 0 deletions bridgev2/matrix/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ var (
_ bridgev2.MatrixConnector = (*Connector)(nil)
_ bridgev2.MatrixConnectorWithServer = (*Connector)(nil)
_ bridgev2.MatrixConnectorWithPostRoomBridgeHandling = (*Connector)(nil)
_ bridgev2.MatrixConnectorWithPublicMedia = (*Connector)(nil)
_ bridgev2.MatrixConnectorWithNameDisambiguation = (*Connector)(nil)
_ bridgev2.MatrixConnectorWithAnalytics = (*Connector)(nil)
)

func NewConnector(cfg *bridgeconfig.Config) *Connector {
Expand Down
9 changes: 5 additions & 4 deletions bridgev2/matrix/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,12 @@ func (helper *CryptoHelper) allowKeyShare(ctx context.Context, device *id.Device
zerolog.Ctx(ctx).Err(err).Msg("Failed to get user to handle key request")
return &crypto.KeyShareRejectNoResponse
} else if user == nil {
// TODO
zerolog.Ctx(ctx).Debug().Msg("Couldn't find user to handle key request")
return &crypto.KeyShareRejectNoResponse
} else if true {
// TODO admin check and is in room check
return &crypto.KeyShareRejection{Code: event.RoomKeyWithheldUnauthorized, Reason: "Key sharing is not yet implemented in bridgev2"}
} else if !user.Permissions.Admin {
zerolog.Ctx(ctx).Debug().Msg("Rejecting key request: user is not admin")
// TODO is in room check?
return &crypto.KeyShareRejection{Code: event.RoomKeyWithheldUnauthorized, Reason: "Key sharing for non-admins is not yet implemented"}
}
zerolog.Ctx(ctx).Debug().Msg("Accepting key request")
return nil
Expand Down
9 changes: 9 additions & 0 deletions bridgev2/matrix/mxmain/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ matrix:
# rather than keeping the whole file in memory.
upload_file_threshold: 5242880

# Segment-compatible analytics endpoint for tracking some events, like provisioning API login and encryption errors.
analytics:
# API key to send with tracking requests. Tracking is disabled if this is null.
token: null
# Address to send tracking requests to.
url: https://api.segment.io/v1/track
# Optional user ID for tracking events. If null, defaults to using Matrix user ID.
user_id: null

# Settings for provisioning API
provisioning:
# Prefix for the provisioning API paths.
Expand Down
4 changes: 4 additions & 0 deletions bridgev2/matrixinterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type MatrixConnectorWithPostRoomBridgeHandling interface {
HandleNewlyBridgedRoom(ctx context.Context, roomID id.RoomID) error
}

type MatrixConnectorWithAnalytics interface {
TrackAnalytics(userID id.UserID, event string, properties map[string]any)
}

type MatrixSendExtra struct {
Timestamp time.Time
MessageMeta *database.Message
Expand Down
11 changes: 11 additions & 0 deletions bridgev2/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,14 @@ func (user *User) GetManagementRoom(ctx context.Context) (id.RoomID, error) {
func (user *User) Save(ctx context.Context) error {
return user.Bridge.DB.User.Update(ctx, user.User)
}

func (br *Bridge) TrackAnalytics(userID id.UserID, event string, props map[string]any) {
analyticSender, ok := br.Matrix.(MatrixConnectorWithAnalytics)
if ok {
analyticSender.TrackAnalytics(userID, event, props)
}
}

func (user *User) TrackAnalytics(event string, props map[string]any) {
user.Bridge.TrackAnalytics(user.MXID, event, props)
}
109 changes: 8 additions & 101 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1467,13 +1467,7 @@ func (cli *Client) State(ctx context.Context, roomID id.RoomID) (stateMap RoomSt

// GetMediaConfig fetches the configuration of the content repository, such as upload limitations.
func (cli *Client) GetMediaConfig(ctx context.Context) (resp *RespMediaConfig, err error) {
var u string
if cli.SpecVersions.Supports(FeatureAuthenticatedMedia) {
u = cli.BuildClientURL("v1", "media", "config")
} else {
u = cli.BuildURL(MediaURLPath{"v3", "config"})
}
_, err = cli.MakeRequest(ctx, http.MethodGet, u, nil, &resp)
_, err = cli.MakeRequest(ctx, http.MethodGet, cli.BuildClientURL("v1", "media", "config"), nil, &resp)
return
}

Expand All @@ -1494,94 +1488,13 @@ func (cli *Client) UploadLink(ctx context.Context, link string) (*RespMediaUploa
return cli.Upload(ctx, res.Body, res.Header.Get("Content-Type"), res.ContentLength)
}

// Deprecated: unauthenticated media is deprecated as of Matrix v1.11. Use [Download] or [DownloadBytes] instead.
func (cli *Client) GetDownloadURL(mxcURL id.ContentURI) string {
return cli.BuildURLWithQuery(MediaURLPath{"v3", "download", mxcURL.Homeserver, mxcURL.FileID}, map[string]string{"allow_redirect": "true"})
}

func (cli *Client) doMediaRetry(req *http.Request, cause error, retries int, backoff time.Duration) (*http.Response, error) {
log := zerolog.Ctx(req.Context())
if req.Body != nil {
var err error
if req.GetBody != nil {
req.Body, err = req.GetBody()
if err != nil {
log.Warn().Err(err).Msg("Failed to get new body to retry request")
return nil, cause
}
} else if bodySeeker, ok := req.Body.(io.ReadSeeker); ok {
_, err = bodySeeker.Seek(0, io.SeekStart)
if err != nil {
log.Warn().Err(err).Msg("Failed to seek to beginning of request body")
return nil, cause
}
} else {
log.Warn().Msg("Failed to get new body to retry request: GetBody is nil and Body is not an io.ReadSeeker")
return nil, cause
}
}
log.Warn().Err(cause).
Int("retry_in_seconds", int(backoff.Seconds())).
Msg("Request failed, retrying")
time.Sleep(backoff)
return cli.doMediaRequest(req, retries-1, backoff*2)
}

func (cli *Client) doMediaRequest(req *http.Request, retries int, backoff time.Duration) (*http.Response, error) {
cli.RequestStart(req)
startTime := time.Now()
res, err := cli.Client.Do(req)
duration := time.Now().Sub(startTime)
if err != nil {
if retries > 0 {
return cli.doMediaRetry(req, err, retries, backoff)
}
err = HTTPError{
Request: req,
Response: res,

Message: "request error",
WrappedError: err,
}
cli.LogRequestDone(req, res, err, nil, 0, duration)
return nil, err
}

if retries > 0 && retryafter.Should(res.StatusCode, !cli.IgnoreRateLimit) {
backoff = retryafter.Parse(res.Header.Get("Retry-After"), backoff)
return cli.doMediaRetry(req, fmt.Errorf("HTTP %d", res.StatusCode), retries, backoff)
}

if res.StatusCode < 200 || res.StatusCode >= 300 {
var body []byte
body, err = ParseErrorResponse(req, res)
cli.LogRequestDone(req, res, err, nil, len(body), duration)
} else {
cli.LogRequestDone(req, res, nil, nil, -1, duration)
}
return res, err
}

func (cli *Client) Download(ctx context.Context, mxcURL id.ContentURI) (*http.Response, error) {
ctxLog := zerolog.Ctx(ctx)
if ctxLog.GetLevel() == zerolog.Disabled || ctxLog == zerolog.DefaultContextLogger {
ctx = cli.Log.WithContext(ctx)
}
if cli.SpecVersions.Supports(FeatureAuthenticatedMedia) {
_, resp, err := cli.MakeFullRequestWithResp(ctx, FullRequest{
Method: http.MethodGet,
URL: cli.BuildClientURL("v1", "media", "download", mxcURL.Homeserver, mxcURL.FileID),
DontReadResponse: true,
})
return resp, err
} else {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, cli.GetDownloadURL(mxcURL), nil)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", cli.UserAgent+" (media downloader)")
return cli.doMediaRequest(req, cli.DefaultHTTPRetries, 4*time.Second)
}
_, resp, err := cli.MakeFullRequestWithResp(ctx, FullRequest{
Method: http.MethodGet,
URL: cli.BuildClientURL("v1", "media", "download", mxcURL.Homeserver, mxcURL.FileID),
DontReadResponse: true,
})
return resp, err
}

func (cli *Client) DownloadBytes(ctx context.Context, mxcURL id.ContentURI) ([]byte, error) {
Expand Down Expand Up @@ -1789,13 +1702,7 @@ func (cli *Client) UploadMedia(ctx context.Context, data ReqUploadMedia) (*RespM
//
// See https://spec.matrix.org/v1.2/client-server-api/#get_matrixmediav3preview_url
func (cli *Client) GetURLPreview(ctx context.Context, url string) (*RespPreviewURL, error) {
var urlPath PrefixableURLPath
if cli.SpecVersions.Supports(FeatureAuthenticatedMedia) {
urlPath = ClientURLPath{"v1", "media", "preview_url"}
} else {
urlPath = MediaURLPath{"v3", "preview_url"}
}
reqURL := cli.BuildURLWithQuery(urlPath, map[string]string{
reqURL := cli.BuildURLWithQuery(ClientURLPath{"v1", "media", "preview_url"}, map[string]string{
"url": url,
})
var output RespPreviewURL
Expand Down
7 changes: 5 additions & 2 deletions crypto/cryptohelper/cryptohelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type CryptoHelper struct {

LoginAs *mautrix.ReqLogin

ASEventProcessor crypto.ASEventProcessor
ASEventProcessor crypto.ASEventProcessor
CustomPostDecrypt func(context.Context, *event.Event)

DBAccountID string
}
Expand Down Expand Up @@ -320,7 +321,9 @@ func (helper *CryptoHelper) HandleEncrypted(ctx context.Context, evt *event.Even

func (helper *CryptoHelper) postDecrypt(ctx context.Context, decrypted *event.Event) {
decrypted.Mautrix.EventSource |= event.SourceDecrypted
if helper.ASEventProcessor != nil {
if helper.CustomPostDecrypt != nil {
helper.CustomPostDecrypt(ctx, decrypted)
} else if helper.ASEventProcessor != nil {
helper.ASEventProcessor.Dispatch(ctx, decrypted)
} else {
helper.client.Syncer.(mautrix.DispatchableSyncer).Dispatch(ctx, decrypted)
Expand Down
20 changes: 20 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"net/http"

"go.mau.fi/util/exhttp"
"golang.org/x/exp/maps"
)

Expand All @@ -24,6 +25,9 @@ import (
// // logout
// }
var (
// Generic error for when the server encounters an error and it does not have a more specific error code.
// Note that `errors.Is` will check the error message rather than code for M_UNKNOWNs.
MUnknown = RespError{ErrCode: "M_UNKNOWN", StatusCode: http.StatusInternalServerError}
// Forbidden access, e.g. joining a room without permission, failed login.
MForbidden = RespError{ErrCode: "M_FORBIDDEN", StatusCode: http.StatusForbidden}
// Unrecognized request, e.g. the endpoint does not exist or is not implemented.
Expand Down Expand Up @@ -142,6 +146,22 @@ func (e *RespError) MarshalJSON() ([]byte, error) {
return json.Marshal(data)
}

func (e RespError) Write(w http.ResponseWriter) {
statusCode := e.StatusCode
if statusCode == 0 {
statusCode = http.StatusInternalServerError
}
exhttp.WriteJSONResponse(w, statusCode, &e)
}

func (e RespError) WithMessage(msg string, args ...any) RespError {
if len(args) > 0 {
msg = fmt.Sprintf(msg, args...)
}
e.Err = msg
return e
}

// Error returns the errcode and error message.
func (e RespError) Error() string {
return e.ErrCode + ": " + e.Err
Expand Down
Loading

0 comments on commit 5384d69

Please sign in to comment.