rustpython_vm/function/
buffer.rs1use crate::{
2 builtins::{PyStr, PyStrRef},
3 common::borrow::{BorrowedValue, BorrowedValueMut},
4 protocol::PyBuffer,
5 AsObject, PyObject, PyObjectRef, PyResult, TryFromBorrowedObject, TryFromObject,
6 VirtualMachine,
7};
8
9#[derive(Debug, Traverse)]
13pub struct ArgBytesLike(PyBuffer);
14
15impl PyObject {
16 pub fn try_bytes_like<R>(
17 &self,
18 vm: &VirtualMachine,
19 f: impl FnOnce(&[u8]) -> R,
20 ) -> PyResult<R> {
21 let buffer = PyBuffer::try_from_borrowed_object(vm, self)?;
22 buffer.as_contiguous().map(|x| f(&x)).ok_or_else(|| {
23 vm.new_type_error("non-contiguous buffer is not a bytes-like object".to_owned())
24 })
25 }
26
27 pub fn try_rw_bytes_like<R>(
28 &self,
29 vm: &VirtualMachine,
30 f: impl FnOnce(&mut [u8]) -> R,
31 ) -> PyResult<R> {
32 let buffer = PyBuffer::try_from_borrowed_object(vm, self)?;
33 buffer
34 .as_contiguous_mut()
35 .map(|mut x| f(&mut x))
36 .ok_or_else(|| {
37 vm.new_type_error("buffer is not a read-write bytes-like object".to_owned())
38 })
39 }
40}
41
42impl ArgBytesLike {
43 pub fn borrow_buf(&self) -> BorrowedValue<'_, [u8]> {
44 unsafe { self.0.contiguous_unchecked() }
45 }
46
47 pub fn with_ref<F, R>(&self, f: F) -> R
48 where
49 F: FnOnce(&[u8]) -> R,
50 {
51 f(&self.borrow_buf())
52 }
53
54 pub fn len(&self) -> usize {
55 self.0.desc.len
56 }
57
58 pub fn is_empty(&self) -> bool {
59 self.len() == 0
60 }
61
62 pub fn as_object(&self) -> &PyObject {
63 &self.0.obj
64 }
65}
66
67impl From<ArgBytesLike> for PyBuffer {
68 fn from(buffer: ArgBytesLike) -> Self {
69 buffer.0
70 }
71}
72
73impl<'a> TryFromBorrowedObject<'a> for ArgBytesLike {
74 fn try_from_borrowed_object(vm: &VirtualMachine, obj: &'a PyObject) -> PyResult<Self> {
75 let buffer = PyBuffer::try_from_borrowed_object(vm, obj)?;
76 if buffer.desc.is_contiguous() {
77 Ok(Self(buffer))
78 } else {
79 Err(vm.new_type_error("non-contiguous buffer is not a bytes-like object".to_owned()))
80 }
81 }
82}
83
84#[derive(Debug, Traverse)]
86pub struct ArgMemoryBuffer(PyBuffer);
87
88impl ArgMemoryBuffer {
89 pub fn borrow_buf_mut(&self) -> BorrowedValueMut<'_, [u8]> {
90 unsafe { self.0.contiguous_mut_unchecked() }
91 }
92
93 pub fn with_ref<F, R>(&self, f: F) -> R
94 where
95 F: FnOnce(&mut [u8]) -> R,
96 {
97 f(&mut self.borrow_buf_mut())
98 }
99
100 pub fn len(&self) -> usize {
101 self.0.desc.len
102 }
103
104 pub fn is_empty(&self) -> bool {
105 self.len() == 0
106 }
107}
108
109impl From<ArgMemoryBuffer> for PyBuffer {
110 fn from(buffer: ArgMemoryBuffer) -> Self {
111 buffer.0
112 }
113}
114
115impl<'a> TryFromBorrowedObject<'a> for ArgMemoryBuffer {
116 fn try_from_borrowed_object(vm: &VirtualMachine, obj: &'a PyObject) -> PyResult<Self> {
117 let buffer = PyBuffer::try_from_borrowed_object(vm, obj)?;
118 if !buffer.desc.is_contiguous() {
119 Err(vm.new_type_error("non-contiguous buffer is not a bytes-like object".to_owned()))
120 } else if buffer.desc.readonly {
121 Err(vm.new_type_error("buffer is not a read-write bytes-like object".to_owned()))
122 } else {
123 Ok(Self(buffer))
124 }
125 }
126}
127
128pub enum ArgStrOrBytesLike {
130 Buf(ArgBytesLike),
131 Str(PyStrRef),
132}
133
134impl ArgStrOrBytesLike {
135 pub fn as_object(&self) -> &PyObject {
136 match self {
137 Self::Buf(b) => b.as_object(),
138 Self::Str(s) => s.as_object(),
139 }
140 }
141}
142
143impl TryFromObject for ArgStrOrBytesLike {
144 fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult<Self> {
145 obj.downcast()
146 .map(Self::Str)
147 .or_else(|obj| ArgBytesLike::try_from_object(vm, obj).map(Self::Buf))
148 }
149}
150
151impl ArgStrOrBytesLike {
152 pub fn borrow_bytes(&self) -> BorrowedValue<'_, [u8]> {
153 match self {
154 Self::Buf(b) => b.borrow_buf(),
155 Self::Str(s) => s.as_str().as_bytes().into(),
156 }
157 }
158}
159
160#[derive(Debug)]
161pub enum ArgAsciiBuffer {
162 String(PyStrRef),
163 Buffer(ArgBytesLike),
164}
165
166impl TryFromObject for ArgAsciiBuffer {
167 fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult<Self> {
168 match obj.downcast::<PyStr>() {
169 Ok(string) => {
170 if string.as_str().is_ascii() {
171 Ok(ArgAsciiBuffer::String(string))
172 } else {
173 Err(vm.new_value_error(
174 "string argument should contain only ASCII characters".to_owned(),
175 ))
176 }
177 }
178 Err(obj) => ArgBytesLike::try_from_object(vm, obj).map(ArgAsciiBuffer::Buffer),
179 }
180 }
181}
182
183impl ArgAsciiBuffer {
184 pub fn len(&self) -> usize {
185 match self {
186 Self::String(s) => s.as_str().len(),
187 Self::Buffer(buffer) => buffer.len(),
188 }
189 }
190
191 pub fn is_empty(&self) -> bool {
192 self.len() == 0
193 }
194
195 #[inline]
196 pub fn with_ref<R>(&self, f: impl FnOnce(&[u8]) -> R) -> R {
197 match self {
198 Self::String(s) => f(s.as_str().as_bytes()),
199 Self::Buffer(buffer) => buffer.with_ref(f),
200 }
201 }
202}