diff --git a/README.md b/README.md index b804e40c..4a392c00 100644 --- a/README.md +++ b/README.md @@ -382,6 +382,12 @@ watcher.each do |notice| end ``` +The `.each` will stop when the connection is interrupted, use `each_with_retry` if you want to continue: + +```ruby +client.watch_pods.each_with_retry { |notice| ... } +``` + It is possible to interrupt the watcher from another thread with: ```ruby diff --git a/lib/kubeclient/watch_stream.rb b/lib/kubeclient/watch_stream.rb index 4902f2aa..111f4785 100644 --- a/lib/kubeclient/watch_stream.rb +++ b/lib/kubeclient/watch_stream.rb @@ -21,16 +21,27 @@ def each end buffer = '' + done = false response.body.each do |chunk| + done = false buffer << chunk while (line = buffer.slice!(/.+\n/)) yield(@format == :json ? WatchNotice.new(JSON.parse(line)) : line.chomp) end + done = true end + done rescue IOError raise unless @finished end + def each_with_retry(&block) + loop do + done = each(&block) + break if !done || @finished + end + end + def finish @finished = true @http_client.close unless @http_client.nil? diff --git a/test/test_watch.rb b/test/test_watch.rb index 37ca615f..5ea49b2e 100644 --- a/test/test_watch.rb +++ b/test/test_watch.rb @@ -9,15 +9,7 @@ def test_watch_pod_success { 'type' => 'DELETED', 'resourceVersion' => '1398' } ] - stub_request(:get, %r{/api/v1$}) - .to_return(body: open_test_file('core_api_resource_list.json'), - status: 200) - - stub_request(:get, %r{.*\/watch/pods}) - .to_return(body: open_test_file('watch_stream.json'), - status: 200) - - client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1') + client = setup_regular_response client.watch_pods.to_enum.with_index do |notice, index| assert_instance_of(Kubeclient::Common::WatchNotice, notice) @@ -116,4 +108,53 @@ def test_watch_with_field_selector "#{api_host}/v1/watch/events?fieldSelector=#{selector}", times: 1) end + + def test_watch_with_retry_continues_on_stream_closure + Timeout.timeout(1) do + events = 0 + setup_regular_response.watch_pods.each_with_retry do + events += 1 + break if events == 10 + end + assert_equal(10, events) + end + end + + def test_watch_with_retry_stops_on_finish + Timeout.timeout(1) do + events = 0 + watcher = setup_regular_response.watch_pods + watcher.each_with_retry do + watcher.finish + events += 1 + break if events == 10 + end + assert_equal(3, events) + end + end + + def test_watch_with_retry_stops_on_finish_exceptions + Timeout.timeout(1) do + events = 0 + watcher = setup_regular_response.watch_pods + watcher.each_with_retry do + watcher.finish + events += 1 + raise IOError + end + assert_equal(1, events) + end + end + + private + + def setup_regular_response + stub_request(:get, %r{/api/v1$}) + .to_return(body: open_test_file('core_api_resource_list.json'), status: 200) + + stub_request(:get, %r{.*\/watch/pods}) + .to_return(body: open_test_file('watch_stream.json'), status: 200) + + Kubeclient::Client.new('http://localhost:8080/api/', 'v1') + end end