Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique expiry #42

Merged
merged 6 commits into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,11 @@ The goal is to monitor the output and print unique IPs:

```bash
# serves current dir on 8080
python3 -m http.server --directory . 8080 2>&1 >/dev/null | choose --match --multiline -r "^(?>(?:\d++\.){3})\d++" --unique-limit 1000 --flush
python3 -m http.server --directory . 8080 2>&1 >/dev/null\
| choose --match --multiline -r "^(?>(?:\d++\.){3})\d++" --unique-limit 1000 --unique-expiry 900 --flush
```

This form of uniqueness keeps the last N unique ips; least recently received ips are forgotten and will appear in the output again. This keeps the memory usage bounded.
This form of uniqueness keeps the last N unique ips; least recently received ips are forgotten and will appear in the output again. This keeps the memory usage bounded. ips are also forgotten if they haven't been seen in a while.

# Stream Editing

Expand Down
183 changes: 0 additions & 183 deletions src/algo_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@

#include <algorithm>
#include <charconv>
#include <cstring> // memcpy
#include <execution>
#include <list>
#include <set>
#include <unordered_set>
#include <vector>

#include "likely_unlikely.hpp"
Expand Down Expand Up @@ -47,185 +43,6 @@ void stable_partial_sort(ExecutionPolicy&& policy, it begin, it middle, it end,
}
}

namespace {

// this manages the least recently used logic.
// if a new element is inserted into a uniqueness set, it should also be inserted here.
// upon insertion here, the least recently used element will be removed from the uniqueness set if capacity is reached.
template <typename T>
struct LRU {
T* obj = 0;

// least recently used queue. back is next to be removed. front is most recently used
std::list<typename T::iterator> elems;
const size_t n; // elems cap

using refresh_handle = typename decltype(elems)::iterator;

LRU(size_t n) : n(n == 0 ? 1 : n) {
// given the context when this is used, n arg is never 0.
// enforced above for safety.
// it is required since n==0 means upon insertion of an element into the uniqueness set,
// the element is immediately removed by this instance (in LRU::insert).
// meaning the returned iterator (by ForgetfulSet::insert) would be invalid
}

// must be called before using
void setup(T* obj) { this->obj = obj; }

// if an insertion into the uniqueness set failed because it already existed,
// call this function to update the recent-ness of that element
void refresh(refresh_handle it) {
if (it == elems.begin()) {
// no action is needed. refreshed element is already most recent
} else {
// move refreshed element to the front
elems.splice(elems.begin(), elems, it);
}
}

// call when an insertion into the uniqueness set succeeds
void insert(typename T::iterator t) {
elems.push_front(t);
if (likely(elems.size() > n)) {
obj->erase(elems.back());
elems.pop_back();
}
}

void clear() {
obj->clear();
elems.clear();
}
};

} // namespace

// only remembers last n elements. least recently used forgotten
template <typename Key, typename Compare>
class ForgetfulSet {
struct KeyInternal {
Key key;
// ran into some issues with circular type declarations. refresh_handle's
// type is `typename decltype(lru)::refresh_handle`. static assertion below
// for safety.
void* refresh_handle;

operator Key() const { return key; }
};

std::set<KeyInternal, Compare> s;
LRU<decltype(s)> lru;

public:
ForgetfulSet(const Compare& comp, size_t n)
: s(comp), //
lru(n) {}

// must be called before using and after all copies or moves of this instance
void setup() { lru.setup(&s); }

void clear() {
lru.clear(); // clears both
}

auto insert(Key k) {
KeyInternal ki;
ki.key = k;
auto ret = this->s.insert(ki);

bool insertion_success = ret.second;

static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*));

if (insertion_success) {
// possibly erases oldest from s
this->lru.insert(ret.first);

// set ki's refresh handle now that it exists in lru
auto refresh_handle = this->lru.elems.begin();
void* conversion_source = &refresh_handle;
void* conversion_destination = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));
} else {
// element already existed
typename decltype(lru)::refresh_handle refresh_handle;

void* conversion_source = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
void* conversion_destination = &refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));

this->lru.refresh(refresh_handle);
}

return ret;
}
};

// largely copy paste from ForgetfulSet.
template <typename Key, typename Hash, typename KeyEqual>
class ForgetfulUnorderedSet {
struct KeyInternal {
Key key;
// ran into some issues with circular type declarations. refresh_handle's
// type is `typename decltype(lru)::refresh_handle`. static assertion below
// for safety.
void* refresh_handle;

operator Key() const { return key; }
};

std::unordered_set<KeyInternal, Hash, KeyEqual> s;
LRU<decltype(s)> lru;

public:
ForgetfulUnorderedSet(const Hash& hash, const KeyEqual key_equal, float load_factor, size_t n) : s(0, hash, key_equal), lru(n) {
s.max_load_factor(load_factor);
// prevent rehashing by allocating a large enough bucket size. required to
// prevent iters invalidation. +1 since element is inserted before one is erased to maintain cap
s.reserve(this->lru.n + 1);
}

// must be called before using and after all copies or moves of this instance
void setup() { lru.setup(&s); }

void clear() {
lru.clear(); // clears both
}

auto insert(Key k) {
KeyInternal ki;
ki.key = k;
auto ret = this->s.insert(ki);

bool insertion_success = ret.second;

static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*));

if (insertion_success) {
// possibly erases oldest from s
this->lru.insert(ret.first);

// set ki's refresh handle now that it exists in lru
auto refresh_handle = this->lru.elems.begin();
void* conversion_source = &refresh_handle;
void* conversion_destination = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));
} else {
// element already existed
typename decltype(lru)::refresh_handle refresh_handle;

void* conversion_source = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
void* conversion_destination = &refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));

this->lru.refresh(refresh_handle);
}

return ret;
}
};

bool general_numeric_compare(const char* lhs_begin, const char* lhs_end, const char* rhs_begin, const char* rhs_end) { //
float lhs, rhs;
// if from_chars isn't found, get a newer compiler. e.g.
Expand Down
17 changes: 16 additions & 1 deletion src/args.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <getopt.h>
#include <unistd.h>
#include <chrono>
#include <csignal>
#include <cstring>
#include <limits>
Expand Down Expand Up @@ -49,6 +50,7 @@ struct Arguments {
bool unique_consecutive = false; // after sorting uniqueness

size_t unique_limit = 0; // 0 indicates unused
std::chrono::seconds unique_expiry = std::chrono::seconds::zero();

bool flip = false;
bool flush = false;
Expand Down Expand Up @@ -388,9 +390,12 @@ void print_help_message() {
" truncation --out/--tail (use normal -u in these cases instead)\n"
" --unique-numeric\n"
" apply uniqueness numerically. implies -u\n"
" --unique-expiry <# seconds>\n"
" requires --unique-limit. tokens not received again over a\n"
" specified duration are forgotten\n"
" --unique-general-numeric\n"
" apply uniqueness general numerically. implies -u\n"
" --unique-limit [<#tokens>]\n"
" --unique-limit <#tokens>\n"
" implies -u. forget least recently used tokens\n"
" --unique-use-set\n"
" implies -u. apply uniqueness with a tree instead of a hash table\n"
Expand Down Expand Up @@ -506,6 +511,7 @@ Arguments handle_args(int argc, char* const* argv, FILE* input = NULL, FILE* out
{"load-factor", required_argument, NULL, 0},
{"locale", required_argument, NULL, 0},
{"replace", required_argument, NULL, 0},
{"unique-expiry", required_argument, NULL, 0},
{"unique-limit", required_argument, NULL, 0},
{"head", optional_argument, NULL, 0},
{"index", optional_argument, NULL, 0},
Expand Down Expand Up @@ -690,6 +696,15 @@ Arguments handle_args(int argc, char* const* argv, FILE* input = NULL, FILE* out
}
}
uncompiled_output.ordered_ops.push_back(uncompiled::UncompiledReplaceOp(optarg));
} else if (strcmp("unique-expiry", name) == 0) {
using T = decltype(ret.unique_expiry)::rep;
T t = num::parse_number<T>(on_num_err, optarg);
if constexpr (std::is_signed<T>::value) {
if (t < 0) {
on_num_err(); // only positive expiry is allowed
}
}
ret.unique_expiry = std::chrono::seconds(t);
} else if (strcmp("unique-limit", name) == 0) {
ret.unique_limit = num::parse_number<decltype(ret.unique_limit)>(on_num_err, optarg, false);
ret.unique = true;
Expand Down
31 changes: 19 additions & 12 deletions src/numeric_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ std::optional<T> add_overflow(const T& a, const T& b) {
}

// parse T from null or comma terminating base 10 string.
// the string should otherwise only contain digits
template <typename T, typename OnErr>
std::enable_if_t<std::is_unsigned<T>::value, T> parse_number(OnErr onErr, const char* str, bool zero_allowed = true, bool max_allowed = true) {
// this function was surprisingly difficult to make.
Expand All @@ -50,10 +51,6 @@ std::enable_if_t<std::is_unsigned<T>::value, T> parse_number(OnErr onErr, const
}
T out = 0;

if (*str == '+') {
++str;
}

while (1) {
char ch = *str++;
if (ch == '\0' || ch == ',') {
Expand Down Expand Up @@ -93,6 +90,7 @@ std::enable_if_t<std::is_unsigned<T>::value, T> parse_number(OnErr onErr, const
return out;
}

// allows a leading negative sign followed by digits
template <typename T, typename OnErr>
std::enable_if_t<std::is_signed<T>::value, T> parse_number(OnErr onErr, const char* str) {
if (str == 0) {
Expand Down Expand Up @@ -125,33 +123,42 @@ std::enable_if_t<std::is_signed<T>::value, T> parse_number(OnErr onErr, const ch

template <typename T, typename OnErr>
std::tuple<T, std::optional<T>> parse_number_pair(OnErr onErr, const char* str) {
bool erred = false;
if (str == 0) {
onErr();
return {0, 0};
}

bool erred = false; // parse result
auto local_on_err = [&]() {
erred = true;
onErr();
};

// parse the first number
T first = parse_number<T>(local_on_err, str);
if (erred) {
return {0, 0};
}

// iterate until terminator
while (1) {
char ch = *str;
char ch = *str++;
if (ch == '\0') {
// number pair only had first number
return {first, std::nullopt};
} else if (ch == ',') {
++str;
}

if (ch == ',') {
// number pair had second number
break;
} else if (!in(ch, '0', '9') && ch != '+' && ch != '-') {
onErr();
return {0, 0};
}
++str;
}

T second = parse_number<T>(local_on_err, str);
if (erred) {
return {0, 0};
}

return {first, second};
}

Expand Down
Loading