Skip to content

Commit

Permalink
Correctly flush & stop image data on IDAT end
Browse files Browse the repository at this point in the history
All data in an image must appear in consecutive IDAT/fdAT chunks that
are not interrupted by any other data chunk. In particular we should not
ignore intermediate chunks but report them as an error (see 5.6. Chunk
Ordering). This is also important to properly flush the zlib
decompression at the end of the subframe and retrieve remaining rows as
well as check its checksum even if ignored blocks follow.
  • Loading branch information
HeroicKatora committed Jun 13, 2020
1 parent 3888342 commit 5658bad
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 20 deletions.
25 changes: 20 additions & 5 deletions src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,22 @@ impl<R: Read> ReadDecoder<R> {
while !self.at_eof {
let buf = self.reader.fill_buf()?;
if buf.is_empty() {
return Err(DecodingError::Format("unexpected EOF".into()));
return Err(DecodingError::Format("unexpected EOF after image".into()));
}
let (consumed, event) = self.decoder.update(buf, &mut vec![])?;
self.reader.consume(consumed);
match event {
Decoded::Nothing => (),
Decoded::ImageEnd => self.at_eof = true,
Decoded::ChunkComplete(_, _) => return Ok(()),
Decoded::ImageData => { /*ignore more data*/ }
// ignore more data
Decoded::ChunkComplete(_, _) | Decoded::ChunkBegin(_, _) | Decoded::ImageData => {}
Decoded::ImageDataFlushed => return Ok(()),
Decoded::PartialChunk(_) => {}
new => unreachable!("{:?}", new),
}
}

Err(DecodingError::Format("unexpected EOF".into()))
Err(DecodingError::Format("unexpected EOF after image".into()))
}

fn info(&self) -> Option<&Info> {
Expand Down Expand Up @@ -228,6 +229,7 @@ struct SubframeInfo {
width: u32,
rowlen: usize,
interlace: InterlaceIter,
consumed_and_flushed: bool,
}

#[derive(Clone)]
Expand Down Expand Up @@ -410,7 +412,9 @@ impl<R: Read> Reader<R> {
}
}
// Advance over the rest of data for this (sub-)frame.
self.decoder.finished_decoding()?;
if !self.subframe.consumed_and_flushed {
self.decoder.finished_decoding()?;
}
// Advance our state to expect the next frame.
self.finished_frame();
Ok(())
Expand Down Expand Up @@ -658,6 +662,12 @@ impl<R: Read> Reader<R> {
interlace: passdata,
}));
} else {
if self.subframe.consumed_and_flushed {
return Err(DecodingError::Format(
format!("not enough data for image").into(),
));
}

// Clear the current buffer before appending more data.
if self.scan_start > 0 {
self.current.drain(..self.scan_start).for_each(drop);
Expand All @@ -667,6 +677,9 @@ impl<R: Read> Reader<R> {
let val = self.decoder.decode_next(&mut self.current)?;
match val {
Some(Decoded::ImageData) => {}
Some(Decoded::ImageDataFlushed) => {
self.subframe.consumed_and_flushed = true;
}
None => {
if !self.current.is_empty() {
return Err(DecodingError::Format("file truncated".into()));
Expand All @@ -687,6 +700,7 @@ impl SubframeInfo {
width: 0,
rowlen: 0,
interlace: InterlaceIter::None(0..0),
consumed_and_flushed: false,
}
}

Expand All @@ -709,6 +723,7 @@ impl SubframeInfo {
width,
rowlen: info.raw_row_length_from_width(width),
interlace,
consumed_and_flushed: false,
}
}
}
Expand Down
24 changes: 24 additions & 0 deletions src/decoder/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub enum Decoded {
FrameControl(FrameControl),
/// Decoded raw image data.
ImageData,
/// The last of a consecutive chunk of IDAT was done.
/// This is distinct from ChunkComplete which only marks that some IDAT chunk was completed but
/// not that no additional IDAT chunk follows.
ImageDataFlushed,
PartialChunk(ChunkType),
ImageEnd,
}
Expand Down Expand Up @@ -143,6 +147,10 @@ pub struct StreamingDecoder {
}

struct ChunkState {
/// The type of the current chunk.
/// Relevant for `IDAT` and `fdAT` which aggregate consecutive chunks of their own type.
type_: ChunkType,

/// Partial crc until now.
crc: Crc32,

Expand Down Expand Up @@ -260,6 +268,20 @@ impl StreamingDecoder {
(val >> 8) as u8,
val as u8,
];
if type_str != self.current_chunk.type_
&& (self.current_chunk.type_ == IDAT
|| self.current_chunk.type_ == chunk::fdAT)
{
self.current_chunk.type_ = type_str;
self.inflater.finish_compressed_chunks(image_data)?;
self.inflater.reset();
return goto!(
0,
U32Byte3(Type(length), val & !0xff),
emit Decoded::ImageDataFlushed
);
}
self.current_chunk.type_ = type_str;
self.current_chunk.crc.reset();
self.current_chunk.crc.update(&type_str);
self.current_chunk.remaining = length;
Expand Down Expand Up @@ -360,6 +382,7 @@ impl StreamingDecoder {
crc,
remaining,
raw_bytes,
type_: _,
} = &mut self.current_chunk;
let buf_avail = raw_bytes.capacity() - raw_bytes.len();
let bytes_avail = min(buf.len(), buf_avail);
Expand Down Expand Up @@ -678,6 +701,7 @@ impl Default for StreamingDecoder {
impl Default for ChunkState {
fn default() -> Self {
ChunkState {
type_: [0; 4],
crc: Crc32::new(),
remaining: 0,
raw_bytes: Vec::with_capacity(CHUNCK_BUFFER_SIZE),
Expand Down
69 changes: 54 additions & 15 deletions src/decoder/zlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,25 @@ pub(super) struct ZlibStream {
state: Box<DecompressorOxide>,
/// If there has been a call to decompress already.
started: bool,
/// A buffer of compressed data.
/// We use this for a progress guarantee. The data in the input stream is chunked as given by
/// the underlying stream buffer. We will not read any more data until the current buffer has
been fully consumed. The zlib decompression can not always fully consume the data while it is
in the middle of the stream: it only processes complete symbols, and the final bytes may need
to be handled in a special way. The exact reason isn't as important but the interface does not
/// promise us this. Now, the complication is that the _current_ chunking information of PNG
/// alone is not enough to determine this as indeed the compressed stream is the concatenation
/// of all consecutive `IDAT`/`fdAT` chunks. We would need to inspect the next chunk header.
///
/// Thus, there needs to be a buffer that allows fully clearing a chunk so that the next chunk
/// type can be inspected.
in_buffer: Vec<u8>,
/// The logical start of the `in_buffer`.
in_pos: usize,
/// Remaining buffered decoded bytes.
/// The decoder sometimes wants to inspect some already finished bytes for further decoding. So we
/// keep a total of 32KB of decoded data available as long as more data may be appended.
buffer: Vec<u8>,
out_buffer: Vec<u8>,
/// The cursor position in the output stream as a buffer index.
out_pos: usize,
}
Expand All @@ -23,14 +38,17 @@ impl ZlibStream {
ZlibStream {
state: Box::default(),
started: false,
buffer: vec![0; 2 * CHUNCK_BUFFER_SIZE],
in_buffer: Vec::with_capacity(CHUNCK_BUFFER_SIZE),
in_pos: 0,
out_buffer: vec![0; 2 * CHUNCK_BUFFER_SIZE],
out_pos: 0,
}
}

pub(crate) fn reset(&mut self) {
self.started = false;
self.buffer.clear();
self.in_buffer.clear();
self.out_buffer.clear();
self.out_pos = 0;
*self.state = DecompressorOxide::default();
}
Expand All @@ -48,12 +66,31 @@ impl ZlibStream {

self.prepare_vec_for_appending();

let (status, in_consumed, out_consumed) = {
let mut cursor = io::Cursor::new(self.buffer.as_mut_slice());
let (status, mut in_consumed, out_consumed) = {
let mut cursor = io::Cursor::new(self.out_buffer.as_mut_slice());
cursor.set_position(self.out_pos as u64);
decompress(&mut self.state, data, &mut cursor, BASE_FLAGS)
let in_data = if self.in_buffer.is_empty() {
data
} else {
&self.in_buffer[self.in_pos..]
};
decompress(&mut self.state, in_data, &mut cursor, BASE_FLAGS)
};

if !self.in_buffer.is_empty() {
self.in_pos += in_consumed;
}

if self.in_buffer.len() == self.in_pos {
self.in_buffer.clear();
self.in_pos = 0;
}

if in_consumed == 0 {
self.in_buffer.extend_from_slice(data);
in_consumed = data.len();
}

self.started = true;
self.out_pos += out_consumed;
self.transfer_finished_data(image_data);
Expand All @@ -73,7 +110,6 @@ impl ZlibStream {
/// more data were passed to it.
pub(crate) fn finish_compressed_chunks(
&mut self,
tail: &[u8],
image_data: &mut Vec<u8>,
) -> Result<(), DecodingError> {
const BASE_FLAGS: u32 = inflate_flags::TINFL_FLAG_PARSE_ZLIB_HEADER
Expand All @@ -83,6 +119,9 @@ impl ZlibStream {
return Ok(());
}

let tail = self.in_buffer.split_off(0);
let tail = &tail[self.in_pos..];

let mut start = 0;
loop {
self.prepare_vec_for_appending();
Expand All @@ -91,7 +130,7 @@ impl ZlibStream {
// TODO: we may be able to avoid the indirection through the buffer here.
// First append all buffered data and then create a cursor on the image_data
// instead.
let mut cursor = io::Cursor::new(self.buffer.as_mut_slice());
let mut cursor = io::Cursor::new(self.out_buffer.as_mut_slice());
cursor.set_position(self.out_pos as u64);
decompress(&mut self.state, &tail[start..], &mut cursor, BASE_FLAGS)
};
Expand All @@ -101,8 +140,8 @@ impl ZlibStream {

match status {
TINFLStatus::Done => {
self.buffer.truncate(self.out_pos as usize);
image_data.append(&mut self.buffer);
self.out_buffer.truncate(self.out_pos as usize);
image_data.append(&mut self.out_buffer);
return Ok(());
}
TINFLStatus::HasMoreOutput => {
Expand All @@ -121,13 +160,13 @@ impl ZlibStream {

/// Resize the vector to allow allocation of more data.
fn prepare_vec_for_appending(&mut self) {
if self.buffer.len().saturating_sub(self.out_pos) >= CHUNCK_BUFFER_SIZE {
if self.out_buffer.len().saturating_sub(self.out_pos) >= CHUNCK_BUFFER_SIZE {
return;
}

let buffered_len = self.decoding_size(self.buffer.len());
debug_assert!(self.buffer.len() <= buffered_len);
self.buffer.resize(buffered_len, 0u8);
let buffered_len = self.decoding_size(self.out_buffer.len());
debug_assert!(self.out_buffer.len() <= buffered_len);
self.out_buffer.resize(buffered_len, 0u8);
}

fn decoding_size(&self, len: usize) -> usize {
Expand All @@ -147,7 +186,7 @@ impl ZlibStream {
fn transfer_finished_data(&mut self, image_data: &mut Vec<u8>) -> usize {
let safe = self.out_pos.saturating_sub(CHUNCK_BUFFER_SIZE);
// TODO: allocation limits.
image_data.extend(self.buffer.drain(..safe));
image_data.extend(self.out_buffer.drain(..safe));
self.out_pos -= safe;
safe
}
Expand Down

0 comments on commit 5658bad

Please sign in to comment.