Skip to content

Commit

Permalink
init and destroy ares channel on demand..
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurpassos committed Aug 15, 2023
1 parent 0dc2be6 commit 5d81619
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 67 deletions.
71 changes: 37 additions & 34 deletions src/Common/CaresPTRResolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,25 @@ namespace DB
}
}

std::mutex CaresPTRResolver::mutex;
/// Scope guard that owns a single c-ares channel.
///
/// The constructor creates the channel with ares_init() and throws
/// DB::Exception (DB::ErrorCodes::DNS_ERROR) if that call does not return
/// ARES_SUCCESS. The destructor releases the channel with ares_destroy().
struct AresChannelRAII
{
    AresChannelRAII()
    {
        if (ares_init(&channel) != ARES_SUCCESS)
        {
            throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to initialize c-ares channel");
        }
    }

    ~AresChannelRAII()
    {
        ares_destroy(channel);
    }

    /// The destructor unconditionally destroys `channel`, so a copy of this object
    /// would lead to ares_destroy() being called twice on the same handle.
    /// Forbid copying (this also suppresses the implicit move operations).
    AresChannelRAII(const AresChannelRAII &) = delete;
    AresChannelRAII & operator=(const AresChannelRAII &) = delete;

    /// Handle owned by this object; it is filled in by ares_init() in the constructor.
    ares_channel channel = nullptr;
};

CaresPTRResolver::CaresPTRResolver(CaresPTRResolver::provider_token) : channel(nullptr)
CaresPTRResolver::CaresPTRResolver(CaresPTRResolver::provider_token)
{
/*
* ares_library_init is not thread safe. Currently, the only other usage of c-ares seems to be in grpc.
Expand All @@ -57,34 +73,22 @@ namespace DB
* */
static const auto library_init_result = ares_library_init(ARES_LIB_INIT_ALL);

if (library_init_result != ARES_SUCCESS || ares_init(&channel) != ARES_SUCCESS)
if (library_init_result != ARES_SUCCESS)
{
throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to initialize c-ares");
}
}

CaresPTRResolver::~CaresPTRResolver()
{
ares_destroy(channel);
/*
* Library initialization is currently done only once in the constructor. Multiple instances of CaresPTRResolver
* will be used in the lifetime of ClickHouse, thus it's problematic to have de-init here.
* In a practical view, it makes little to no sense to de-init a DNS library since DNS requests will happen
* until the end of the program. Hence, ares_library_cleanup() will not be called.
* */
}

std::unordered_set<std::string> CaresPTRResolver::resolve(const std::string & ip)
{
std::lock_guard guard(mutex);
AresChannelRAII channel_raii;

std::unordered_set<std::string> ptr_records;

resolve(ip, ptr_records);
resolve(ip, ptr_records, channel_raii.channel);

if (!wait_and_process())
if (!wait_and_process(channel_raii.channel))
{
cancel_requests();
throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to complete reverse DNS query for IP {}", ip);
}

Expand All @@ -93,22 +97,21 @@ namespace DB

std::unordered_set<std::string> CaresPTRResolver::resolve_v6(const std::string & ip)
{
std::lock_guard guard(mutex);
AresChannelRAII channel_raii;

std::unordered_set<std::string> ptr_records;

resolve_v6(ip, ptr_records);
resolve_v6(ip, ptr_records, channel_raii.channel);

if (!wait_and_process())
if (!wait_and_process(channel_raii.channel))
{
cancel_requests();
throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to complete reverse DNS query for IP {}", ip);
}

return ptr_records;
}

void CaresPTRResolver::resolve(const std::string & ip, std::unordered_set<std::string> & response)
void CaresPTRResolver::resolve(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel)
{
in_addr addr;

Expand All @@ -117,23 +120,23 @@ namespace DB
ares_gethostbyaddr(channel, reinterpret_cast<const void*>(&addr), sizeof(addr), AF_INET, callback, &response);
}

void CaresPTRResolver::resolve_v6(const std::string & ip, std::unordered_set<std::string> & response)
void CaresPTRResolver::resolve_v6(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel)
{
in6_addr addr;
inet_pton(AF_INET6, ip.c_str(), &addr);

ares_gethostbyaddr(channel, reinterpret_cast<const void*>(&addr), sizeof(addr), AF_INET6, callback, &response);
}

bool CaresPTRResolver::wait_and_process()
bool CaresPTRResolver::wait_and_process(ares_channel channel)
{
int sockets[ARES_GETSOCK_MAXNUM];
pollfd pollfd[ARES_GETSOCK_MAXNUM];

while (true)
{
auto readable_sockets = get_readable_sockets(sockets, pollfd);
auto timeout = calculate_timeout();
auto readable_sockets = get_readable_sockets(sockets, pollfd, channel);
auto timeout = calculate_timeout(channel);

int number_of_fds_ready = 0;
if (!readable_sockets.empty())
Expand All @@ -158,24 +161,24 @@ namespace DB

if (number_of_fds_ready > 0)
{
process_readable_sockets(readable_sockets);
process_readable_sockets(readable_sockets, channel);
}
else
{
process_possible_timeout();
process_possible_timeout(channel);
break;
}
}

return true;
}

void CaresPTRResolver::cancel_requests()
void CaresPTRResolver::cancel_requests(ares_channel channel)
{
ares_cancel(channel);
}

std::span<pollfd> CaresPTRResolver::get_readable_sockets(int * sockets, pollfd * pollfd)
std::span<pollfd> CaresPTRResolver::get_readable_sockets(int * sockets, pollfd * pollfd, ares_channel channel)
{
int sockets_bitmask = ares_getsock(channel, sockets, ARES_GETSOCK_MAXNUM);

Expand Down Expand Up @@ -205,7 +208,7 @@ namespace DB
return std::span<struct pollfd>(pollfd, number_of_sockets_to_poll);
}

int64_t CaresPTRResolver::calculate_timeout()
int64_t CaresPTRResolver::calculate_timeout(ares_channel channel)
{
timeval tv;
if (auto * tvp = ares_timeout(channel, nullptr, &tv))
Expand All @@ -218,14 +221,14 @@ namespace DB
return 0;
}

void CaresPTRResolver::process_possible_timeout()
void CaresPTRResolver::process_possible_timeout(ares_channel channel)
{
/* Call ares_process() unconditionally here, even if we simply timed out
above, as otherwise the ares name resolve won't time out! */
ares_process_fd(channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
}

void CaresPTRResolver::process_readable_sockets(std::span<pollfd> readable_sockets)
void CaresPTRResolver::process_readable_sockets(std::span<pollfd> readable_sockets, ares_channel channel)
{
for (auto readable_socket : readable_sockets)
{
Expand Down
29 changes: 16 additions & 13 deletions src/Common/CaresPTRResolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,35 @@ namespace DB

public:
explicit CaresPTRResolver(provider_token);
~CaresPTRResolver() override;

/*
* Library initialization is currently done only once in the constructor. Multiple instances of CaresPTRResolver
* will be used in the lifetime of ClickHouse, thus it's problematic to have de-init here.
* In a practical view, it makes little to no sense to de-init a DNS library since DNS requests will happen
* until the end of the program. Hence, ares_library_cleanup() will not be called.
* */
~CaresPTRResolver() override = default;

std::unordered_set<std::string> resolve(const std::string & ip) override;

std::unordered_set<std::string> resolve_v6(const std::string & ip) override;

private:
bool wait_and_process();

void cancel_requests();

void resolve(const std::string & ip, std::unordered_set<std::string> & response);
bool wait_and_process(ares_channel channel);

void resolve_v6(const std::string & ip, std::unordered_set<std::string> & response);
void cancel_requests(ares_channel channel);

std::span<pollfd> get_readable_sockets(int * sockets, pollfd * pollfd);
void resolve(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel);

int64_t calculate_timeout();
void resolve_v6(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel);

void process_possible_timeout();
std::span<pollfd> get_readable_sockets(int * sockets, pollfd * pollfd, ares_channel channel);

void process_readable_sockets(std::span<pollfd> readable_sockets);
int64_t calculate_timeout(ares_channel channel);

ares_channel channel;
void process_possible_timeout(ares_channel channel);

static std::mutex mutex;
void process_readable_sockets(std::span<pollfd> readable_sockets, ares_channel channel);
};
}

41 changes: 21 additions & 20 deletions src/Common/tests/gtest_dns_reverse_resolve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,35 @@ namespace DB
{
TEST(Common, ReverseDNS)
{
auto addresses = std::vector<std::string>({
"8.8.8.8", "2001:4860:4860::8888", // dns.google
"142.250.219.35", // google.com
"157.240.12.35", // facebook
"208.84.244.116", "2600:1419:c400::214:c410", //www.terra.com.br,
"127.0.0.1", "::1"
});

auto func = [&]()
{
// Good random seed, good engine
auto rnd1 = std::mt19937(std::random_device{}());

for (int i = 0; i < 50; ++i)
for (int i = 0; i < 10; ++i)
{
auto & dns_resolver_instance = DNSResolver::instance();
// unfortunately, DNS cache can't be disabled because we might end up causing a DDoS attack
// dns_resolver_instance.setDisableCacheFlag();

auto addr_index = rnd1() % addresses.size();

[[maybe_unused]] auto result = dns_resolver_instance.reverseResolve(Poco::Net::IPAddress{ addresses[addr_index] });

// will not assert either because some of the IP addresses might change in the future and
// this test will become flaky
// ASSERT_TRUE(!result.empty());
dns_resolver_instance.setDisableCacheFlag();

auto val1 = rnd1() % static_cast<uint32_t>((pow(2, 31) - 1));
auto val2 = rnd1() % static_cast<uint32_t>((pow(2, 31) - 1));
auto val3 = rnd1() % static_cast<uint32_t>((pow(2, 31) - 1));
auto val4 = rnd1() % static_cast<uint32_t>((pow(2, 31) - 1));

uint32_t ipv4_buffer[1] = {
static_cast<uint32_t>(val1)
};

uint32_t ipv6_buffer[4] = {
static_cast<uint32_t>(val1),
static_cast<uint32_t>(val2),
static_cast<uint32_t>(val3),
static_cast<uint32_t>(val4)
};

dns_resolver_instance.reverseResolve(Poco::Net::IPAddress{ ipv4_buffer, sizeof(ipv4_buffer)});
dns_resolver_instance.reverseResolve(Poco::Net::IPAddress{ ipv6_buffer, sizeof(ipv6_buffer)});
}

};

auto number_of_threads = 200u;
Expand Down

0 comments on commit 5d81619

Please sign in to comment.