forked from GoogleCloudPlatform/DataflowPythonSDK
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream.pyx
More file actions
201 lines (163 loc) · 6.65 KB
/
stream.pyx
File metadata and controls
201 lines (163 loc) · 6.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cimport libc.stdlib
cimport libc.string
cdef class OutputStream(object):
  """An output string stream implementation supporting write() and get().

  Bytes are appended to a C heap buffer (self.data) of capacity self.size;
  self.pos is the number of bytes written so far. The buffer is grown by
  repeated doubling whenever a write would not fit.
  """

  #TODO(robertwb): Consider using raw C++ streams.

  def __cinit__(self):
    self.size = 1024
    self.pos = 0
    self.data = <char*>libc.stdlib.malloc(self.size)
    # Raise instead of assert: assertions can be disabled at run/compile time,
    # which would leave a NULL buffer for the write methods to dereference.
    if not self.data:
      raise MemoryError("OutputStream malloc failed.")

  def __dealloc__(self):
    # data may be NULL if allocation failed in __cinit__.
    if self.data:
      libc.stdlib.free(self.data)

  cpdef write(self, bytes b, bint nested=False):
    """Append the bytes of b to the stream.

    Args:
      b: the bytes to append.
      nested: if true, first write len(b) as a variable-length integer so the
        payload can be delimited when read back.
    """
    cdef size_t blen = len(b)
    if nested:
      self.write_var_int64(blen)
    if self.size < self.pos + blen:
      self.extend(blen)
    libc.string.memcpy(self.data + self.pos, <char*>b, blen)
    self.pos += blen

  cpdef write_byte(self, unsigned char val):
    """Append a single byte to the stream."""
    if self.size < self.pos + 1:
      self.extend(1)
    self.data[self.pos] = val
    self.pos += 1

  cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
    """Encode a long using variable-length encoding to a stream.

    The value is reinterpreted as unsigned and emitted 7 bits at a time, least
    significant group first; bit 0x80 of each byte is set when more bytes
    follow.
    """
    cdef libc.stdint.uint64_t v = signed_v
    cdef long bits
    while True:
      bits = v & 0x7F
      v >>= 7
      if v:
        # More groups remain: set the continuation bit.
        bits |= 0x80
      self.write_byte(bits)
      if not v:
        break

  cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
    """Append signed_v as 8 bytes, most significant byte first."""
    cdef libc.stdint.uint64_t v = signed_v
    if self.size < self.pos + 8:
      self.extend(8)
    self.data[self.pos    ] = <unsigned char>(v >> 56)
    self.data[self.pos + 1] = <unsigned char>(v >> 48)
    self.data[self.pos + 2] = <unsigned char>(v >> 40)
    self.data[self.pos + 3] = <unsigned char>(v >> 32)
    self.data[self.pos + 4] = <unsigned char>(v >> 24)
    self.data[self.pos + 5] = <unsigned char>(v >> 16)
    self.data[self.pos + 6] = <unsigned char>(v >>  8)
    self.data[self.pos + 7] = <unsigned char>(v      )
    self.pos += 8

  cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
    """Append signed_v as 4 bytes, most significant byte first."""
    cdef libc.stdint.uint32_t v = signed_v
    if self.size < self.pos + 4:
      self.extend(4)
    self.data[self.pos    ] = <unsigned char>(v >> 24)
    self.data[self.pos + 1] = <unsigned char>(v >> 16)
    self.data[self.pos + 2] = <unsigned char>(v >>  8)
    self.data[self.pos + 3] = <unsigned char>(v      )
    self.pos += 4

  cpdef write_bigendian_double(self, double d):
    """Append the 8-byte in-memory representation of d, big-endian."""
    # Reinterpret the double's bits as a 64-bit integer and reuse the
    # integer writer.
    self.write_bigendian_int64((<libc.stdint.int64_t*><char*>&d)[0])

  cpdef bytes get(self):
    """Return a bytes copy of everything written so far."""
    return self.data[:self.pos]

  cdef extend(self, size_t missing):
    """Grow the buffer until at least `missing` more bytes fit after pos.

    Raises:
      MemoryError: if the reallocation fails. The existing buffer, size and
        position are left untouched in that case.
    """
    cdef size_t new_size = self.size
    cdef char* new_data
    while missing > new_size - self.pos:
      new_size *= 2
    # Reallocate through a temporary so that a failed realloc neither leaks
    # the old buffer nor leaves self.size out of sync with self.data.
    new_data = <char*>libc.stdlib.realloc(self.data, new_size)
    if not new_data:
      raise MemoryError("OutputStream realloc failed.")
    self.data = new_data
    self.size = new_size
cdef class ByteCountingOutputStream(OutputStream):
  """An OutputStream variant that tallies bytes instead of storing them.

  Every write method only advances a running byte counter; nothing is copied
  into a buffer, so get() is unsupported. Call get_count() to obtain the
  number of bytes that would have been written.

  This is useful for sizing an encoding.
  """

  def __cinit__(self):
    self.count = 0

  cpdef write(self, bytes b, bint nested=False):
    """Count len(b) bytes, plus the varint length prefix when nested."""
    cdef size_t payload_len = len(b)
    if nested:
      # The inherited varint writer emits through write_byte(), which is
      # overridden below to count one byte per call.
      self.write_var_int64(payload_len)
    self.count = self.count + payload_len

  cpdef write_byte(self, unsigned char _):
    """Count a single byte."""
    self.count = self.count + 1

  cpdef write_bigendian_int64(self, libc.stdint.int64_t _):
    """Count the 8 bytes of a big-endian 64-bit integer."""
    self.count = self.count + 8

  cpdef write_bigendian_int32(self, libc.stdint.int32_t _):
    """Count the 4 bytes of a big-endian 32-bit integer."""
    self.count = self.count + 4

  cpdef size_t get_count(self):
    """Return the number of bytes counted so far."""
    return self.count

  cpdef bytes get(self):
    """Unsupported: no bytes are retained by this stream."""
    raise NotImplementedError

  def __str__(self):
    class_name = self.__class__.__name__
    return '<%s %s>' % (class_name, self.count)
cdef class InputStream(object):
  """An input string stream implementation supporting read() and size().

  self.all holds the source object and self.allc is the same data accessed as
  a C char array; self.pos is the current read offset.

  NOTE: the read methods advance pos and index allc without comparing against
  the input length, so callers must not read past the end of the data.
  """

  def __init__(self, all):
    self.allc = self.all = all

  cpdef bytes read(self, size_t size):
    """Consume and return the next `size` bytes."""
    self.pos += size
    return self.allc[self.pos - size : self.pos]

  cpdef long read_byte(self) except? -1:
    """Consume the next byte and return it as a value in [0, 255]."""
    self.pos += 1
    # Note: the C++ compiler on Dataflow workers treats the char array below as
    # a signed char.  This causes incorrect coder behavior unless explicitly
    # cast to an unsigned char here.
    return <long>(<unsigned char> self.allc[self.pos - 1])

  cpdef size_t size(self) except? -1:
    """Return the number of bytes not yet consumed."""
    return len(self.all) - self.pos

  cpdef bytes read_all(self, bint nested=False):
    """Read a length-prefixed payload (nested) or everything that remains."""
    return self.read(self.read_var_int64() if nested else self.size())

  cpdef libc.stdint.int64_t read_var_int64(self) except? -1:
    """Decode a variable-length encoded long from a stream.

    Each byte contributes its low 7 bits, least significant group first; bit
    0x80 marks that another byte follows.

    Raises:
      RuntimeError: if the encoding does not fit in 64 bits, or if
        read_byte() reports a negative value.
    """
    cdef long byte
    # Fixed-width 64-bit types are used throughout: C `long` is only 32 bits
    # on some platforms (e.g. 64-bit Windows), which would reject or truncate
    # valid encodings of values wider than 32 bits.
    cdef libc.stdint.uint64_t bits
    cdef long shift = 0
    cdef libc.stdint.uint64_t result = 0
    while True:
      byte = self.read_byte()
      if byte < 0:
        raise RuntimeError('VarInt not terminated.')
      bits = byte & 0x7F
      # Reject groups that would shift entirely past bit 63, and a final
      # group that carries more than the single bit still available.
      if (shift >= sizeof(libc.stdint.int64_t) * 8 or
          (shift >= (sizeof(libc.stdint.int64_t) * 8 - 1) and bits > 1)):
        raise RuntimeError('VarLong too long.')
      # Accumulate unsigned so that setting bit 63 is well defined.
      result |= bits << shift
      shift += 7
      if not (byte & 0x80):
        break
    return <libc.stdint.int64_t>result

  cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
    """Consume 8 bytes and return them as a big-endian 64-bit integer."""
    self.pos += 8
    return (<unsigned char>self.allc[self.pos - 1]
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 2] <<  8
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 3] << 16
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 4] << 24
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 5] << 32
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 6] << 40
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 7] << 48
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 8] << 56)

  cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1:
    """Consume 4 bytes and return them as a big-endian 32-bit integer."""
    self.pos += 4
    return (<unsigned char>self.allc[self.pos - 1]
      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 2] <<  8
      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 3] << 16
      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 4] << 24)

  cpdef double read_bigendian_double(self) except? -1:
    """Consume 8 big-endian bytes and reinterpret their bits as a double."""
    cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
    return (<double*><char*>&as_long)[0]