Skip to content

Commit

Permalink
Merge pull request #3048 from sap-contributions/try-each-bbs-ip
Browse files Browse the repository at this point in the history
Diego client tries all bbs domain IPs
  • Loading branch information
FloThinksPi committed Nov 11, 2022
2 parents e5bcde4 + dce76a9 commit 0013b4e
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 186 deletions.
182 changes: 88 additions & 94 deletions lib/diego/client.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
require 'diego/bbs/bbs'
require 'diego/errors'
require 'diego/routes'
require 'uri'
require 'resolv'
require 'net/http'

module Diego
class Client
PROTOBUF_HEADER = { 'Content-Type'.freeze => 'application/x-protobuf'.freeze }.freeze

def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
connect_timeout:, send_timeout:, receive_timeout:)
ENV['PB_IGNORE_DEPRECATIONS'] ||= 'true'
@client = build_client(
url,
@bbs_url = URI(url)
@http_client = new_http_client(
ca_cert_file,
client_cert_file,
client_key_file,
Expand All @@ -20,157 +21,136 @@ def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
end

def ping
response = with_request_error_handling do
client.post(Routes::PING)
end
req = post_request(path: Routes::PING)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::PingResponse)
end

def upsert_domain(domain:, ttl:)
request = protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest)

response = with_request_error_handling do
client.post(Routes::UPSERT_DOMAIN, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest), path: Routes::UPSERT_DOMAIN)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::UpsertDomainResponse)
end

def desire_task(task_definition:, domain:, task_guid:)
request = protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest)

response = with_request_error_handling do
client.post(Routes::DESIRE_TASK, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest),
path: Routes::DESIRE_TASK)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::TaskLifecycleResponse)
end

def task_by_guid(task_guid)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest)
req = post_request(body: protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest), path: Routes::TASK_BY_GUID)
response = request_with_error_handling(req)

response = with_request_error_handling do
client.post(Routes::TASK_BY_GUID, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::TaskResponse)
end

def tasks(domain: '', cell_id: '')
request = protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest)

response = with_request_error_handling do
client.post(Routes::LIST_TASKS, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest), path: Routes::LIST_TASKS)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::TasksResponse)
end

def cancel_task(task_guid)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest)

response = with_request_error_handling do
client.post(Routes::CANCEL_TASK, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest), path: Routes::CANCEL_TASK)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::TaskLifecycleResponse)
end

def desire_lrp(lrp)
request = protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest)
req = post_request(body: protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest), path: Routes::DESIRE_LRP)
response = request_with_error_handling(req)

response = with_request_error_handling do
client.post(Routes::DESIRE_LRP, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
end

def desired_lrp_by_process_guid(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_BY_PROCESS_GUID, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest), path: Routes::DESIRED_LRP_BY_PROCESS_GUID)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPResponse)
end

def update_desired_lrp(process_guid, lrp_update)
request = protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest)

response = with_request_error_handling do
client.post(Routes::UPDATE_DESIRED_LRP, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest), path: Routes::UPDATE_DESIRED_LRP)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
end

def remove_desired_lrp(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest)
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest), path: Routes::REMOVE_DESIRED_LRP)
response = request_with_error_handling(req)

response = with_request_error_handling do
client.post(Routes::REMOVE_DESIRED_LRP, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
end

def retire_actual_lrp(actual_lrp_key)
request = protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest)

response = with_request_error_handling do
client.post(Routes::RETIRE_ACTUAL_LRP, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest), path: Routes::RETIRE_ACTUAL_LRP)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::ActualLRPLifecycleResponse)
end

def desired_lrp_scheduling_infos(domain)
request = protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest)
req = post_request(body: protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest), path: Routes::DESIRED_LRP_SCHEDULING_INFOS)
response = request_with_error_handling(req)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_SCHEDULING_INFOS, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPSchedulingInfosResponse)
end

def actual_lrps_by_process_guid(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest)

response = with_request_error_handling do
client.post(Routes::ACTUAL_LRPS, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest), path: Routes::ACTUAL_LRPS)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::ActualLRPsResponse)
end

def with_request_error_handling(&blk)
tries ||= 3
yield
def request_with_error_handling(req)
attempt ||= 1
http_client.ipaddr = bbs_ip # tell the HTTP client which exact IP to target
http_client.request(req)
rescue Resolv::ResolvError, Resolv::ResolvTimeout => e
raise DnsResolutionError.new("dns resolution failed for #{bbs_url.host}: #{e.message}")
rescue => e
retry unless (tries -= 1).zero?
eliminated_ip = ips_remaining.shift
logger.debug("attempt #{attempt} of 3: failed to reach the active bbs server on #{eliminated_ip}, removing from list")
retry unless ips_remaining.empty? && (attempt += 1) > 3
raise RequestError.new(e.message)
end

def bbs_ip
self.ips_remaining = bbs_ip_addresses if ips_remaining.nil? || ips_remaining.empty?
ips_remaining.first
end

private

attr_reader :client
attr_reader :http_client, :bbs_url
attr_accessor :ips_remaining

def logger
@logger ||= Steno.logger('cc.diego.client')
end

def protobuf_encode!(hash, protobuf_message_class)
# See below link to understand proto3 message encoding
Expand All @@ -180,8 +160,15 @@ def protobuf_encode!(hash, protobuf_message_class)
raise EncodeError.new(e.message)
end

def validate_status!(response:, statuses:)
raise ResponseError.new("failed with status: #{response.status}, body: #{response.body}") unless statuses.include?(response.status)
def post_request(body: nil, path:)
req = Net::HTTP::Post.new(path)
req.body = body if body
req['Content-Type'.freeze] = 'application/x-protobuf'.freeze
req
end

def validate_status!(response)
raise ResponseError.new("failed with status: #{response.code}, body: #{response.body}") unless response.code == '200'
end

def protobuf_decode!(message, protobuf_decoder)
Expand All @@ -190,14 +177,21 @@ def protobuf_decode!(message, protobuf_decoder)
raise DecodeError.new(e.message)
end

def build_client(url, ca_cert_file, client_cert_file, client_key_file,
def bbs_ip_addresses
Resolv.getaddresses(bbs_url.host).dup
end

def new_http_client(ca_cert_file, client_cert_file, client_key_file,
connect_timeout, send_timeout, receive_timeout)
client = HTTPClient.new(base_url: url)
client.connect_timeout = connect_timeout
client.send_timeout = send_timeout
client.receive_timeout = receive_timeout
client.ssl_config.set_client_cert_file(client_cert_file, client_key_file)
client.ssl_config.set_trust_ca(ca_cert_file)
client = Net::HTTP.new(bbs_url.host, bbs_url.port)
client.use_ssl = true
client.verify_mode = OpenSSL::SSL::VERIFY_PEER
client.key = OpenSSL::PKey::RSA.new(File.read(client_key_file))
client.cert = OpenSSL::X509::Certificate.new(File.read(client_cert_file))
client.ca_file = ca_cert_file
client.open_timeout = connect_timeout
client.read_timeout = receive_timeout
client.write_timeout = send_timeout
client
end
end
Expand Down
3 changes: 3 additions & 0 deletions lib/diego/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ class DecodeError < Error

class EncodeError < Error
end

class DnsResolutionError < Error
end
end
Loading

0 comments on commit 0013b4e

Please sign in to comment.