-
Notifications
You must be signed in to change notification settings - Fork 225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement SearchValue #174
Conversation
Tell me when you want this reviewed. |
bcb8474
to
75bae8d
Compare
Should be ready |
routing.go
Outdated
return nil, routing.ErrNotFound | ||
responsesNeeded := 0 | ||
if !cfg.Offline { | ||
responsesNeeded = getQuorum(&cfg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we should be applying the quorum in here. That's more of a GetValue
thing. Really, we should probably have a separate "limit" option that defaults to -1. We can then get the quorum in GetValue and then apply it as the limit. Otherwise, we'll always stop after the default quorum (if not specified).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: by default, we should continue searching until we run out of peers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, we may just want to make GetValue
and SearchValue
orthogonal (using the same GetValues under the covers).
routing.go
Outdated
select { | ||
case r, ok := <-responses: | ||
if !ok { | ||
responses = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd just use a break label.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would lead to occasionally ignoring errors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. I didn't see the &&
below.
Actually, do we even need an error channel? I'd just return an error up front (valid key, non-empty routing table, etc.) and then close when done. At that point, I don't see how we can really "fail". If the user wants more context, they can register a notifier.
Is that possible? It'll be a lot nicer to the user.
routing.go
Outdated
} | ||
|
||
// GetValues gets nvals values corresponding to the given key. | ||
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { | ||
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) <-chan RecvdVal { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may not want to change the API here. For now, I'd just add a private getValues
function and make GetValue
a simple wrapper.
4ca5152
to
09f5251
Compare
On quorum on |
Yeah, you're probably right. However, I'd default to "continue till we run out of peers" instead of the default quorum of 16. |
790b5ca
to
f02c32e
Compare
Done |
ext_test.go
Outdated
@@ -48,7 +47,7 @@ func TestGetFailures(t *testing.T) { | |||
err = merr[0] | |||
} | |||
|
|||
if err != io.EOF { | |||
if err != routing.ErrNotFound { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be a context.ErrDeadlineExceeded
. We'll probably need to check if the context was canceled in GetValue
when the channel is closed.
routing.go
Outdated
if responsesNeeded < 0 { | ||
responsesNeeded = 0 | ||
} | ||
vals := make([]RecvdVal, 0, responsesNeeded) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could grow very large (with no quorum). We may want to fire off a round of corrections ocationally (i.e., see 30 values, correct peers and reset). However, we can probably leave that as a TODO for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, we could just cap responsesNeeded
for now. Actually, that's probably the best thing to do. Cap it at 50 or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving it unbounded could also cause a bunch of goroutines below.
} | ||
if len(recs) == 0 { | ||
|
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
routing.go
Outdated
if err != nil { | ||
continue //TODO: Do we want to do something with the error here? | ||
} | ||
if i != best && !bytes.Equal(v.Val, vals[best].Val) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will be either 0 or 1, not best.
routing.go
Outdated
if best > -1 { | ||
i, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val}) | ||
if err != nil { | ||
continue //TODO: Do we want to do something with the error here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log an error. This should never happen. Honestly, we could just panic (but I'd rather not).
routing.go
Outdated
out = append(out, val) | ||
} | ||
|
||
return out, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs to return this plus ctx.Error()
(the context could have been canceled, etc.).
aeecd87
to
c22e1e1
Compare
ext_test.go
Outdated
record "github.com/libp2p/go-libp2p-record" | ||
routing "github.com/libp2p/go-libp2p-routing" | ||
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" | ||
"github.com/libp2p/go-libp2p-record" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that we never really use consistent naming, in package names, I'd rather leave the explicit names in-place.
@@ -48,7 +48,7 @@ func TestGetFailures(t *testing.T) { | |||
err = merr[0] | |||
} | |||
|
|||
if err != io.EOF { | |||
if err != context.DeadlineExceeded { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
routing.go
Outdated
if len(vals) < maxVals { | ||
vals = append(vals, v) | ||
} else { | ||
i = (best + 1) % maxVals |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i
could still be out of bounds. It may be better to simply put the "best" value in a separate variable and then keep an array of peers to correct.
routing.go
Outdated
return out, ctx.Err() | ||
} | ||
|
||
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) { | ||
eip := log.EventBegin(ctx, "GetValues") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably go into GetValues
. That may also help simplify/get rid of the done
dance.
routing.go
Outdated
@@ -260,10 +349,11 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R | |||
From: p, | |||
} | |||
valslock.Lock() | |||
vals = append(vals, rv) | |||
vals <- rv |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should select on the context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed two blocking channel writes.
routing.go
Outdated
} | ||
if sel == 1 && !bytes.Equal(v.Val, best.Val) { | ||
best = &v | ||
out <- v.Val |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs to select on the context.
routing.go
Outdated
if err := dht.Validator.Validate(key, v.Val); err == nil { | ||
best = &v | ||
out <- v.Val | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same.
3091c6b
to
246579e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I have a few nits and cleanups but this should work. We can do that in a separate PR.
This exposes new function which streams entries as they are found - https://github.com/libp2p/go-libp2p-routing/blob/master/routing.go#L61
Part of ipfs/kubo#5232
Replaces #49
TODO: