feat: add bitpack encoding for LanceV2 #2333
ACTION NEEDED Lance follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error please inspect the "PR Title Check" action.
+1 Nice work so far. This looks like the correct general approach to me. Still some details to work out but nothing looks out of place.
    _all_null: &mut bool,
) {
    // TODO -- not sure if this is correct
    buffers[0].0 = self.uncompressed_bits_per_value / 8 * num_rows as u64;
This works as long as uncompressed_bits_per_value is a multiple of 8 and, for now, it should always be so. If we have to start handling cases where it isn't, we will need to update this.
I've added a debug assert for now
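For illustration, a minimal sketch of what such a guard might look like. `uncompressed_byte_len` is a hypothetical helper written for this example, not a function from the PR; it only shows the byte-size arithmetic under the stated multiple-of-8 assumption.

```rust
/// Hypothetical helper: bytes needed to hold `num_rows` unpacked values that
/// are each `uncompressed_bits_per_value` bits wide.
fn uncompressed_byte_len(uncompressed_bits_per_value: u64, num_rows: u32) -> u64 {
    // The integer division below truncates unless the width is a whole number
    // of bytes, so the assumption is checked in debug builds.
    debug_assert!(
        uncompressed_bits_per_value % 8 == 0,
        "uncompressed_bits_per_value must be a multiple of 8"
    );
    uncompressed_bits_per_value / 8 * num_rows as u64
}
```

With a 32-bit type and 7 rows this gives 28 bytes; a width such as 12 would trip the assertion in debug builds instead of quietly under-counting.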
}

fn decode_into(&self, rows_to_skip: u32, num_rows: u32, dest_buffers: &mut [BytesMut]) {
    let mut bytes_to_skip = rows_to_skip as u64 * self.bits_per_value / 8;
rows_to_skip * self.bits_per_value isn't always going to be a multiple of 8. What happens when it isn't?
Yeah this logic wasn't correct. Reworked the decode_into method
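To make the concern concrete, here is a rough sketch of how a starting position can be split into a whole-byte offset plus a leftover bit offset. This is not the reworked decode_into from the PR; `start_position` is a hypothetical name used only for this example.

```rust
/// Hypothetical helper: splits the starting bit position of the row at index
/// `rows_to_skip` into (whole bytes to skip, leftover bits inside that byte).
fn start_position(rows_to_skip: u32, bits_per_value: u64) -> (u64, u64) {
    let bits_to_skip = rows_to_skip as u64 * bits_per_value;
    // Dividing by 8 alone would discard the remainder, which is exactly the
    // case the review comment asks about.
    (bits_to_skip / 8, bits_to_skip % 8)
}
```

Skipping 3 rows of 5-bit values means skipping 15 bits: one full byte plus 7 bits into the next byte. A decoder that tracks only the byte offset would start reading 7 bits too early.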
// pre-add enough capacity to the buffer to hold all the values we're about to put in it
let capacity_to_add = dst.capacity() as i64 - dst.len() as i64 + num_rows as i64;
if capacity_to_add > 0 {
    let bytes_to_add =
        capacity_to_add as usize * self.uncompressed_bits_per_value as usize / 8;
    dst.extend((0..bytes_to_add).into_iter().map(|_| 0));
}
You shouldn't need to do this. As long as update_capacity is returning a valid value then you should be able to safely assume the capacity is already there.
That being said, it doesn't really hurt to have this code. Maybe simpler to just put a debug_assert checking that there is enough capacity.
let mut mask = 0u64;
for _ in 0..self.bits_per_value {
    mask = mask << 1 | 1;
}
I think this means you have a limit of 64 bits per value. This is probably fine but you should add a debug_assert somewhere verifying this.
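A sketch of the same mask built without a loop, with the 64-bit limit made explicit. `low_bits_mask` is a hypothetical helper for illustration; the PR may structure this differently.

```rust
/// Hypothetical helper: a u64 mask with the lowest `bits_per_value` bits set.
fn low_bits_mask(bits_per_value: u64) -> u64 {
    // A u64 mask cannot describe values wider than 64 bits.
    debug_assert!(bits_per_value <= 64, "at most 64 bits per value are supported");
    if bits_per_value >= 64 {
        // `1u64 << 64` would overflow, so the full-width case is handled separately.
        u64::MAX
    } else {
        (1u64 << bits_per_value) - 1
    }
}
```

The explicit branch for 64 matters because a shift by the full bit width panics in debug builds and is masked in release builds.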
fn num_buffers(&self) -> u32 {
    // TODO ask weston what this is about
    1
}
1 is correct. There are some cases (e.g. dictionary encoding) where we encode 1 input buffer into 2 output buffers.
    // additional metadata that should be present if bitpacking is used
    optional BitpackMeta bitpack_meta = 4;
}
Minor nit: I think of bitpacking less as an extension of Flat and more as its own encoding that has another array encoding inside of it (like fixed_size_list). I don't know of any concrete reason that's better, but I like thinking of these as small composable pieces rather than one piece with lots of options.
good call, made this change
let mut dest = vec![BytesMut::new()];
unit.decode_into(0, 7, &mut dest);

println!("{:?}", dest);
Nit: convert to an assert when ready to move out of draft.
I deleted this.. we have other tests covering this code path
let mut packed_arrays = vec![];
for arr in arrays {
    let packed = pack_array(arr.clone(), num_bits)?;
    packed_arrays.push(packed.into());
}

let data_type = arrays[0].data_type();
let bits_per_value = 8 * data_type.byte_width() as u64;

Ok(EncodedBuffer {
    bits_per_value: num_bits,
    parts: packed_arrays,
    bitpack_meta: Some(pb::BitpackMeta {
        uncompressed_bits_per_value: bits_per_value,
    }),
})
Do we want to conditionally bitpack based on whether num_bits is less than the "native num bits", if that makes sense? E.g. if a number is using the full range then don't bitpack?
Sure -- made this change
    T: ArrowPrimitiveType,
    T::Native: PrimInt + AsPrimitive<u64>,
{
    let max = arrow::compute::bit_or(arr);
Well this is convenient :)
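The reason a bitwise OR is convenient: OR-ing every value together yields a number whose highest set bit is the highest set bit of any input, so its bit length is the width needed for all of them. The sketch below shows the idea on a plain `&[u32]` slice rather than an Arrow array, and also folds in the earlier suggestion to skip packing when the values already use the full native width. Both function names are hypothetical, and how the PR treats the all-zero case is not shown in the excerpt.

```rust
/// Hypothetical helper: bits needed to represent every value in `values`.
/// Plain-slice stand-in for the Arrow `bit_or` call in the snippet above.
fn num_bits_needed(values: &[u32]) -> u64 {
    // OR all values together; the result has the highest set bit of any input.
    let combined = values.iter().fold(0u32, |acc, v| acc | v);
    // Bit length = native width minus the number of leading zero bits.
    // Note: an all-zero (or empty) input yields 0 here.
    (u32::BITS - combined.leading_zeros()) as u64
}

/// Hypothetical helper: `Some(width)` when packing would save space relative
/// to the native 32 bits, `None` when the values already use the full range.
fn packed_width(values: &[u32]) -> Option<u64> {
    let needed = num_bits_needed(values);
    if needed < u32::BITS as u64 {
        Some(needed)
    } else {
        None
    }
}
```

For the values 1, 2 and 5 the OR is 7, so three bits per value suffice; if any value has its top bit set, `packed_width` returns `None` and the data would be left unpacked.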
let buffers = data.buffers();
let mut packed_buffers = vec![];
for buffer in buffers {
    let packed_buffer = pack_bits(&buffer, num_bits, byte_len);
    packed_buffers.push(packed_buffer);
}
packed_buffers.concat()
We only want to pack the values buffer; I think this will also try to pack the validity buffer.
I think we're actually OK here. This gets passed the result of array.to_data() here:
lance/rust/lance-encoding/src/encodings/physical/bitpack.rs
Lines 165 to 168 in d18b7df
match arr.data_type() {
    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => Ok(
        pack_buffers(arr.to_data(), num_bits, arr.data_type().byte_width()),
    ),
And the validity buffer doesn't get included in that result. For example:
let arr = UInt16Array::from(vec![Some(1), None, Some(2)]);
let data = arr.to_data();
let buffers = data.buffers();
for buffer in buffers {
    println!("{:?}", buffer);
}
prints:
Buffer { data: Bytes { ptr: 0x124704e80, len: 6, data: [1, 0, 0, 0, 2, 0] }, ptr: 0x124704e80, length: 6 }
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #2333 +/- ##
==========================================
- Coverage 79.35% 79.14% -0.22%
==========================================
Files 213 218 +5
Lines 62521 63615 +1094
Branches 62521 63615 +1094
==========================================
+ Hits 49614 50345 +731
- Misses 9996 10323 +327
- Partials 2911 2947 +36
A few thoughts, only partway through.
// Items are bitpacked in a buffer
message Bitpacked {
    // the number of bits used for a value in the buffer
    uint64 compressed_bits_per_value = 1;

    // the number of bits of the uncompressed value. e.g. for a u32, this will be 32
    uint64 uncompressed_bits_per_value = 2;

    // The items in the list
    Buffer buffer = 3;
}
No change required, but, as an interesting aside, this forces Bitpacked to be a "terminal" encoding. I honestly don't know of any cases where it wouldn't be terminal, but in the BtrBlocks "pure tree of encodings" style it would be:
message Bitpacked {
    // The number of bits of the uncompressed value. e.g. for a u32, this will be 32
    uint64 uncompressed_bits_per_value = 1;
    // The compressed bytes
    ArrayEncoding compressed_bytes = 2;
}
E.g. this would open the door to weird things like using dictionary encoding to encode the compressed byte buffer (which would be a bad idea, since you should really apply dictionary encoding higher up, but I use it as an example nonetheless).
Even if the difference were more than a philosophical one, it isn't a trivial change to make, because you would then need to make sure all the ArrayEncoding encoders actually support a "bits_per_value" that isn't divisible by 8, which would be a headache.
pub struct EncodedBufferMeta {
    pub bits_per_value: u64,

    pub bitpacked_bits_per_value: Option<u64>,

    pub compression_scheme: Option<CompressionScheme>,
Can we change EncodedBuffer to contain these fields?
I wasn't sure about that. There are some places where we construct an EncodedBuffer where it's not clear what we'd set for these fields.
For example, here in EncodedArray.into_parts:
lance/rust/lance-encoding/src/encoder.rs
Line 91 in 314d636
.map(|b| EncodedBuffer { parts: b.parts })
And here in ZoneMapsFieldEncoder.maps_to_metadata:
lance/rust/lance-encoding-datafusion/src/zone.rs
Lines 500 to 501 in 314d636
Ok(EncodedBuffer {
    parts: vec![Buffer::from(zone_maps_buffer)],
Wouldn't they both be None?
ValueEncoder::try_new(Arc::new(CoreBufferEncodingStrategy {
    compression_scheme: get_compression_scheme(),
}))?,
Could the buffer encoding strategy be a property of CoreArrayEncodingStrategy instead of ValueEncoder? In other words, could this be something like...
_ => {
    let buffer_encoder = self.buffer_encoding_strategy.create_buffer_encoder(...);
    Ok(Box::new(BasicEncoder::new(Box::new(ValueEncoder::try_new(buffer_encoder)?))))
}
It would require changing array_encoder_from_type (taking in a data type) to create_array_encoder (taking in a slice of array ref), but that change is fine and I think I end up making that change anyway in some pending PR.
Yeah, that would be straightforward.
We also instantiate ValueEncoder here for the ListOffsetsEncoder:
lance/rust/lance-encoding/src/encodings/logical/list.rs
Lines 831 to 844 in da28952
impl ListOffsetsEncoder {
    fn new(cache_bytes: u64, keep_original_array: bool, column_index: u32) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder: Arc::new(BasicEncoder::new(Box::new(
                ValueEncoder::try_new(Arc::new(CoreBufferEncodingStrategy {
                    compression_scheme: CompressionScheme::None,
                }))
                .unwrap(),
            ))),
Do you think it makes sense to pass the BufferEncodingStrategy through to this constructor from the constructor of ListFieldEncoder here? (E.g., make the BufferEncodingStrategy a property of ListOffsetsEncoder.)
lance/rust/lance-encoding/src/encodings/logical/list.rs
Lines 1092 to 1098 in da28952
impl ListFieldEncoder {
    pub fn new(
        items_encoder: Box<dyn FieldEncoder>,
        cache_bytes_per_columns: u64,
        keep_original_array: bool,
        column_index: u32,
    ) -> Self {
Then we could create the inner ValueEncoder inline in make_encode_task?
lance/rust/lance-encoding/src/encodings/logical/list.rs
Lines 876 to 877 in da28952
    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
pub struct EncodedBufferMeta {
    pub bits_per_value: u64,

    pub bitpacked_bits_per_value: Option<u64>,
This feels like an odd property to be at this level. Should this be pb::ArrayEncoding instead?
This struct is the return value from BufferEncoder.encode, and then we use it to construct the pb::array_encoding::ArrayEncoding in the ValueEncoder. Is that OK?
lance/rust/lance-encoding/src/encodings/physical/value.rs
Lines 260 to 283 in da28952
let array_encoding =
    if let Some(bitpacked_bits_per_value) = encoded_buffer_meta.bitpacked_bits_per_value {
        pb::array_encoding::ArrayEncoding::Bitpacked(pb::Bitpacked {
            compressed_bits_per_value: bitpacked_bits_per_value,
            uncompressed_bits_per_value: encoded_buffer_meta.bits_per_value,
            buffer: Some(pb::Buffer {
                buffer_index: index,
                buffer_type: pb::buffer::BufferType::Page as i32,
            }),
        })
    } else {
        pb::array_encoding::ArrayEncoding::Flat(pb::Flat {
            bits_per_value: encoded_buffer_meta.bits_per_value,
            buffer: Some(pb::Buffer {
                buffer_index: index,
                buffer_type: pb::buffer::BufferType::Page as i32,
            }),
            compression: encoded_buffer_meta
                .compression_scheme
                .map(|compression_scheme| pb::Compression {
                    scheme: compression_scheme.to_string(),
                }),
        })
    };
Yes, let's proceed for now. I think I'll be doing a refactor at some point to introduce the concept of "data layouts" and I can take another look at this then. Let's leave it as is for now.
fn count_items_to_pack(arrays: &[ArrayRef]) -> usize {
    let mut count = 0;
    for arr in arrays {
        count += arr.len();
    }

    count
}
Minor nit: arrays.iter().map(|arr| arr.len()).sum::<usize>() might be more compact (I doubt it'd make a perf diff).
Sounds good, made this change
Some minor suggestions but I think we're ready to include this.
I have one concern it would be nice to fix. The problem with environment variables and unit tests is that the unit tests often run in parallel and the environment variables are process-wide. So, if I run all of the tests in lance-encoding on my system, I regularly see the test_utf8 test fail. This can cause some noise in CI.
We could try and find a better approach than env variables but we could also just investigate why test_utf8 fails if LANCE_USE_BITPACKING is true. As long as all unit tests pass with LANCE_USE_BITPACKING=true I don't mind too much if some tests are running with bit packing sometimes and not other times. I'll try and investigate soon too.
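One possible way to keep process-wide state out of the tests, sketched here as an assumption rather than as what the PR does: read the variable in a single place and have everything else take the resulting boolean as a plain argument. `flag_enabled` and `use_bitpacking_from_env` are hypothetical names, and the accepted values ("1" or "true", case-insensitive) are a guess at reasonable parsing, not the PR's actual rule.

```rust
/// Hypothetical helper: interprets the raw text of an on/off flag.
/// Accepting "1" and "true" (case-insensitive) is an assumption for this sketch.
fn flag_enabled(raw: Option<&str>) -> bool {
    matches!(
        raw.map(|s| s.trim().to_ascii_lowercase()).as_deref(),
        Some("1") | Some("true")
    )
}

/// Hypothetical helper: the only place that touches the process environment.
/// Tests can call `flag_enabled` (or pass a bool directly) instead of setting
/// LANCE_USE_BITPACKING, so parallel tests do not interfere with each other.
fn use_bitpacking_from_env() -> bool {
    flag_enabled(std::env::var("LANCE_USE_BITPACKING").ok().as_deref())
}
```

With this split, a test that wants bit packing passes `true` explicitly, and a test that does not passes `false`, regardless of what other tests in the same process are doing.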
DataType::UInt16 => Some(num_bits_for_type::<UInt16Type>(arr.as_primitive())),
DataType::UInt32 => Some(num_bits_for_type::<UInt32Type>(arr.as_primitive())),
DataType::UInt64 => Some(num_bits_for_type::<UInt64Type>(arr.as_primitive())),
// TODO -- eventually we could support signed types as well
Can do in a follow-up but we also want the various temporal types to be encoded with bit packing too.
Work in progress
TODO