Skip to content

Commit

Permalink
Unique expiry (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
jagprog5 committed Sep 10, 2023
1 parent b7b92d1 commit 9a3a1c4
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 209 deletions.
18 changes: 11 additions & 7 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ here

Transformations can be done in a specified order. This command prints every other word by:

1. Suffixing each token with its arrival order
2. Filtering for tokens that end with an even number
3. Substituting to remove the arrival order
1. Suffixing each token with its arrival order.
2. Filtering for tokens that end with an even number.
3. Substituting to remove the arrival order.

```bash
echo -n 'every other word is printed here' | \
Expand All @@ -112,7 +112,7 @@ Compared to this:
cat some_content | choose -f "test" --head 5
```

The former is restricted to working with `lines`, whereas the latter works with `tokens`. Tokens are contiguous ranges and can contain newline characters, whereas lines can't.
The former is restricted to working with `lines`, whereas the latter works with `tokens`. Tokens are contiguous ranges and can contain newline characters, whereas lines can't. choose is line oriented by default, but doesn't have to be.

# Sorting and Uniqueness

Expand Down Expand Up @@ -187,7 +187,7 @@ ccc

# Monitoring

Suppose there's an input that's running for a really long time. For example, a python http server, with an output like this:
Suppose there's an input that's running for a _really_ long time. For example, a python http server, with an output like this:

```txt
127.0.0.1 - - [08/Sep/2023 22:11:48] "GET /tester.txt HTTP/1.1" 200 -
Expand All @@ -199,10 +199,14 @@ The goal is to monitor the output and print unique IPs:

```bash
# serves current dir on 8080
python3 -m http.server --directory . 8080 2>&1 >/dev/null | choose --match --multiline -r "^(?>(?:\d++\.){3})\d++" --unique-limit 1000 --flush
python3 -m http.server --directory . 8080 2>&1 >/dev/null\
| choose --match --multiline -r "^(?>(?:\d++\.){3})\d++" --unique-limit 1000 --unique-expiry 900 --flush
```

This form of uniqueness keeps the last N unique ips; least recently received ips are forgotten and will appear in the output again. This keeps the memory usage bounded.
This applies a form of bounded uniqueness in the face of an infinite input; tokens can be forgotten based on space and time constraints, meaning they can pass to the output again:

- There is a restriction in the number of tokens that can be remembered (`unique-limit`); least recently received tokens are forgotten.
- There is a limit on the recentness of tokens (`unique-expiry`); if a token hasn't been seen in a while then it is also forgotten.

# Stream Editing

Expand Down
183 changes: 0 additions & 183 deletions src/algo_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@

#include <algorithm>
#include <charconv>
#include <cstring> // memcpy
#include <execution>
#include <list>
#include <set>
#include <unordered_set>
#include <vector>

#include "likely_unlikely.hpp"
Expand Down Expand Up @@ -47,185 +43,6 @@ void stable_partial_sort(ExecutionPolicy&& policy, it begin, it middle, it end,
}
}

namespace {

// this manages the least recently used logic.
// if a new element is inserted into a uniqueness set, it should also be inserted here.
// upon insertion here, the least recently used element will be removed from the uniqueness set if capacity is reached.
template <typename T>
struct LRU {
T* obj = 0;

// least recently used queue. back is next to be removed. front is most recently used
std::list<typename T::iterator> elems;
const size_t n; // elems cap

using refresh_handle = typename decltype(elems)::iterator;

LRU(size_t n) : n(n == 0 ? 1 : n) {
// given the context when this is used, n arg is never 0.
// enforced above for safety.
// it is required since n==0 means upon insertion of an element into the uniqueness set,
// the element is immediately removed by this instance (in LRU::insert).
// meaning the returned iterator (by ForgetfulSet::insert) would be invalid
}

// must be called before using
void setup(T* obj) { this->obj = obj; }

// if an insertion into the uniqueness set failed because it already existed,
// call this function to update the recent-ness of that element
void refresh(refresh_handle it) {
if (it == elems.begin()) {
// no action is needed. refreshed element is already most recent
} else {
// move refreshed element to the front
elems.splice(elems.begin(), elems, it);
}
}

// call when an insertion into the uniqueness set succeeds
void insert(typename T::iterator t) {
elems.push_front(t);
if (likely(elems.size() > n)) {
obj->erase(elems.back());
elems.pop_back();
}
}

void clear() {
obj->clear();
elems.clear();
}
};

} // namespace

// only remembers last n elements. least recently used forgotten
template <typename Key, typename Compare>
class ForgetfulSet {
struct KeyInternal {
Key key;
// ran into some issues with circular type declarations. refresh_handle's
// type is `typename decltype(lru)::refresh_handle`. static assertion below
// for safety.
void* refresh_handle;

operator Key() const { return key; }
};

std::set<KeyInternal, Compare> s;
LRU<decltype(s)> lru;

public:
ForgetfulSet(const Compare& comp, size_t n)
: s(comp), //
lru(n) {}

// must be called before using and after all copies or moves of this instance
void setup() { lru.setup(&s); }

void clear() {
lru.clear(); // clears both
}

auto insert(Key k) {
KeyInternal ki;
ki.key = k;
auto ret = this->s.insert(ki);

bool insertion_success = ret.second;

static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*));

if (insertion_success) {
// possibly erases oldest from s
this->lru.insert(ret.first);

// set ki's refresh handle now that it exists in lru
auto refresh_handle = this->lru.elems.begin();
void* conversion_source = &refresh_handle;
void* conversion_destination = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));
} else {
// element already existed
typename decltype(lru)::refresh_handle refresh_handle;

void* conversion_source = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
void* conversion_destination = &refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));

this->lru.refresh(refresh_handle);
}

return ret;
}
};

// largely copy paste from ForgetfulSet.
template <typename Key, typename Hash, typename KeyEqual>
class ForgetfulUnorderedSet {
struct KeyInternal {
Key key;
// ran into some issues with circular type declarations. refresh_handle's
// type is `typename decltype(lru)::refresh_handle`. static assertion below
// for safety.
void* refresh_handle;

operator Key() const { return key; }
};

std::unordered_set<KeyInternal, Hash, KeyEqual> s;
LRU<decltype(s)> lru;

public:
ForgetfulUnorderedSet(const Hash& hash, const KeyEqual key_equal, float load_factor, size_t n) : s(0, hash, key_equal), lru(n) {
s.max_load_factor(load_factor);
// prevent rehashing by allocating a large enough bucket size. required to
// prevent iters invalidation. +1 since element is inserted before one is erased to maintain cap
s.reserve(this->lru.n + 1);
}

// must be called before using and after all copies or moves of this instance
void setup() { lru.setup(&s); }

void clear() {
lru.clear(); // clears both
}

auto insert(Key k) {
KeyInternal ki;
ki.key = k;
auto ret = this->s.insert(ki);

bool insertion_success = ret.second;

static_assert(sizeof(typename decltype(lru)::refresh_handle) == sizeof(void*));

if (insertion_success) {
// possibly erases oldest from s
this->lru.insert(ret.first);

// set ki's refresh handle now that it exists in lru
auto refresh_handle = this->lru.elems.begin();
void* conversion_source = &refresh_handle;
void* conversion_destination = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));
} else {
// element already existed
typename decltype(lru)::refresh_handle refresh_handle;

void* conversion_source = &const_cast<KeyInternal&>(*ret.first).refresh_handle;
void* conversion_destination = &refresh_handle;
std::memcpy(conversion_destination, conversion_source, sizeof(void*));

this->lru.refresh(refresh_handle);
}

return ret;
}
};

bool general_numeric_compare(const char* lhs_begin, const char* lhs_end, const char* rhs_begin, const char* rhs_end) { //
float lhs, rhs;
// if from_chars isn't found, get a newer compiler. e.g.
Expand Down
15 changes: 14 additions & 1 deletion src/args.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <getopt.h>
#include <unistd.h>
#include <chrono>
#include <csignal>
#include <cstring>
#include <limits>
Expand Down Expand Up @@ -49,6 +50,7 @@ struct Arguments {
bool unique_consecutive = false; // after sorting uniqueness

size_t unique_limit = 0; // 0 indicates unused
std::chrono::seconds unique_expiry = std::chrono::seconds::zero();

bool flip = false;
bool flush = false;
Expand Down Expand Up @@ -388,9 +390,12 @@ void print_help_message() {
" truncation --out/--tail (use normal -u in these cases instead)\n"
" --unique-numeric\n"
" apply uniqueness numerically. implies -u\n"
" --unique-expiry <# seconds>\n"
" requires --unique-limit. tokens not received again over a\n"
" specified duration are forgotten\n"
" --unique-general-numeric\n"
" apply uniqueness general numerically. implies -u\n"
" --unique-limit [<#tokens>]\n"
" --unique-limit <#tokens>\n"
" implies -u. forget least recently used tokens\n"
" --unique-use-set\n"
" implies -u. apply uniqueness with a tree instead of a hash table\n"
Expand Down Expand Up @@ -506,6 +511,7 @@ Arguments handle_args(int argc, char* const* argv, FILE* input = NULL, FILE* out
{"load-factor", required_argument, NULL, 0},
{"locale", required_argument, NULL, 0},
{"replace", required_argument, NULL, 0},
{"unique-expiry", required_argument, NULL, 0},
{"unique-limit", required_argument, NULL, 0},
{"head", optional_argument, NULL, 0},
{"index", optional_argument, NULL, 0},
Expand Down Expand Up @@ -690,6 +696,13 @@ Arguments handle_args(int argc, char* const* argv, FILE* input = NULL, FILE* out
}
}
uncompiled_output.ordered_ops.push_back(uncompiled::UncompiledReplaceOp(optarg));
} else if (strcmp("unique-expiry", name) == 0) {
using T = decltype(ret.unique_expiry)::rep;
T t = num::parse_number<T>(on_num_err, optarg);
if (t <= 0) {
on_num_err(); // only positive expiry is allowed
}
ret.unique_expiry = std::chrono::seconds(t);
} else if (strcmp("unique-limit", name) == 0) {
ret.unique_limit = num::parse_number<decltype(ret.unique_limit)>(on_num_err, optarg, false);
ret.unique = true;
Expand Down
31 changes: 19 additions & 12 deletions src/numeric_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ std::optional<T> add_overflow(const T& a, const T& b) {
}

// parse T from null or comma terminating base 10 string.
// the string should otherwise only contain digits
template <typename T, typename OnErr>
std::enable_if_t<std::is_unsigned<T>::value, T> parse_number(OnErr onErr, const char* str, bool zero_allowed = true, bool max_allowed = true) {
// this function was surprisingly difficult to make.
Expand All @@ -50,10 +51,6 @@ std::enable_if_t<std::is_unsigned<T>::value, T> parse_number(OnErr onErr, const
}
T out = 0;

if (*str == '+') {
++str;
}

while (1) {
char ch = *str++;
if (ch == '\0' || ch == ',') {
Expand Down Expand Up @@ -93,6 +90,7 @@ std::enable_if_t<std::is_unsigned<T>::value, T> parse_number(OnErr onErr, const
return out;
}

// allows a leading negative sign followed by digits
template <typename T, typename OnErr>
std::enable_if_t<std::is_signed<T>::value, T> parse_number(OnErr onErr, const char* str) {
if (str == 0) {
Expand Down Expand Up @@ -125,33 +123,42 @@ std::enable_if_t<std::is_signed<T>::value, T> parse_number(OnErr onErr, const ch

template <typename T, typename OnErr>
std::tuple<T, std::optional<T>> parse_number_pair(OnErr onErr, const char* str) {
bool erred = false;
if (str == 0) {
onErr();
return {0, 0};
}

bool erred = false; // parse result
auto local_on_err = [&]() {
erred = true;
onErr();
};

// parse the first number
T first = parse_number<T>(local_on_err, str);
if (erred) {
return {0, 0};
}

// iterate until terminator
while (1) {
char ch = *str;
char ch = *str++;
if (ch == '\0') {
// number pair only had first number
return {first, std::nullopt};
} else if (ch == ',') {
++str;
}

if (ch == ',') {
// number pair had second number
break;
} else if (!in(ch, '0', '9') && ch != '+' && ch != '-') {
onErr();
return {0, 0};
}
++str;
}

T second = parse_number<T>(local_on_err, str);
if (erred) {
return {0, 0};
}

return {first, second};
}

Expand Down
Loading

0 comments on commit 9a3a1c4

Please sign in to comment.