diff --git a/readme.md b/readme.md index d7f96a3..fb66fa4 100644 --- a/readme.md +++ b/readme.md @@ -199,10 +199,11 @@ The goal is to monitor the output and print unique IPs: ```bash # serves current dir on 8080 -python3 -m http.server --directory . 8080 2>&1 >/dev/null | choose --match --multiline -r "^(?>(?:\d++\.){3})\d++" --unique-limit 1000 --flush +python3 -m http.server --directory . 8080 2>&1 >/dev/null\ + | choose --match --multiline -r "^(?>(?:\d++\.){3})\d++" --unique-limit 1000 --unique-expiry 900 --flush ``` -This form of uniqueness keeps the last N unique ips; least recently received ips are forgotten and will appear in the output again. This keeps the memory usage bounded. +This form of uniqueness keeps the last N unique ips; least recently received ips are forgotten and will appear in the output again. This keeps the memory usage bounded. ips are also forgotten if they haven't been seen in a while. # Stream Editing diff --git a/src/algo_utils.hpp b/src/algo_utils.hpp index 0e5db26..55e00e8 100644 --- a/src/algo_utils.hpp +++ b/src/algo_utils.hpp @@ -2,11 +2,7 @@ #include #include -#include // memcpy #include -#include -#include -#include #include #include "likely_unlikely.hpp" @@ -47,185 +43,6 @@ void stable_partial_sort(ExecutionPolicy&& policy, it begin, it middle, it end, } } -namespace { - -// this manages the least recently used logic. -// if a new element is inserted into a uniqueness set, it should also be inserted here. -// upon insertion here, the least recently used element will be removed from the uniqueness set if capacity is reached. -template -struct LRU { - T* obj = 0; - - // least recently used queue. back is next to be removed. front is most recently used - std::list elems; - const size_t n; // elems cap - - using refresh_handle = typename decltype(elems)::iterator; - - LRU(size_t n) : n(n == 0 ? 1 : n) { - // given the context when this is used, n arg is never 0. - // enforced above for safety. - // it is required since n==0 means upon insertion of an element into the uniqueness set, - // the element is immediately removed by this instance (in LRU::insert). - // meaning the returned iterator (by ForgetfulSet::insert) would be invalid - } - - // must be called before using - void setup(T* obj) { this->obj = obj; } - - // if an insertion into the uniqueness set failed because it already existed, - // call this function to update the recent-ness of that element - void refresh(refresh_handle it) { - if (it == elems.begin()) { - // no action is needed. refreshed element is already most recent - } else { - // move refreshed element to the front - elems.splice(elems.begin(), elems, it); - } - } - - // call when an insertion into the uniqueness set succeeds - void insert(typename T::iterator t) { - elems.push_front(t); - if (likely(elems.size() > n)) { - obj->erase(elems.back()); - elems.pop_back(); - } - } - - void clear() { - obj->clear(); - elems.clear(); - } -}; - -} // namespace - -// only remembers last n elements. least recently used forgotten -template -class ForgetfulSet { - struct KeyInternal { - Key key; - // ran into some issues with circular type declarations. refresh_handle's - // type is `typename decltype(lru)::refresh_handle`. static assertion below - // for safety. - void* refresh_handle; - - operator Key() const { return key; } - }; - - std::set s; - LRU lru; - - public: - ForgetfulSet(const Compare& comp, size_t n) - : s(comp), // - lru(n) {} - - // must be called before using and after all copies or moves of this instance - void setup() { lru.setup(&s); } - - void clear() { - lru.clear(); // clears both - } - - auto insert(Key k) { - KeyInternal ki; - ki.key = k; - auto ret = this->s.insert(ki); - - bool insertion_success = ret.second; - - static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*)); - - if (insertion_success) { - // possibly erases oldest from s - this->lru.insert(ret.first); - - // set ki's refresh handle now that it exists in lru - auto refresh_handle = this->lru.elems.begin(); - void* conversion_source = &refresh_handle; - void* conversion_destination = &const_cast(*ret.first).refresh_handle; - std::memcpy(conversion_destination, conversion_source, sizeof(void*)); - } else { - // element already existed - typename decltype(lru)::refresh_handle refresh_handle; - - void* conversion_source = &const_cast(*ret.first).refresh_handle; - void* conversion_destination = &refresh_handle; - std::memcpy(conversion_destination, conversion_source, sizeof(void*)); - - this->lru.refresh(refresh_handle); - } - - return ret; - } -}; - -// largely copy paste from ForgetfulSet. -template -class ForgetfulUnorderedSet { - struct KeyInternal { - Key key; - // ran into some issues with circular type declarations. refresh_handle's - // type is `typename decltype(lru)::refresh_handle`. static assertion below - // for safety. - void* refresh_handle; - - operator Key() const { return key; } - }; - - std::unordered_set s; - LRU lru; - - public: - ForgetfulUnorderedSet(const Hash& hash, const KeyEqual key_equal, float load_factor, size_t n) : s(0, hash, key_equal), lru(n) { - s.max_load_factor(load_factor); - // prevent rehashing by allocating a large enough bucket size. required to - // prevent iters invalidation. +1 since element is inserted before one is erased to maintain cap - s.reserve(this->lru.n + 1); - } - - // must be called before using and after all copies or moves of this instance - void setup() { lru.setup(&s); } - - void clear() { - lru.clear(); // clears both - } - - auto insert(Key k) { - KeyInternal ki; - ki.key = k; - auto ret = this->s.insert(ki); - - bool insertion_success = ret.second; - - static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*)); - - if (insertion_success) { - // possibly erases oldest from s - this->lru.insert(ret.first); - - // set ki's refresh handle now that it exists in lru - auto refresh_handle = this->lru.elems.begin(); - void* conversion_source = &refresh_handle; - void* conversion_destination = &const_cast(*ret.first).refresh_handle; - std::memcpy(conversion_destination, conversion_source, sizeof(void*)); - } else { - // element already existed - typename decltype(lru)::refresh_handle refresh_handle; - - void* conversion_source = &const_cast(*ret.first).refresh_handle; - void* conversion_destination = &refresh_handle; - std::memcpy(conversion_destination, conversion_source, sizeof(void*)); - - this->lru.refresh(refresh_handle); - } - - return ret; - } -}; - bool general_numeric_compare(const char* lhs_begin, const char* lhs_end, const char* rhs_begin, const char* rhs_end) { // float lhs, rhs; // if from_chars isn't found, get a newer compiler. e.g. diff --git a/src/args.hpp b/src/args.hpp index 6d665d0..47aa8c9 100644 --- a/src/args.hpp +++ b/src/args.hpp @@ -1,6 +1,7 @@ #pragma once #include #include +#include #include #include #include @@ -49,6 +50,7 @@ struct Arguments { bool unique_consecutive = false; // after sorting uniqueness size_t unique_limit = 0; // 0 indicates unused + std::chrono::seconds unique_expiry = std::chrono::seconds::zero(); bool flip = false; bool flush = false; @@ -388,9 +390,12 @@ void print_help_message() { " truncation --out/--tail (use normal -u in these cases instead)\n" " --unique-numeric\n" " apply uniqueness numerically. implies -u\n" + " --unique-expiry <# seconds>\n" + " requires --unique-limit. tokens not received again over a\n" + " specified duration are forgotten\n" " --unique-general-numeric\n" " apply uniqueness general numerically. implies -u\n" - " --unique-limit [<#tokens>]\n" + " --unique-limit <#tokens>\n" " implies -u. forget least recently used tokens\n" " --unique-use-set\n" " implies -u. apply uniqueness with a tree instead of a hash table\n" @@ -506,6 +511,7 @@ Arguments handle_args(int argc, char* const* argv, FILE* input = NULL, FILE* out {"load-factor", required_argument, NULL, 0}, {"locale", required_argument, NULL, 0}, {"replace", required_argument, NULL, 0}, + {"unique-expiry", required_argument, NULL, 0}, {"unique-limit", required_argument, NULL, 0}, {"head", optional_argument, NULL, 0}, {"index", optional_argument, NULL, 0}, @@ -690,6 +696,15 @@ Arguments handle_args(int argc, char* const* argv, FILE* input = NULL, FILE* out } } uncompiled_output.ordered_ops.push_back(uncompiled::UncompiledReplaceOp(optarg)); + } else if (strcmp("unique-expiry", name) == 0) { + using T = decltype(ret.unique_expiry)::rep; + T t = num::parse_number(on_num_err, optarg); + if constexpr (std::is_signed::value) { + if (t < 0) { + on_num_err(); // only positive expiry is allowed + } + } + ret.unique_expiry = std::chrono::seconds(t); } else if (strcmp("unique-limit", name) == 0) { ret.unique_limit = num::parse_number(on_num_err, optarg, false); ret.unique = true; diff --git a/src/numeric_utils.hpp b/src/numeric_utils.hpp index 146488a..99b3405 100644 --- a/src/numeric_utils.hpp +++ b/src/numeric_utils.hpp @@ -36,6 +36,7 @@ std::optional add_overflow(const T& a, const T& b) { } // parse T from null or comma terminating base 10 string. +// the string should otherwise only contain digits template std::enable_if_t::value, T> parse_number(OnErr onErr, const char* str, bool zero_allowed = true, bool max_allowed = true) { // this function was surprisingly difficult to make. @@ -50,10 +51,6 @@ std::enable_if_t::value, T> parse_number(OnErr onErr, const } T out = 0; - if (*str == '+') { - ++str; - } - while (1) { char ch = *str++; if (ch == '\0' || ch == ',') { @@ -93,6 +90,7 @@ std::enable_if_t::value, T> parse_number(OnErr onErr, const return out; } +// allows a leading negative sign followed by digits template std::enable_if_t::value, T> parse_number(OnErr onErr, const char* str) { if (str == 0) { @@ -125,33 +123,42 @@ std::enable_if_t::value, T> parse_number(OnErr onErr, const ch template std::tuple> parse_number_pair(OnErr onErr, const char* str) { - bool erred = false; + if (str == 0) { + onErr(); + return {0, 0}; + } + + bool erred = false; // parse result auto local_on_err = [&]() { erred = true; onErr(); }; + // parse the first number T first = parse_number(local_on_err, str); if (erred) { return {0, 0}; } + + // iterate until terminator while (1) { - char ch = *str; + char ch = *str++; if (ch == '\0') { + // number pair only had first number return {first, std::nullopt}; - } else if (ch == ',') { - ++str; + } + + if (ch == ',') { + // number pair had second number break; - } else if (!in(ch, '0', '9') && ch != '+' && ch != '-') { - onErr(); - return {0, 0}; } - ++str; } + T second = parse_number(local_on_err, str); if (erred) { return {0, 0}; } + return {first, second}; } diff --git a/src/test.cpp b/src/test.cpp index fd371fc..5aee06d 100644 --- a/src/test.cpp +++ b/src/test.cpp @@ -7,6 +7,7 @@ std::optional output_size_bound_testing; #define BOOST_TEST_MODULE choose_test_module #include +#include #include "args.hpp" #include "ncurses_wrapper.hpp" #include "token.hpp" @@ -578,6 +579,99 @@ BOOST_AUTO_TEST_CASE(unique_limit_lru) { BOOST_REQUIRE_EQUAL(out, correct_output); } +BOOST_AUTO_TEST_CASE(unique_expiry) { + // visual inspections also include: + // (echo "1";echo "2";echo "3";sleep 1;echo "2";sleep 1;echo "1";echo "2";echo "3") | choose --unique-limit 1000 --unique-expiry 2 --flush + // # 12313 + auto cmp = [](const choose::Token& lhs, const choose::Token& rhs) { // + return std::lexicographical_compare(lhs.buffer.cbegin(), lhs.buffer.cend(), rhs.buffer.cbegin(), rhs.buffer.cend()); + }; + + auto hash = [](const choose::Token&) { + return 0; // lazy, doesn't matter + }; + + auto eq = [](const choose::Token& lhs, const choose::Token& rhs) { // + return lhs.buffer == rhs.buffer; + }; + + auto delay = std::chrono::milliseconds(10); + + choose::Token token_1(to_vec("1")); + choose::Token token_2(to_vec("2")); + + std::atomic_bool has_errors = false; + // this test is tempermental for CI / valgrind, where the program is run in an + // environment where it can be paused unexpectantly or otherwise take too + // long. hence use of local_assert + auto local_assert = [&](bool p) { + if (!p) { + has_errors = true; + } + }; + + // running these tests in parallel since they incur delays. + std::thread set_thread([&]() { + ForgetfulSet s(cmp, 2, delay); + s.setup(); + // first insertions are successful + local_assert(s.insert(token_2).second); + local_assert(s.insert(token_1).second); + auto start_time1 = std::chrono::steady_clock::now(); + + // spin lock until delay has passed. + // deliberately not using sleep_until / sleep_for. + // this test is impossible to perfect unless running on a hard real time OS + while (start_time1 + delay > std::chrono::steady_clock::now()) { + local_assert(!s.insert(token_1).second); + } + + // correct token was being refreshed + local_assert(!s.insert(token_1).second); + local_assert(s.insert(token_2).second); + auto start_time2 = std::chrono::steady_clock::now(); + // spin lock until delay has passed + while (start_time2 + delay > std::chrono::steady_clock::now()) { + } + local_assert(s.insert(token_1).second); + local_assert(s.insert(token_2).second); + }); + + std::thread unordered_thread([&]() { + ForgetfulUnorderedSet s(hash, eq, 1, 2, delay); + s.setup(); + // first insertions are successful + local_assert(s.insert(token_2).second); + local_assert(s.insert(token_1).second); + auto start_time1 = std::chrono::steady_clock::now(); + + // spin lock until delay has passed. + // deliberately not using sleep_until / sleep_for. + // this test is impossible to perfect unless running on a hard real time OS + while (start_time1 + delay > std::chrono::steady_clock::now()) { + local_assert(!s.insert(token_1).second); + } + + // correct token was being refreshed + local_assert(!s.insert(token_1).second); + local_assert(s.insert(token_2).second); + auto start_time2 = std::chrono::steady_clock::now(); + // spin lock until delay has passed + while (start_time2 + delay > std::chrono::steady_clock::now()) { + } + local_assert(s.insert(token_1).second); + local_assert(s.insert(token_2).second); + }); + + set_thread.join(); + unordered_thread.join(); + + if (has_errors) { + std::cerr << boost::unit_test::framework::current_test_case().full_name().c_str(); + std::cerr << ": test failed due to timing. generally ignore this\n"; + } +} + BOOST_AUTO_TEST_CASE(numeric_unique_use_set) { choose_output out = run_choose("-0\n0\n.0\n1\n1.0\n0001.0", {"--unique-numeric", "--unique-use-set"}); choose_output correct_output{to_vec("-0\n1\n")}; @@ -1397,7 +1491,7 @@ BOOST_AUTO_TEST_CASE(parse_number_unsigned) { auto should_not_be_called = []() { BOOST_REQUIRE(false); }; - BOOST_REQUIRE_EQUAL(num::parse_number(should_not_be_called, "+0"), 0); + BOOST_REQUIRE_EQUAL(num::parse_number(should_not_be_called, "0"), 0); BOOST_REQUIRE_EQUAL(num::parse_number(should_not_be_called, "4294967295"), 0xFFFFFFFF); BOOST_REQUIRE_EQUAL(num::parse_number(should_not_be_called, "16"), 16); @@ -1418,7 +1512,7 @@ BOOST_AUTO_TEST_CASE(parse_number_signed) { auto should_not_be_called = []() { BOOST_REQUIRE(false); }; BOOST_REQUIRE_EQUAL(num::parse_number(should_not_be_called, "-128"), -128); - BOOST_REQUIRE_EQUAL(num::parse_number(should_not_be_called, "+127"), +127); + BOOST_REQUIRE_EQUAL(num::parse_number(should_not_be_called, "127"), +127); BOOST_REQUIRE_EQUAL(num::parse_number(should_not_be_called, "72"), 72); BOOST_REQUIRE_EQUAL(num::parse_number(should_not_be_called, "-72"), -72); @@ -1426,7 +1520,7 @@ BOOST_AUTO_TEST_CASE(parse_number_signed) { auto must_be_called = [&]() { ++err_count; }; BOOST_REQUIRE_EQUAL(num::parse_number(must_be_called, "-129"), 0); - BOOST_REQUIRE_EQUAL(num::parse_number(must_be_called, "+128"), 0); + BOOST_REQUIRE_EQUAL(num::parse_number(must_be_called, "128"), 0); BOOST_REQUIRE_EQUAL(num::parse_number(must_be_called, NULL), 0); BOOST_REQUIRE_EQUAL(err_count, 3); } @@ -1437,7 +1531,7 @@ BOOST_AUTO_TEST_CASE(parse_number_pair) { using T = std::tuple>; BOOST_REQUIRE(num::parse_number_pair(should_not_be_called, "15,51") == (T{15, 51})); - BOOST_REQUIRE(num::parse_number_pair(should_not_be_called, "+15,-51") == (T{15, -51})); + BOOST_REQUIRE(num::parse_number_pair(should_not_be_called, "15,-51") == (T{15, -51})); BOOST_REQUIRE(num::parse_number_pair(should_not_be_called, "15") == (T{15, std::nullopt})); int err_count = 0; diff --git a/src/token.hpp b/src/token.hpp index 97cbbef..f9771b7 100644 --- a/src/token.hpp +++ b/src/token.hpp @@ -11,6 +11,7 @@ #include "args.hpp" #include "regex.hpp" #include "string_utils.hpp" +#include "uniqueness_utils.hpp" /* There's a lot going on in this file. It should have complete code coverage. View with: @@ -357,7 +358,7 @@ std::vector create_tokens(choose::Arguments& args) { if (args.unique_limit == 0) { return unique_checker_T(uniqueness_set_T(uniqueness_set_comparison)); } else { - return unique_checker_T(uniqueness_limit_set_T(uniqueness_set_comparison, args.unique_limit)); + return unique_checker_T(uniqueness_limit_set_T(uniqueness_set_comparison, args.unique_limit, args.unique_expiry)); } } else { if (args.unique_limit == 0) { @@ -368,7 +369,8 @@ std::vector create_tokens(choose::Arguments& args) { return unique_checker_T(unordered_uniqueness_limit_set_T(unordered_set_hash, // unordered_set_equals, // args.unique_load_factor, // - args.unique_limit)); + args.unique_limit, // + args.unique_expiry)); } } } else { diff --git a/src/uniqueness_utils.hpp b/src/uniqueness_utils.hpp new file mode 100644 index 0000000..73937a7 --- /dev/null +++ b/src/uniqueness_utils.hpp @@ -0,0 +1,229 @@ +#pragma once + +#include +#include // memcpy +#include +#include +#include +#include + +namespace choose { + +namespace { + +// used in conjunction with ForgetfulSet or ForgetfulUnorderedSet. +// this manages the least recently used logic and expiry logic +// if a new element is inserted into a uniqueness set, it should also be inserted here. +// upon insertion here, the least recently used element will be removed from the uniqueness set if capacity is reached. +template +struct ForgetfulManager { + using time_point_t = std::chrono::time_point; + using duration_t = typename time_point_t::duration; + + T* obj = 0; + + const size_t n; // elems cap + const duration_t expiry; // 0 indicates no expiry - functionality disabled + + struct Entry { + typename T::iterator it; // element in set + time_point_t t; // last used time + }; + + // least recently used queue. back is next to be removed. front is most recently used + std::list elems; + + using refresh_handle = typename decltype(elems)::iterator; + + ForgetfulManager(size_t n, // + duration_t expiry = duration_t::zero()) + : n(n == 0 ? 1 : n), expiry(expiry) { + // given the context when this is used, n arg is never 0. + // enforced above for safety. + // it is required since n==0 means upon insertion of an element into the uniqueness set, + // the element is immediately removed by this instance (in ForgetfulManager::insert). + // meaning the returned iterator (by ForgetfulSet::insert) would be invalid + } + + // must be called before using and after all copies or moves of this instance + void setup(T* obj) { this->obj = obj; } + + // remove expired elements. must be called before attempting insertion in uniqueness set + void remove_old() { + if (expiry != duration_t::zero()) { + while (!elems.empty()) { + if (std::chrono::steady_clock::now() - elems.back().t >= expiry) { + obj->erase(elems.back().it); + elems.pop_back(); + } else { + break; + } + } + } + } + + // if an insertion into the uniqueness set failed because it already existed, + // call this function to update the recent-ness of that element + void refresh(refresh_handle it) { + it->t = std::chrono::steady_clock::now(); + if (it == elems.begin()) { + // no splice is needed. refreshed element is already most recent + } else { + // move refreshed element to the front + elems.splice(elems.begin(), elems, it); + } + } + + // call when an insertion into the uniqueness set succeeds + void insert(typename T::iterator it) { + Entry e{it, std::chrono::steady_clock::now()}; + elems.push_front(e); + if (likely(elems.size() > n)) { + obj->erase(elems.back().it); + elems.pop_back(); + } + } + + void clear() { + obj->clear(); + elems.clear(); + } +}; + +} // namespace + +// only remembers last n elements. least recently used forgotten +template +struct ForgetfulSet { + struct KeyInternal { + Key key; + // ran into some issues with circular type declarations. refresh_handle's + // type is `typename decltype(lru)::refresh_handle`. static assertion below + // for safety. + void* refresh_handle; + + operator Key() const { return key; } + }; + + std::set s; + ForgetfulManager lru; + + public: + ForgetfulSet(const Compare& comp, // + size_t n, + typename decltype(lru)::duration_t expiry = decltype(lru)::duration_t::zero()) + : s(comp), // + lru(n, expiry) {} + + // must be called before using and after all copies or moves of this instance + void setup() { lru.setup(&s); } + + void clear() { + lru.clear(); // clears both + } + + auto insert(Key k) { + lru.remove_old(); + KeyInternal ki; + ki.key = k; + auto ret = this->s.insert(ki); + + bool insertion_success = ret.second; + + static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*)); + + if (insertion_success) { + // possibly erases oldest from s + this->lru.insert(ret.first); + + // set ki's refresh handle now that it exists in lru + auto refresh_handle = this->lru.elems.begin(); + void* conversion_source = &refresh_handle; + void* conversion_destination = &const_cast(*ret.first).refresh_handle; + std::memcpy(conversion_destination, conversion_source, sizeof(void*)); + } else { + // element already existed + typename decltype(lru)::refresh_handle refresh_handle; + + void* conversion_source = &const_cast(*ret.first).refresh_handle; + void* conversion_destination = &refresh_handle; + std::memcpy(conversion_destination, conversion_source, sizeof(void*)); + + this->lru.refresh(refresh_handle); + } + + return ret; + } +}; + +// largely copy paste from ForgetfulSet. +template +struct ForgetfulUnorderedSet { + struct KeyInternal { + Key key; + // ran into some issues with circular type declarations. refresh_handle's + // type is `typename decltype(lru)::refresh_handle`. static assertion below + // for safety. + void* refresh_handle; + + operator Key() const { return key; } + }; + + std::unordered_set s; + ForgetfulManager lru; + + public: + ForgetfulUnorderedSet(const Hash& hash, // + const KeyEqual key_equal, + float load_factor, + size_t n, + typename decltype(lru)::duration_t expiry = decltype(lru)::duration_t::zero()) + : s(0, hash, key_equal), lru(n, expiry) { + s.max_load_factor(load_factor); + // prevent rehashing by allocating a large enough bucket size. required to + // prevent iters invalidation. +1 since element is inserted before one is erased to maintain cap + s.reserve(this->lru.n + 1); + } + + // must be called before using and after all copies or moves of this instance + void setup() { lru.setup(&s); } + + void clear() { + lru.clear(); // clears both + } + + auto insert(Key k) { + lru.remove_old(); + KeyInternal ki; + ki.key = k; + auto ret = this->s.insert(ki); + + bool insertion_success = ret.second; + + static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*)); + + if (insertion_success) { + // possibly erases oldest from s + this->lru.insert(ret.first); + + // set ki's refresh handle now that it exists in lru + auto refresh_handle = this->lru.elems.begin(); + void* conversion_source = &refresh_handle; + void* conversion_destination = &const_cast(*ret.first).refresh_handle; + std::memcpy(conversion_destination, conversion_source, sizeof(void*)); + } else { + // element already existed + typename decltype(lru)::refresh_handle refresh_handle; + + void* conversion_source = &const_cast(*ret.first).refresh_handle; + void* conversion_destination = &refresh_handle; + std::memcpy(conversion_destination, conversion_source, sizeof(void*)); + + this->lru.refresh(refresh_handle); + } + + return ret; + } +}; + +} // namespace choose \ No newline at end of file