Commit

handle keys larger than 511 bytes
sokra committed Aug 15, 2024
1 parent 9a804d4 commit 7861001
Showing 4 changed files with 113 additions and 5 deletions.
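For context: LMDB rejects keys longer than its compile-time maximum (511 bytes by default) with MDB_BAD_VALSIZE, so a task-cache key above that limit cannot be written directly. A minimal sketch of the failure mode this commit works around, using the same lmdb crate; the environment setup here is illustrative, not from the commit:

use lmdb::{Environment, WriteFlags};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let dir = std::env::temp_dir().join("lmdb-key-limit-demo");
    std::fs::create_dir_all(&dir)?;
    let env = Environment::new().open(&dir)?;
    let db = env.open_db(None)?;

    let mut tx = env.begin_rw_txn()?;
    // 511 bytes is LMDB's default MDB_MAXKEYSIZE; one byte more is rejected.
    let key = vec![0u8; 512];
    assert!(tx.put(db, &key, b"value", WriteFlags::empty()).is_err());
    Ok(())
}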
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
@@ -17,6 +17,7 @@ anyhow = { workspace = true }
 async-trait = { workspace = true }
 auto-hash-map = { workspace = true }
 bincode = "1.3.3"
+byteorder = "1.5.0"
 dashmap = { workspace = true }
 indexmap = { workspace = true }
 lmdb = "0.8.0"
8 changes: 5 additions & 3 deletions turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs
@@ -1,3 +1,5 @@
+mod extended_key;
+
 use std::{
     collections::{hash_map::Entry, HashMap},
     error::Error,
@@ -118,7 +120,8 @@ impl BackingStorage for LmdbBackingStorage {
         let task_id = **task_id;
         let task_type_bytes = bincode::serialize(&task_type)
             .with_context(|| anyhow!("Unable to serialize task cache key {task_type:?}"))?;
-        tx.put(
+        extended_key::put(
+            &mut tx,
             self.forward_task_cache_db,
             &task_type_bytes,
             &task_id.to_be_bytes(),
@@ -204,8 +207,7 @@ impl BackingStorage for LmdbBackingStorage {
     fn forward_lookup_task_cache(&self, task_type: &CachedTaskType) -> Option<TaskId> {
         let tx = self.env.begin_ro_txn().ok()?;
         let task_type = bincode::serialize(task_type).ok()?;
-        let result = tx
-            .get(self.forward_task_cache_db, &task_type)
+        let result = extended_key::get(&tx, self.forward_task_cache_db, &task_type)
             .ok()
             .and_then(|v| v.try_into().ok())
             .map(|v| TaskId::from(u32::from_be_bytes(v)));
104 changes: 104 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage/extended_key.rs
@@ -0,0 +1,104 @@
use std::hash::{Hash, Hasher};

use byteorder::ByteOrder;
use lmdb::{Database, RoTransaction, RwTransaction, Transaction, WriteFlags};
use rustc_hash::FxHasher;

/// LMDB's default maximum key size (MDB_MAXKEYSIZE) is 511 bytes.
const MAX_KEY_SIZE: usize = 511;
/// A hashed key is an 8-byte hash followed by the first SHARED_KEY bytes of
/// the original key.
const SHARED_KEY: usize = MAX_KEY_SIZE - 8;

/// Reads the value for `key`, transparently handling keys at or above the
/// LMDB key-size limit: such keys are rewritten to a fixed-size hashed key,
/// and the matching entry is found by comparing the key suffix stored in the
/// value.
pub fn get<'tx>(
    tx: &'tx RoTransaction<'tx>,
    database: Database,
    key: &[u8],
) -> lmdb::Result<&'tx [u8]> {
    if key.len() > MAX_KEY_SIZE - 1 {
        let hashed_key = hashed_key(key);
        let data = tx.get(database, &hashed_key)?;
        // The stored value is a list of (key suffix, value) records; find
        // the record whose suffix matches the tail of the requested key.
        for (k, v) in ExtendedValueIter::new(data) {
            if k == &key[SHARED_KEY..] {
                return Ok(v);
            }
        }
        Err(lmdb::Error::NotFound)
    } else {
        tx.get(database, &key)
    }
}

/// Writes `(key, value)`, transparently handling keys at or above the LMDB
/// key-size limit: all entries that share a hashed key are packed into a
/// single value, and rewriting that bucket drops any previous entry with the
/// same key suffix.
pub fn put(
    tx: &mut RwTransaction<'_>,
    database: Database,
    key: &[u8],
    value: &[u8],
    flags: WriteFlags,
) -> lmdb::Result<()> {
    if key.len() > MAX_KEY_SIZE - 1 {
        let hashed_key = hashed_key(key);

        // Each record is [u32 suffix_len][u32 value_len][suffix][value],
        // lengths big-endian, records concatenated back to back.
        let size = key.len() - SHARED_KEY + value.len() + 8;
        let old = tx.get(database, &hashed_key);
        let old_size = old.map_or(0, |v| v.len());
        let mut data = Vec::with_capacity(old_size + size);
        data.extend_from_slice(&((key.len() - SHARED_KEY) as u32).to_be_bytes());
        data.extend_from_slice(&(value.len() as u32).to_be_bytes());
        data.extend_from_slice(&key[SHARED_KEY..]);
        data.extend_from_slice(value);
        // Carry over the other records in this bucket, skipping any stale
        // record for the same key suffix.
        if let Ok(old) = old {
            for (k, v) in ExtendedValueIter::new(old) {
                if k != &key[SHARED_KEY..] {
                    data.extend_from_slice(&(k.len() as u32).to_be_bytes());
                    data.extend_from_slice(&(v.len() as u32).to_be_bytes());
                    data.extend_from_slice(k);
                    data.extend_from_slice(v);
                }
            }
        }

        tx.put(database, &hashed_key, &data, flags)?;
        Ok(())
    } else {
        tx.put(database, &key, &value, flags)
    }
}
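
// Editorial sketch, not part of this commit: hand-encodes one record in the
// layout `put` writes, to make the byte format above concrete.
#[cfg(test)]
mod record_layout_tests {
    #[test]
    fn record_is_two_lengths_then_bytes() {
        let (suffix, value): (&[u8], &[u8]) = (b"ab", b"xyz");
        let mut rec = Vec::new();
        rec.extend_from_slice(&(suffix.len() as u32).to_be_bytes());
        rec.extend_from_slice(&(value.len() as u32).to_be_bytes());
        rec.extend_from_slice(suffix);
        rec.extend_from_slice(value);
        // [suffix_len=2][value_len=3]["ab"]["xyz"], lengths big-endian.
        assert_eq!(rec, [0, 0, 0, 2, 0, 0, 0, 3, b'a', b'b', b'x', b'y', b'z']);
    }
}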

/// Folds an oversized key into a fixed 511-byte key: an 8-byte big-endian
/// FxHash of the full key, followed by the first SHARED_KEY (503) bytes of
/// the key verbatim.
fn hashed_key(key: &[u8]) -> [u8; MAX_KEY_SIZE] {
    let mut result = [0; MAX_KEY_SIZE];
    let mut hash = FxHasher::default();
    key.hash(&mut hash);
    byteorder::BigEndian::write_u64(&mut result, hash.finish());
    result[8..].copy_from_slice(&key[0..SHARED_KEY]);
    result
}
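
// Editorial sketch, not part of this commit: a test pinning down the shape
// of the hashed key (hash in bytes 0..8, verbatim key prefix after that,
// total length exactly at the LMDB limit).
#[cfg(test)]
mod hashed_key_tests {
    use super::*;

    #[test]
    fn layout_is_hash_then_prefix() {
        let key = vec![9u8; 600];
        let hk = hashed_key(&key);
        // Exactly the LMDB limit...
        assert_eq!(hk.len(), MAX_KEY_SIZE);
        // ...and everything after the 8-byte hash is the verbatim prefix.
        assert_eq!(&hk[8..], &key[..SHARED_KEY]);
    }
}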

/// Iterates the (key suffix, value) records packed into a bucket value.
struct ExtendedValueIter<'a> {
    data: &'a [u8],
    pos: usize,
}

impl<'a> Iterator for ExtendedValueIter<'a> {
    type Item = (&'a [u8], &'a [u8]);

    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }
        // Two big-endian u32 length prefixes...
        let key_len = byteorder::BigEndian::read_u32(&self.data[self.pos..]) as usize;
        self.pos += 4;
        let value_len = byteorder::BigEndian::read_u32(&self.data[self.pos..]) as usize;
        self.pos += 4;
        // ...followed by the key suffix and the value bytes themselves.
        let key = &self.data[self.pos..self.pos + key_len];
        self.pos += key_len;
        let value = &self.data[self.pos..self.pos + value_len];
        self.pos += value_len;
        Some((key, value))
    }
}

impl<'a> ExtendedValueIter<'a> {
    fn new(data: &'a [u8]) -> Self {
        Self { data, pos: 0 }
    }
}
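
A sketch of how the module could be exercised end to end (not part of the commit; the temp-directory setup and default database are illustrative assumptions):

#[cfg(test)]
mod round_trip_tests {
    use lmdb::{Environment, Transaction, WriteFlags};

    #[test]
    fn long_key_round_trip() -> lmdb::Result<()> {
        // Illustrative setup: a throwaway environment in the temp dir.
        let dir = std::env::temp_dir().join("extended-key-round-trip");
        std::fs::create_dir_all(&dir).unwrap();
        let env = Environment::new().open(&dir)?;
        let db = env.open_db(None)?;

        // Far past the 511-byte limit, so this takes the hashed-key path.
        let key = vec![42u8; 4096];

        let mut tx = env.begin_rw_txn()?;
        super::put(&mut tx, db, &key, b"hello", WriteFlags::empty())?;
        tx.commit()?;

        let tx = env.begin_ro_txn()?;
        assert_eq!(super::get(&tx, db, &key)?, b"hello");
        Ok(())
    }
}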
