diff --git a/lib/etcd/keys.rb b/lib/etcd/keys.rb index 0cf6e7c..6cc0dbe 100644 --- a/lib/etcd/keys.rb +++ b/lib/etcd/keys.rb @@ -115,10 +115,13 @@ def update(key, opts = {}) set(key, opts.merge(prevExist: true)) end - def eternal_watch(key, index = nil) + def eternal_watch(key, opts = {}) + next_index = opts[:waitIndex] loop do - response = watch(key, index) + opts = opts.merge(waitIndex: next_index) if next_index + response = watch(key, opts) yield response + next_index = response.node.modified_index + 1 end end diff --git a/spec/etcd/watch_spec.rb b/spec/etcd/watch_spec.rb index a219460..ff0a906 100644 --- a/spec/etcd/watch_spec.rb +++ b/spec/etcd/watch_spec.rb @@ -31,7 +31,6 @@ expect(response.node.value).to eq(value) end - it 'with recrusive, waits and return when the key is updated' do response = nil key = random_key @@ -45,4 +44,95 @@ thr.join expect(response.node.value).to eq(value) end + + context :eternal_watch do + let(:key) { random_key } + let(:value) { uuid.generate } + + it 'should loop multiple times collecting responses' do + responses = [] + client.set(key, value: 'initial_value') + + thr = Thread.new do + controlling_loop do |control| + + client.eternal_watch(key, timeout: 3) do |response_in_loop| + responses << response_in_loop + end + + end + end + + sleep 2 + client.set(key, value: 'value-1') + client.set(key, value: 'value-2') + thr.join + + expect(responses.length).to eq 2 + expect(responses.map { |r| r.node.value }).to eq ['value-1', 'value-2'] + end + + it 'can watch recursive keys' do + response = nil + client.set("#{key}/subkey", value:"initial_value") + + thr = Thread.new do + controlling_loop do |control| + + client.eternal_watch(key, recursive: true) do |response_in_loop| + response = response_in_loop + control.stop + end + + end + end + + sleep 2 + client.set("#{key}/subkey", value: value) + thr.join + + expect(response.node.value).to eq(value) + end + + it "resumes watching after the index of the last response" do + responses = [] + client.set(key, value:"initial_value") + + thr = Thread.new do + controlling_loop do |control| + + client.eternal_watch(key, timeout: 3) do |response_in_loop| + responses << response_in_loop + sleep 1 + end + + end + end + + sleep 1 + client.set(key, value: 'value-1') + client.set(key, value: 'value-2') + thr.join + + expect(responses.length).to eq 2 + expect(responses.map { |r| r.node.value }).to eq ['value-1', 'value-2'] + end + end + + private + + class LoopControl + class Stop < Exception; end + def stop + raise Stop + end + end + + def controlling_loop + control = LoopControl.new + begin + yield control + rescue LoopControl::Stop, Net::ReadTimeout + end + end end