Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eternal watch should be robust when blocks are slow #64

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions lib/etcd/keys.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,13 @@ def update(key, opts = {})
set(key, opts.merge(prevExist: true))
end

def eternal_watch(key, index = nil)
def eternal_watch(key, opts = {})
next_index = opts[:waitIndex]
loop do
response = watch(key, index)
opts = opts.merge(waitIndex: next_index) if next_index
response = watch(key, opts)
yield response
next_index = response.node.modified_index + 1
end
end

Expand Down
92 changes: 91 additions & 1 deletion spec/etcd/watch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
expect(response.node.value).to eq(value)
end


it 'with recrusive, waits and return when the key is updated' do
response = nil
key = random_key
Expand All @@ -45,4 +44,95 @@
thr.join
expect(response.node.value).to eq(value)
end

context :eternal_watch do
let(:key) { random_key }
let(:value) { uuid.generate }

it 'should loop multiple times collecting responses' do
responses = []
client.set(key, value: 'initial_value')

thr = Thread.new do
controlling_loop do |control|

client.eternal_watch(key, timeout: 3) do |response_in_loop|
responses << response_in_loop
end

end
end

sleep 2
client.set(key, value: 'value-1')
client.set(key, value: 'value-2')
thr.join

expect(responses.length).to eq 2
expect(responses.map { |r| r.node.value }).to eq ['value-1', 'value-2']
end

it 'can watch recursive keys' do
response = nil
client.set("#{key}/subkey", value:"initial_value")

thr = Thread.new do
controlling_loop do |control|

client.eternal_watch(key, recursive: true) do |response_in_loop|
response = response_in_loop
control.stop
end

end
end

sleep 2
client.set("#{key}/subkey", value: value)
thr.join

expect(response.node.value).to eq(value)
end

it "resumes watching after the index of the last response" do
responses = []
client.set(key, value:"initial_value")

thr = Thread.new do
controlling_loop do |control|

client.eternal_watch(key, timeout: 3) do |response_in_loop|
responses << response_in_loop
sleep 1
end

end
end

sleep 1
client.set(key, value: 'value-1')
client.set(key, value: 'value-2')
thr.join

expect(responses.length).to eq 2
expect(responses.map { |r| r.node.value }).to eq ['value-1', 'value-2']
end
end

private

class LoopControl
class Stop < Exception; end
def stop
raise Stop
end
end

def controlling_loop
control = LoopControl.new
begin
yield control
rescue LoopControl::Stop, Net::ReadTimeout
end
end
end