From 4f10cf02a1121042be8c3f768f870e561c596307 Mon Sep 17 00:00:00 2001 From: Noa Date: Mon, 21 Apr 2025 12:15:01 -0500 Subject: [PATCH 1/2] Split out common compression routines into separate file --- .cspell.dict/rust-more.txt | 1 + stdlib/src/{ => compression}/bz2.rs | 8 +- stdlib/src/compression/generic.rs | 267 +++++++++++++++++++++++++ stdlib/src/compression/mod.rs | 5 + stdlib/src/{ => compression}/zlib.rs | 281 +-------------------------- stdlib/src/lib.rs | 7 +- 6 files changed, 285 insertions(+), 284 deletions(-) rename stdlib/src/{ => compression}/bz2.rs (98%) create mode 100644 stdlib/src/compression/generic.rs create mode 100644 stdlib/src/compression/mod.rs rename stdlib/src/{ => compression}/zlib.rs (71%) diff --git a/.cspell.dict/rust-more.txt b/.cspell.dict/rust-more.txt index e92f1ff8c1..ba58289493 100644 --- a/.cspell.dict/rust-more.txt +++ b/.cspell.dict/rust-more.txt @@ -10,6 +10,7 @@ byteorder byteset caseless chrono +Chunker consts cranelift cstring diff --git a/stdlib/src/bz2.rs b/stdlib/src/compression/bz2.rs similarity index 98% rename from stdlib/src/bz2.rs rename to stdlib/src/compression/bz2.rs index 6339a44a24..27cfcb14ae 100644 --- a/stdlib/src/bz2.rs +++ b/stdlib/src/compression/bz2.rs @@ -4,6 +4,9 @@ pub(crate) use _bz2::make_module; #[pymodule] mod _bz2 { + use super::super::{ + DecompressArgs, DecompressError, DecompressState, DecompressStatus, Decompressor, + }; use crate::common::lock::PyMutex; use crate::vm::{ VirtualMachine, @@ -12,9 +15,6 @@ mod _bz2 { object::{PyPayload, PyResult}, types::Constructor, }; - use crate::zlib::{ - DecompressArgs, DecompressError, DecompressState, DecompressStatus, Decompressor, - }; use bzip2::{Decompress, Status, write::BzEncoder}; use rustpython_vm::convert::ToPyException; use std::{fmt, io::Write}; @@ -74,7 +74,7 @@ mod _bz2 { impl BZ2Decompressor { #[pymethod] fn decompress(&self, args: DecompressArgs, vm: &VirtualMachine) -> PyResult> { - let max_length = args.max_length(); + let max_length = args.max_length_negative_is_none(); let data = &*args.data(); let mut state = self.state.lock(); diff --git a/stdlib/src/compression/generic.rs b/stdlib/src/compression/generic.rs new file mode 100644 index 0000000000..0c09874e9c --- /dev/null +++ b/stdlib/src/compression/generic.rs @@ -0,0 +1,267 @@ +use crate::vm::{ + VirtualMachine, + builtins::{PyBaseExceptionRef, PyBytesRef}, + convert::ToPyException, + function::{ArgBytesLike, ArgSize, OptionalArg}, +}; + +#[derive(FromArgs)] +pub(super) struct DecompressArgs { + #[pyarg(positional)] + data: ArgBytesLike, + #[pyarg(any, optional)] + pub max_length: OptionalArg, +} + +impl DecompressArgs { + pub fn data(&self) -> crate::common::borrow::BorrowedValue<'_, [u8]> { + self.data.borrow_buf() + } + pub fn max_length_negative_is_none(&self) -> Option { + self.max_length + .into_option() + .and_then(|ArgSize { value }| usize::try_from(value).ok()) + } +} + +pub(super) trait Decompressor { + type Flush: FlushKind; + type Status: DecompressStatus; + type Error; + + fn total_in(&self) -> u64; + fn decompress_vec( + &mut self, + input: &[u8], + output: &mut Vec, + flush: Self::Flush, + ) -> Result; + fn maybe_set_dict(&mut self, err: Self::Error) -> Result<(), Self::Error> { + Err(err) + } +} + +pub(super) trait DecompressStatus { + fn is_stream_end(&self) -> bool; +} + +pub(super) trait FlushKind: Copy { + const SYNC: Self; +} + +impl FlushKind for () { + const SYNC: Self = (); +} + +pub(super) fn flush_sync(_final_chunk: bool) -> T { + T::SYNC +} + +pub(super) const CHUNKSIZE: usize = u32::MAX as usize; + +#[derive(Clone)] +pub(super) struct Chunker<'a> { + data1: &'a [u8], + data2: &'a [u8], +} +impl<'a> Chunker<'a> { + pub fn new(data: &'a [u8]) -> Self { + Self { + data1: data, + data2: &[], + } + } + pub fn chain(data1: &'a [u8], data2: &'a [u8]) -> Self { + if data1.is_empty() { + Self { + data1: data2, + data2: &[], + } + } else { + Self { data1, data2 } + } + } + pub fn len(&self) -> usize { + self.data1.len() + self.data2.len() + } + pub fn is_empty(&self) -> bool { + self.data1.is_empty() + } + pub fn to_vec(&self) -> Vec { + [self.data1, self.data2].concat() + } + pub fn chunk(&self) -> &'a [u8] { + self.data1.get(..CHUNKSIZE).unwrap_or(self.data1) + } + pub fn advance(&mut self, consumed: usize) { + self.data1 = &self.data1[consumed..]; + if self.data1.is_empty() { + self.data1 = std::mem::take(&mut self.data2); + } + } +} + +pub(super) fn _decompress( + data: &[u8], + d: &mut D, + bufsize: usize, + max_length: Option, + calc_flush: impl Fn(bool) -> D::Flush, +) -> Result<(Vec, bool), D::Error> { + let mut data = Chunker::new(data); + _decompress_chunks(&mut data, d, bufsize, max_length, calc_flush) +} + +pub(super) fn _decompress_chunks( + data: &mut Chunker<'_>, + d: &mut D, + bufsize: usize, + max_length: Option, + calc_flush: impl Fn(bool) -> D::Flush, +) -> Result<(Vec, bool), D::Error> { + if data.is_empty() { + return Ok((Vec::new(), true)); + } + let max_length = max_length.unwrap_or(usize::MAX); + let mut buf = Vec::new(); + + 'outer: loop { + let chunk = data.chunk(); + let flush = calc_flush(chunk.len() == data.len()); + loop { + let additional = std::cmp::min(bufsize, max_length - buf.capacity()); + if additional == 0 { + return Ok((buf, false)); + } + buf.reserve_exact(additional); + + let prev_in = d.total_in(); + let res = d.decompress_vec(chunk, &mut buf, flush); + let consumed = d.total_in() - prev_in; + + data.advance(consumed as usize); + + match res { + Ok(status) => { + let stream_end = status.is_stream_end(); + if stream_end || data.is_empty() { + // we've reached the end of the stream, we're done + buf.shrink_to_fit(); + return Ok((buf, stream_end)); + } else if !chunk.is_empty() && consumed == 0 { + // we're gonna need a bigger buffer + continue; + } else { + // next chunk + continue 'outer; + } + } + Err(e) => { + d.maybe_set_dict(e)?; + // now try the next chunk + continue 'outer; + } + }; + } + } +} + +#[derive(Debug)] +pub(super) struct DecompressState { + decompress: D, + unused_data: PyBytesRef, + input_buffer: Vec, + eof: bool, + needs_input: bool, +} + +impl DecompressState { + pub fn new(decompress: D, vm: &VirtualMachine) -> Self { + Self { + decompress, + unused_data: vm.ctx.empty_bytes.clone(), + input_buffer: Vec::new(), + eof: false, + needs_input: true, + } + } + + pub fn eof(&self) -> bool { + self.eof + } + + pub fn unused_data(&self) -> PyBytesRef { + self.unused_data.clone() + } + + pub fn needs_input(&self) -> bool { + self.needs_input + } + + pub fn decompress( + &mut self, + data: &[u8], + max_length: Option, + bufsize: usize, + vm: &VirtualMachine, + ) -> Result, DecompressError> { + if self.eof { + return Err(DecompressError::Eof(EofError)); + } + + let input_buffer = &mut self.input_buffer; + let d = &mut self.decompress; + + let mut chunks = Chunker::chain(input_buffer, data); + + let prev_len = chunks.len(); + let (ret, stream_end) = + match _decompress_chunks(&mut chunks, d, bufsize, max_length, flush_sync) { + Ok((buf, stream_end)) => (Ok(buf), stream_end), + Err(err) => (Err(err), false), + }; + let consumed = prev_len - chunks.len(); + + self.eof |= stream_end; + + if self.eof { + self.needs_input = false; + if !chunks.is_empty() { + self.unused_data = vm.ctx.new_bytes(chunks.to_vec()); + } + } else if chunks.is_empty() { + input_buffer.clear(); + self.needs_input = true; + } else { + self.needs_input = false; + if let Some(n_consumed_from_data) = consumed.checked_sub(input_buffer.len()) { + input_buffer.clear(); + input_buffer.extend_from_slice(&data[n_consumed_from_data..]); + } else { + input_buffer.drain(..consumed); + input_buffer.extend_from_slice(data); + } + } + + ret.map_err(DecompressError::Decompress) + } +} + +pub(super) enum DecompressError { + Decompress(E), + Eof(EofError), +} + +impl From for DecompressError { + fn from(err: E) -> Self { + Self::Decompress(err) + } +} + +pub(super) struct EofError; + +impl ToPyException for EofError { + fn to_pyexception(&self, vm: &VirtualMachine) -> PyBaseExceptionRef { + vm.new_eof_error("End of stream already reached".to_owned()) + } +} diff --git a/stdlib/src/compression/mod.rs b/stdlib/src/compression/mod.rs new file mode 100644 index 0000000000..e4ced7c00c --- /dev/null +++ b/stdlib/src/compression/mod.rs @@ -0,0 +1,5 @@ +mod generic; +use generic::*; + +pub mod bz2; +pub mod zlib; diff --git a/stdlib/src/zlib.rs b/stdlib/src/compression/zlib.rs similarity index 71% rename from stdlib/src/zlib.rs rename to stdlib/src/compression/zlib.rs index 0578f20c86..892857a35a 100644 --- a/stdlib/src/zlib.rs +++ b/stdlib/src/compression/zlib.rs @@ -1,11 +1,12 @@ // spell-checker:ignore compressobj decompressobj zdict chunksize zlibmodule miniz chunker -pub(crate) use zlib::{DecompressArgs, make_module}; +pub(crate) use zlib::make_module; #[pymodule] mod zlib { - use super::generic::{ - DecompressError, DecompressState, DecompressStatus, Decompressor, FlushKind, flush_sync, + use super::super::{ + _decompress, CHUNKSIZE, DecompressArgs, DecompressError, DecompressState, DecompressStatus, + Decompressor, FlushKind, flush_sync, }; use crate::vm::{ PyObject, PyPayload, PyResult, VirtualMachine, @@ -144,113 +145,6 @@ mod zlib { } } - #[derive(Clone)] - pub(crate) struct Chunker<'a> { - data1: &'a [u8], - data2: &'a [u8], - } - impl<'a> Chunker<'a> { - pub(crate) fn new(data: &'a [u8]) -> Self { - Self { - data1: data, - data2: &[], - } - } - pub(crate) fn chain(data1: &'a [u8], data2: &'a [u8]) -> Self { - if data1.is_empty() { - Self { - data1: data2, - data2: &[], - } - } else { - Self { data1, data2 } - } - } - pub(crate) fn len(&self) -> usize { - self.data1.len() + self.data2.len() - } - pub(crate) fn is_empty(&self) -> bool { - self.data1.is_empty() - } - pub(crate) fn to_vec(&self) -> Vec { - [self.data1, self.data2].concat() - } - pub(crate) fn chunk(&self) -> &'a [u8] { - self.data1.get(..CHUNKSIZE).unwrap_or(self.data1) - } - pub(crate) fn advance(&mut self, consumed: usize) { - self.data1 = &self.data1[consumed..]; - if self.data1.is_empty() { - self.data1 = std::mem::take(&mut self.data2); - } - } - } - - fn _decompress( - data: &[u8], - d: &mut D, - bufsize: usize, - max_length: Option, - calc_flush: impl Fn(bool) -> D::Flush, - ) -> Result<(Vec, bool), D::Error> { - let mut data = Chunker::new(data); - _decompress_chunks(&mut data, d, bufsize, max_length, calc_flush) - } - - pub(super) fn _decompress_chunks( - data: &mut Chunker<'_>, - d: &mut D, - bufsize: usize, - max_length: Option, - calc_flush: impl Fn(bool) -> D::Flush, - ) -> Result<(Vec, bool), D::Error> { - if data.is_empty() { - return Ok((Vec::new(), true)); - } - let max_length = max_length.unwrap_or(usize::MAX); - let mut buf = Vec::new(); - - 'outer: loop { - let chunk = data.chunk(); - let flush = calc_flush(chunk.len() == data.len()); - loop { - let additional = std::cmp::min(bufsize, max_length - buf.capacity()); - if additional == 0 { - return Ok((buf, false)); - } - buf.reserve_exact(additional); - - let prev_in = d.total_in(); - let res = d.decompress_vec(chunk, &mut buf, flush); - let consumed = d.total_in() - prev_in; - - data.advance(consumed as usize); - - match res { - Ok(status) => { - let stream_end = status.is_stream_end(); - if stream_end || data.is_empty() { - // we've reached the end of the stream, we're done - buf.shrink_to_fit(); - return Ok((buf, stream_end)); - } else if !chunk.is_empty() && consumed == 0 { - // we're gonna need a bigger buffer - continue; - } else { - // next chunk - continue 'outer; - } - } - Err(e) => { - d.maybe_set_dict(e)?; - // now try the next chunk - continue 'outer; - } - }; - } - } - } - #[derive(FromArgs)] struct PyFuncDecompressArgs { #[pyarg(positional)] @@ -434,25 +328,6 @@ mod zlib { } } - #[derive(FromArgs)] - pub(crate) struct DecompressArgs { - #[pyarg(positional)] - data: ArgBytesLike, - #[pyarg(any, optional)] - max_length: OptionalArg, - } - - impl DecompressArgs { - pub(crate) fn data(&self) -> crate::common::borrow::BorrowedValue<'_, [u8]> { - self.data.borrow_buf() - } - pub(crate) fn max_length(&self) -> Option { - self.max_length - .into_option() - .and_then(|ArgSize { value }| usize::try_from(value).ok()) - } - } - #[derive(FromArgs)] #[allow(dead_code)] // FIXME: use args struct CompressobjArgs { @@ -533,8 +408,6 @@ mod zlib { // } } - const CHUNKSIZE: usize = u32::MAX as usize; - impl CompressInner { fn new(compress: Compress) -> Self { Self { @@ -734,7 +607,7 @@ mod zlib { #[pymethod] fn decompress(&self, args: DecompressArgs, vm: &VirtualMachine) -> PyResult> { - let max_length = args.max_length(); + let max_length = args.max_length_negative_is_none(); let data = &*args.data(); let inner = &mut *self.inner.lock(); @@ -754,147 +627,3 @@ mod zlib { // } } } - -mod generic { - use super::zlib::{_decompress_chunks, Chunker}; - use crate::vm::{ - VirtualMachine, - builtins::{PyBaseExceptionRef, PyBytesRef}, - convert::ToPyException, - }; - - pub(crate) trait Decompressor { - type Flush: FlushKind; - type Status: DecompressStatus; - type Error; - - fn total_in(&self) -> u64; - fn decompress_vec( - &mut self, - input: &[u8], - output: &mut Vec, - flush: Self::Flush, - ) -> Result; - fn maybe_set_dict(&mut self, err: Self::Error) -> Result<(), Self::Error> { - Err(err) - } - } - - pub(crate) trait DecompressStatus { - fn is_stream_end(&self) -> bool; - } - - pub(crate) trait FlushKind: Copy { - const SYNC: Self; - } - - impl FlushKind for () { - const SYNC: Self = (); - } - - pub(super) fn flush_sync(_final_chunk: bool) -> T { - T::SYNC - } - - #[derive(Debug)] - pub(crate) struct DecompressState { - decompress: D, - unused_data: PyBytesRef, - input_buffer: Vec, - eof: bool, - needs_input: bool, - } - - impl DecompressState { - pub(crate) fn new(decompress: D, vm: &VirtualMachine) -> Self { - Self { - decompress, - unused_data: vm.ctx.empty_bytes.clone(), - input_buffer: Vec::new(), - eof: false, - needs_input: true, - } - } - - pub(crate) fn eof(&self) -> bool { - self.eof - } - - pub(crate) fn unused_data(&self) -> PyBytesRef { - self.unused_data.clone() - } - - pub(crate) fn needs_input(&self) -> bool { - self.needs_input - } - - pub(crate) fn decompress( - &mut self, - data: &[u8], - max_length: Option, - bufsize: usize, - vm: &VirtualMachine, - ) -> Result, DecompressError> { - if self.eof { - return Err(DecompressError::Eof(EofError)); - } - - let input_buffer = &mut self.input_buffer; - let d = &mut self.decompress; - - let mut chunks = Chunker::chain(input_buffer, data); - - let prev_len = chunks.len(); - let (ret, stream_end) = - match _decompress_chunks(&mut chunks, d, bufsize, max_length, flush_sync) { - Ok((buf, stream_end)) => (Ok(buf), stream_end), - Err(err) => (Err(err), false), - }; - let consumed = prev_len - chunks.len(); - - self.eof |= stream_end; - - if self.eof { - self.needs_input = false; - if !chunks.is_empty() { - self.unused_data = vm.ctx.new_bytes(chunks.to_vec()); - } - } else if chunks.is_empty() { - input_buffer.clear(); - self.needs_input = true; - } else { - self.needs_input = false; - if let Some(n_consumed_from_data) = consumed.checked_sub(input_buffer.len()) { - input_buffer.clear(); - input_buffer.extend_from_slice(&data[n_consumed_from_data..]); - } else { - input_buffer.drain(..consumed); - input_buffer.extend_from_slice(data); - } - } - - ret.map_err(DecompressError::Decompress) - } - } - - pub(crate) enum DecompressError { - Decompress(E), - Eof(EofError), - } - - impl From for DecompressError { - fn from(err: E) -> Self { - Self::Decompress(err) - } - } - - pub(crate) struct EofError; - - impl ToPyException for EofError { - fn to_pyexception(&self, vm: &VirtualMachine) -> PyBaseExceptionRef { - vm.new_eof_error("End of stream already reached".to_owned()) - } - } -} - -pub(crate) use generic::{DecompressError, DecompressState, DecompressStatus, Decompressor}; diff --git a/stdlib/src/lib.rs b/stdlib/src/lib.rs index 254aa39322..8db8839ac5 100644 --- a/stdlib/src/lib.rs +++ b/stdlib/src/lib.rs @@ -23,6 +23,7 @@ mod sha256; mod sha3; mod sha512; +mod compression; mod json; #[cfg(not(any(target_os = "ios", target_os = "android", target_arch = "wasm32")))] mod locale; @@ -36,13 +37,11 @@ mod statistics; mod suggestions; // TODO: maybe make this an extension module, if we ever get those // mod re; -mod bz2; #[cfg(not(target_arch = "wasm32"))] pub mod socket; #[cfg(all(unix, not(target_os = "redox")))] mod syslog; mod unicodedata; -mod zlib; mod faulthandler; #[cfg(any(unix, target_os = "wasi"))] @@ -111,7 +110,7 @@ pub fn get_module_inits() -> impl Iterator, StdlibInit "array" => array::make_module, "binascii" => binascii::make_module, "_bisect" => bisect::make_module, - "_bz2" => bz2::make_module, + "_bz2" => compression::bz2::make_module, "cmath" => cmath::make_module, "_contextvars" => contextvars::make_module, "_csv" => csv::make_module, @@ -132,7 +131,7 @@ pub fn get_module_inits() -> impl Iterator, StdlibInit "_statistics" => statistics::make_module, "_struct" => pystruct::make_module, "unicodedata" => unicodedata::make_module, - "zlib" => zlib::make_module, + "zlib" => compression::zlib::make_module, "_statistics" => statistics::make_module, "_suggestions" => suggestions::make_module, // crate::vm::sysmodule::sysconfigdata_name() => sysconfigdata::make_module, From e1646559697c2861b11456649cfbce39eb1ffa2d Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" <69878+youknowone@users.noreply.github.com> Date: Tue, 6 May 2025 14:36:50 +0900 Subject: [PATCH 2/2] Apply suggestions from code review --- .cspell.dict/rust-more.txt | 1 - stdlib/src/compression/generic.rs | 1 + stdlib/src/compression/zlib.rs | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.cspell.dict/rust-more.txt b/.cspell.dict/rust-more.txt index ba58289493..e92f1ff8c1 100644 --- a/.cspell.dict/rust-more.txt +++ b/.cspell.dict/rust-more.txt @@ -10,7 +10,6 @@ byteorder byteset caseless chrono -Chunker consts cranelift cstring diff --git a/stdlib/src/compression/generic.rs b/stdlib/src/compression/generic.rs index 0c09874e9c..64fc687723 100644 --- a/stdlib/src/compression/generic.rs +++ b/stdlib/src/compression/generic.rs @@ -1,3 +1,4 @@ +// cspell:ignore chunker use crate::vm::{ VirtualMachine, builtins::{PyBaseExceptionRef, PyBytesRef}, diff --git a/stdlib/src/compression/zlib.rs b/stdlib/src/compression/zlib.rs index 892857a35a..f032057026 100644 --- a/stdlib/src/compression/zlib.rs +++ b/stdlib/src/compression/zlib.rs @@ -1,4 +1,4 @@ -// spell-checker:ignore compressobj decompressobj zdict chunksize zlibmodule miniz chunker +// spell-checker:ignore compressobj decompressobj zdict chunksize zlibmodule miniz pub(crate) use zlib::make_module;