Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diego client tries all bbs domain IPs #3048

Merged
merged 4 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 88 additions & 94 deletions lib/diego/client.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
require 'diego/bbs/bbs'
require 'diego/errors'
require 'diego/routes'
require 'uri'
require 'resolv'
require 'net/http'

module Diego
class Client
PROTOBUF_HEADER = { 'Content-Type'.freeze => 'application/x-protobuf'.freeze }.freeze

def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
connect_timeout:, send_timeout:, receive_timeout:)
ENV['PB_IGNORE_DEPRECATIONS'] ||= 'true'
@client = build_client(
url,
@bbs_url = URI(url)
@http_client = new_http_client(
ca_cert_file,
client_cert_file,
client_key_file,
Expand All @@ -20,157 +21,136 @@ def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
end

def ping
response = with_request_error_handling do
client.post(Routes::PING)
end
req = post_request(path: Routes::PING)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::PingResponse)
end

def upsert_domain(domain:, ttl:)
request = protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest)

response = with_request_error_handling do
client.post(Routes::UPSERT_DOMAIN, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest), path: Routes::UPSERT_DOMAIN)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::UpsertDomainResponse)
end

def desire_task(task_definition:, domain:, task_guid:)
request = protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest)

response = with_request_error_handling do
client.post(Routes::DESIRE_TASK, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest),
path: Routes::DESIRE_TASK)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::TaskLifecycleResponse)
end

def task_by_guid(task_guid)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest)
req = post_request(body: protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest), path: Routes::TASK_BY_GUID)
response = request_with_error_handling(req)

response = with_request_error_handling do
client.post(Routes::TASK_BY_GUID, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::TaskResponse)
end

def tasks(domain: '', cell_id: '')
request = protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest)

response = with_request_error_handling do
client.post(Routes::LIST_TASKS, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest), path: Routes::LIST_TASKS)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::TasksResponse)
end

def cancel_task(task_guid)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest)

response = with_request_error_handling do
client.post(Routes::CANCEL_TASK, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest), path: Routes::CANCEL_TASK)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::TaskLifecycleResponse)
end

def desire_lrp(lrp)
request = protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest)
req = post_request(body: protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest), path: Routes::DESIRE_LRP)
response = request_with_error_handling(req)

response = with_request_error_handling do
client.post(Routes::DESIRE_LRP, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
end

def desired_lrp_by_process_guid(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_BY_PROCESS_GUID, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest), path: Routes::DESIRED_LRP_BY_PROCESS_GUID)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPResponse)
end

def update_desired_lrp(process_guid, lrp_update)
request = protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest)

response = with_request_error_handling do
client.post(Routes::UPDATE_DESIRED_LRP, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest), path: Routes::UPDATE_DESIRED_LRP)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
end

def remove_desired_lrp(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest)
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest), path: Routes::REMOVE_DESIRED_LRP)
response = request_with_error_handling(req)

response = with_request_error_handling do
client.post(Routes::REMOVE_DESIRED_LRP, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
end

def retire_actual_lrp(actual_lrp_key)
request = protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest)

response = with_request_error_handling do
client.post(Routes::RETIRE_ACTUAL_LRP, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest), path: Routes::RETIRE_ACTUAL_LRP)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::ActualLRPLifecycleResponse)
end

def desired_lrp_scheduling_infos(domain)
request = protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest)
req = post_request(body: protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest), path: Routes::DESIRED_LRP_SCHEDULING_INFOS)
response = request_with_error_handling(req)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_SCHEDULING_INFOS, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::DesiredLRPSchedulingInfosResponse)
end

def actual_lrps_by_process_guid(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest)

response = with_request_error_handling do
client.post(Routes::ACTUAL_LRPS, request, PROTOBUF_HEADER)
end
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest), path: Routes::ACTUAL_LRPS)
response = request_with_error_handling(req)

validate_status!(response: response, statuses: [200])
validate_status!(response)
protobuf_decode!(response.body, Bbs::Models::ActualLRPsResponse)
end

def with_request_error_handling(&blk)
tries ||= 3
yield
def request_with_error_handling(req)
attempt ||= 1
http_client.ipaddr = bbs_ip # tell the HTTP client which exact IP to target
http_client.request(req)
rescue Resolv::ResolvError, Resolv::ResolvTimeout => e
raise DnsResolutionError.new("dns resolution failed for #{bbs_url.host}: #{e.message}")
rescue => e
retry unless (tries -= 1).zero?
eliminated_ip = ips_remaining.shift
philippthun marked this conversation as resolved.
Show resolved Hide resolved
logger.debug("attempt #{attempt} of 3: failed to reach the active bbs server on #{eliminated_ip}, removing from list")
retry unless ips_remaining.empty? && (attempt += 1) > 3
raise RequestError.new(e.message)
end

def bbs_ip
self.ips_remaining = bbs_ip_addresses if ips_remaining.nil? || ips_remaining.empty?
philippthun marked this conversation as resolved.
Show resolved Hide resolved
ips_remaining.first
end

private

attr_reader :client
attr_reader :http_client, :bbs_url
attr_accessor :ips_remaining

def logger
@logger ||= Steno.logger('cc.diego.client')
end

def protobuf_encode!(hash, protobuf_message_class)
# See below link to understand proto3 message encoding
Expand All @@ -180,8 +160,15 @@ def protobuf_encode!(hash, protobuf_message_class)
raise EncodeError.new(e.message)
end

def validate_status!(response:, statuses:)
raise ResponseError.new("failed with status: #{response.status}, body: #{response.body}") unless statuses.include?(response.status)
def post_request(body: nil, path:)
req = Net::HTTP::Post.new(path)
req.body = body if body
req['Content-Type'.freeze] = 'application/x-protobuf'.freeze
req
end

def validate_status!(response)
raise ResponseError.new("failed with status: #{response.code}, body: #{response.body}") unless response.code == '200'
end

def protobuf_decode!(message, protobuf_decoder)
Expand All @@ -190,14 +177,21 @@ def protobuf_decode!(message, protobuf_decoder)
raise DecodeError.new(e.message)
end

def build_client(url, ca_cert_file, client_cert_file, client_key_file,
def bbs_ip_addresses
Resolv.getaddresses(bbs_url.host).dup
will-gant marked this conversation as resolved.
Show resolved Hide resolved
end

def new_http_client(ca_cert_file, client_cert_file, client_key_file,
connect_timeout, send_timeout, receive_timeout)
client = HTTPClient.new(base_url: url)
client.connect_timeout = connect_timeout
client.send_timeout = send_timeout
client.receive_timeout = receive_timeout
client.ssl_config.set_client_cert_file(client_cert_file, client_key_file)
client.ssl_config.set_trust_ca(ca_cert_file)
client = Net::HTTP.new(bbs_url.host, bbs_url.port)
client.use_ssl = true
client.verify_mode = OpenSSL::SSL::VERIFY_PEER
client.key = OpenSSL::PKey::RSA.new(File.read(client_key_file))
client.cert = OpenSSL::X509::Certificate.new(File.read(client_cert_file))
client.ca_file = ca_cert_file
client.open_timeout = connect_timeout
client.read_timeout = receive_timeout
client.write_timeout = send_timeout
client
end
end
Expand Down
3 changes: 3 additions & 0 deletions lib/diego/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ class DecodeError < Error

class EncodeError < Error
end

class DnsResolutionError < Error
end
end
Loading