Skip to content

Commit

Permalink
Unique expiry (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
jagprog5 committed Sep 10, 2023
1 parent b7b92d1 commit 455cd11
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 204 deletions.
5 changes: 3 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,11 @@ The goal is to monitor the output and print unique IPs:

```bash
# serves current dir on 8080
python3 -m http.server --directory . 8080 2>&1 >/dev/null | choose --match --multiline -r "^(?>(?:\d++\.){3})\d++" --unique-limit 1000 --flush
python3 -m http.server --directory . 8080 2>&1 >/dev/null\
| choose --match --multiline -r "^(?>(?:\d++\.){3})\d++" --unique-limit 1000 --unique-expiry 900 --flush
```

This form of uniqueness keeps the last N unique ips; least recently received ips are forgotten and will appear in the output again. This keeps the memory usage bounded.
This form of uniqueness keeps the last N unique ips; least recently received ips are forgotten and will appear in the output again. This keeps the memory usage bounded. ips are also forgotten if they haven't been seen in a while.

# Stream Editing

Expand Down
183 changes: 0 additions & 183 deletions src/algo_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@

#include <algorithm>
#include <charconv>
#include <cstring> // memcpy
#include <execution>
#include <list>
#include <set>
#include <unordered_set>
#include <vector>

#include "likely_unlikely.hpp"
Expand Down Expand Up @@ -47,185 +43,6 @@ void stable_partial_sort(ExecutionPolicy&& policy, it begin, it middle, it end,
}
}

namespace {

// this manages the least recently used logic.
// if a new element is inserted into a uniqueness set, it should also be inserted here.
// upon insertion here, the least recently used element will be removed from the uniqueness set if capacity is reached.
template <typename T>
struct LRU {
T* obj = 0;

// least recently used queue. back is next to be removed. front is most recently used
std::list<typename T::iterator> elems;
const size_t n; // elems cap

using refresh_handle = typename decltype(elems)::iterator;

LRU(size_t n) : n(n == 0 ? 1 : n) {
// given the context when this is used, n arg is never 0.
// enforced above for safety.
// it is required since n==0 means upon insertion of an element into the uniqueness set,
// the element is immediately removed by this instance (in LRU::insert).
// meaning the returned iterator (by ForgetfulSet::insert) would be invalid
}

// must be called before using
void setup(T* obj) { this->obj = obj; }

// if an insertion into the uniqueness set failed because it already existed,
// call this function to update the recent-ness of that element
void refresh(refresh_handle it) {
if (it == elems.begin()) {
// no action is needed. refreshed element is already most recent
} else {
// move refreshed element to the front
elems.splice(elems.begin(), elems, it);
}
}

// call when an insertion into the uniqueness set succeeds
void insert(typename T::iterator t) {
elems.push_front(t);
if (likely(elems.size() > n)) {
obj->erase(elems.back());
elems.pop_back();
}
}

void clear() {
obj->clear();
elems.clear();
}
};

} // namespace

// only remembers last n elements. least recently used forgotten
template <typename Key, typename Compare>
class ForgetfulSet {
struct KeyInternal {
Key key;
// ran into some issues with circular type declarations. refresh_handle's
// type is `typename decltype(lru)::refresh_handle`. static assertion below
// for safety.
void* refresh_handle;

operator Key() const { return key; }
};

std::set<KeyInternal, Compare> s;
LRU<decltype(s)> lru;

public:
ForgetfulSet(const Compare& comp, size_t n)
: s(comp), //
lru(n) {}

// must be called before using and after all copies or moves of this instance
void setup() { lru.setup(&s); }

void clear() {
lru.clear(); // clears both
}

auto insert(Key k) {
KeyInternal ki;
ki.key = k;
auto ret = this->s.insert(ki);

bool insertion_success = ret.second;

static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*));

if (insertion_success) {
// possibly erases oldest from s
this->lru.insert(ret.first);

// set ki's refresh handle now that it exists in lru
auto refresh_handle = this->lru.elems.begin();
void* conversion_source = &refresh_handle;
void* conversion_destination = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));
} else {
// element already existed
typename decltype(lru)::refresh_handle refresh_handle;

void* conversion_source = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
void* conversion_destination = &refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));

this->lru.refresh(refresh_handle);
}

return ret;
}
};

// largely copy paste from ForgetfulSet.
template <typename Key, typename Hash, typename KeyEqual>
class ForgetfulUnorderedSet {
struct KeyInternal {
Key key;
// ran into some issues with circular type declarations. refresh_handle's
// type is `typename decltype(lru)::refresh_handle`. static assertion below
// for safety.
void* refresh_handle;

operator Key() const { return key; }
};

std::unordered_set<KeyInternal, Hash, KeyEqual> s;
LRU<decltype(s)> lru;

public:
ForgetfulUnorderedSet(const Hash& hash, const KeyEqual key_equal, float load_factor, size_t n) : s(0, hash, key_equal), lru(n) {
s.max_load_factor(load_factor);
// prevent rehashing by allocating a large enough bucket size. required to
// prevent iters invalidation. +1 since element is inserted before one is erased to maintain cap
s.reserve(this->lru.n + 1);
}

// must be called before using and after all copies or moves of this instance
void setup() { lru.setup(&s); }

void clear() {
lru.clear(); // clears both
}

auto insert(Key k) {
KeyInternal ki;
ki.key = k;
auto ret = this->s.insert(ki);

bool insertion_success = ret.second;

static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*));

if (insertion_success) {
// possibly erases oldest from s
this->lru.insert(ret.first);

// set ki's refresh handle now that it exists in lru
auto refresh_handle = this->lru.elems.begin();
void* conversion_source = &refresh_handle;
void* conversion_destination = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));
} else {
// element already existed
typename decltype(lru)::refresh_handle refresh_handle;

void* conversion_source = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
void* conversion_destination = &refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));

this->lru.refresh(refresh_handle);
}

return ret;
}
};

bool general_numeric_compare(const char* lhs_begin, const char* lhs_end, const char* rhs_begin, const char* rhs_end) { //
float lhs, rhs;
// if from_chars isn't found, get a newer compiler. e.g.
Expand Down
17 changes: 16 additions & 1 deletion src/args.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <getopt.h>
#include <unistd.h>
#include <chrono>
#include <csignal>
#include <cstring>
#include <limits>
Expand Down Expand Up @@ -49,6 +50,7 @@ struct Arguments {
bool unique_consecutive = false; // after sorting uniqueness

size_t unique_limit = 0; // 0 indicates unused
std::chrono::seconds unique_expiry = std::chrono::seconds::zero();

bool flip = false;
bool flush = false;
Expand Down Expand Up @@ -388,9 +390,12 @@ void print_help_message() {
" truncation --out/--tail (use normal -u in these cases instead)\n"
" --unique-numeric\n"
" apply uniqueness numerically. implies -u\n"
" --unique-expiry <# seconds>\n"
" requires --unique-limit. tokens not received again over a\n"
" specified duration are forgotten\n"
" --unique-general-numeric\n"
" apply uniqueness general numerically. implies -u\n"
" --unique-limit [<#tokens>]\n"
" --unique-limit <#tokens>\n"
" implies -u. forget least recently used tokens\n"
" --unique-use-set\n"
" implies -u. apply uniqueness with a tree instead of a hash table\n"
Expand Down Expand Up @@ -506,6 +511,7 @@ Arguments handle_args(int argc, char* const* argv, FILE* input = NULL, FILE* out
{"load-factor", required_argument, NULL, 0},
{"locale", required_argument, NULL, 0},
{"replace", required_argument, NULL, 0},
{"unique-expiry", required_argument, NULL, 0},
{"unique-limit", required_argument, NULL, 0},
{"head", optional_argument, NULL, 0},
{"index", optional_argument, NULL, 0},
Expand Down Expand Up @@ -690,6 +696,15 @@ Arguments handle_args(int argc, char* const* argv, FILE* input = NULL, FILE* out
}
}
uncompiled_output.ordered_ops.push_back(uncompiled::UncompiledReplaceOp(optarg));
} else if (strcmp("unique-expiry", name) == 0) {
using T = decltype(ret.unique_expiry)::rep;
T t = num::parse_number<T>(on_num_err, optarg);
if constexpr (std::is_signed<T>::value) {
if (t < 0) {
on_num_err(); // only positive expiry is allowed
}
}
ret.unique_expiry = std::chrono::seconds(t);
} else if (strcmp("unique-limit", name) == 0) {
ret.unique_limit = num::parse_number<decltype(ret.unique_limit)>(on_num_err, optarg, false);
ret.unique = true;
Expand Down
31 changes: 19 additions & 12 deletions src/numeric_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ std::optional<T> add_overflow(const T& a, const T& b) {
}

// parse T from null or comma terminating base 10 string.
// the string should otherwise only contain digits
template <typename T, typename OnErr>
std::enable_if_t<std::is_unsigned<T>::value, T> parse_number(OnErr onErr, const char* str, bool zero_allowed = true, bool max_allowed = true) {
// this function was surprisingly difficult to make.
Expand All @@ -50,10 +51,6 @@ std::enable_if_t<std::is_unsigned<T>::value, T> parse_number(OnErr onErr, const
}
T out = 0;

if (*str == '+') {
++str;
}

while (1) {
char ch = *str++;
if (ch == '\0' || ch == ',') {
Expand Down Expand Up @@ -93,6 +90,7 @@ std::enable_if_t<std::is_unsigned<T>::value, T> parse_number(OnErr onErr, const
return out;
}

// allows a leading negative sign followed by digits
template <typename T, typename OnErr>
std::enable_if_t<std::is_signed<T>::value, T> parse_number(OnErr onErr, const char* str) {
if (str == 0) {
Expand Down Expand Up @@ -125,33 +123,42 @@ std::enable_if_t<std::is_signed<T>::value, T> parse_number(OnErr onErr, const ch

template <typename T, typename OnErr>
std::tuple<T, std::optional<T>> parse_number_pair(OnErr onErr, const char* str) {
bool erred = false;
if (str == 0) {
onErr();
return {0, 0};
}

bool erred = false; // parse result
auto local_on_err = [&]() {
erred = true;
onErr();
};

// parse the first number
T first = parse_number<T>(local_on_err, str);
if (erred) {
return {0, 0};
}

// iterate until terminator
while (1) {
char ch = *str;
char ch = *str++;
if (ch == '\0') {
// number pair only had first number
return {first, std::nullopt};
} else if (ch == ',') {
++str;
}

if (ch == ',') {
// number pair had second number
break;
} else if (!in(ch, '0', '9') && ch != '+' && ch != '-') {
onErr();
return {0, 0};
}
++str;
}

T second = parse_number<T>(local_on_err, str);
if (erred) {
return {0, 0};
}

return {first, second};
}

Expand Down
Loading

0 comments on commit 455cd11

Please sign in to comment.