Skip to content

Commit

Permalink
document that watcher stops see #273 and add each_with_retry
Browse files Browse the repository at this point in the history
  • Loading branch information
grosser committed Nov 27, 2017
1 parent 2700c33 commit 927bdd0
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 9 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,12 @@ watcher.each do |notice|
end
```

The `.each` will stop when the connection is interrupted, use `each_with_retry` if you want to continue:

```ruby
client.watch_pods.each_with_retry { |notice| ... }
```

It is possible to interrupt the watcher from another thread with:

```ruby
Expand Down
11 changes: 11 additions & 0 deletions lib/kubeclient/watch_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,27 @@ def each
end

buffer = ''
done = false
response.body.each do |chunk|
done = false
buffer << chunk
while (line = buffer.slice!(/.+\n/))
yield(@format == :json ? WatchNotice.new(JSON.parse(line)) : line.chomp)
end
done = true
end
done
rescue IOError
raise unless @finished
end

def each_with_retry(&block)
loop do
done = each(&block)
break if !done || @finished
end
end

def finish
@finished = true
@http_client.close unless @http_client.nil?
Expand Down
59 changes: 50 additions & 9 deletions test/test_watch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,7 @@ def test_watch_pod_success
{ 'type' => 'DELETED', 'resourceVersion' => '1398' }
]

stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'),
status: 200)

stub_request(:get, %r{.*\/watch/pods})
.to_return(body: open_test_file('watch_stream.json'),
status: 200)

client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
client = setup_regular_response

client.watch_pods.to_enum.with_index do |notice, index|
assert_instance_of(Kubeclient::Common::WatchNotice, notice)
Expand Down Expand Up @@ -116,4 +108,53 @@ def test_watch_with_field_selector
"#{api_host}/v1/watch/events?fieldSelector=#{selector}",
times: 1)
end

def test_watch_with_retry_continues_on_stream_closure
Timeout.timeout(1) do
events = 0
setup_regular_response.watch_pods.each_with_retry do
events += 1
break if events == 10
end
assert_equal(10, events)
end
end

def test_watch_with_retry_stops_on_finish
Timeout.timeout(1) do
events = 0
watcher = setup_regular_response.watch_pods
watcher.each_with_retry do
watcher.finish
events += 1
break if events == 10
end
assert_equal(3, events)
end
end

def test_watch_with_retry_stops_on_finish_exceptions
Timeout.timeout(1) do
events = 0
watcher = setup_regular_response.watch_pods
watcher.each_with_retry do
watcher.finish
events += 1
raise IOError
end
assert_equal(1, events)
end
end

private

def setup_regular_response
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)

stub_request(:get, %r{.*\/watch/pods})
.to_return(body: open_test_file('watch_stream.json'), status: 200)

Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
end
end

0 comments on commit 927bdd0

Please sign in to comment.