From 8c460f47080aecc9fd4c1d2a4771f988297548ea Mon Sep 17 00:00:00 2001 From: Rob Marrowstone Date: Fri, 5 Apr 2024 12:50:13 -0700 Subject: [PATCH 1/8] Text Reader Refactor WIP Current (April '24) attempt to refactor the text reader. Two goals: * Reduce lines of code to 50% (roughly) and complexity to 25% * Increase perf of impl in scope by 4 - 5x In the end I want this to be performant and maintainable as we look to add 1.1 functionality in the future. My first attempt at this was the protonic POC: `protonic-poc` What I has changed since then: * Refactored binary parser to context frame holding parser pattern * Implemented basic SliceableBuffer used in the above * Understand lazy IonThunkEvent values better Given what I know now, I think this should: * Use some parser combinator _pattern_ to reduce boilerplate * Lex _eagerly_ but lightly, and use IonThunkEvents for lazy "parsing" * Use lookup tables judiciously Next steps: * Complete SliceableBuffer methods and tests * Make minimum of Symbols, Lists and Structs work and test --- amazon/ion/reader_text2.py | 371 +++++++++++++++++++++++++++++++++ amazon/ion/sliceable_buffer.py | 41 ++++ 2 files changed, 412 insertions(+) create mode 100644 amazon/ion/reader_text2.py diff --git a/amazon/ion/reader_text2.py b/amazon/ion/reader_text2.py new file mode 100644 index 000000000..fa8ca79ca --- /dev/null +++ b/amazon/ion/reader_text2.py @@ -0,0 +1,371 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at: +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS +# OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the +# License. + +import base64 +from decimal import Decimal +from collections import defaultdict, deque +from enum import IntEnum +from functools import partial +from typing import Optional, NamedTuple, Iterator, Callable, Tuple + +from amazon.ion.core import Transition, ION_STREAM_INCOMPLETE_EVENT, ION_STREAM_END_EVENT, IonType, IonEvent, \ + IonEventType, IonThunkEvent, TimestampPrecision, timestamp, ION_VERSION_MARKER_EVENT +from amazon.ion.exceptions import IonException +from amazon.ion.reader import BufferQueue, reader_trampoline, ReadEventType, CodePointArray, CodePoint +from amazon.ion.sliceable_buffer import SliceableBuffer +from amazon.ion.symbols import SymbolToken, TEXT_ION_1_0 +from amazon.ion.util import coroutine, _next_code_point, CodePoint + + +def _illegal_character(c, ctx, message=''): + """Raises an IonException upon encountering the given illegal character in the given context. + + Args: + c (int|None): Ordinal of the illegal character. + ctx (_HandlerContext): Context in which the illegal character was encountered. + message (Optional[str]): Additional information, as necessary. + + """ + container_type = ctx.container.ion_type is None and 'top-level' or ctx.container.ion_type.name + value_type = ctx.ion_type is None and 'unknown' or ctx.ion_type.name + if c is None: + header = 'Illegal token' + else: + c = 'EOF' if BufferQueue.is_eof(c) else chr(c) + header = 'Illegal character %s' % (c,) + raise IonException('%s at position %d in %s value contained in %s. %s Pending value: %s' + % (header, ctx.queue.position, value_type, container_type, message, ctx.value)) + + +def _defaultdict(dct, fallback=_illegal_character): + """Wraps the given dictionary such that the given fallback function will be called when a nonexistent key is + accessed. + """ + out = defaultdict(lambda: fallback) + for k, v in iter(dct.items()): + out[k] = v + return out + + +def _merge_mappings(*args): + """Merges a sequence of dictionaries and/or tuples into a single dictionary. + + If a given argument is a tuple, it must have two elements, the first of which is a sequence of keys and the second + of which is a single value, which will be mapped to from each of the keys in the sequence. + """ + dct = {} + for arg in args: + if isinstance(arg, dict): + merge = arg + else: + assert isinstance(arg, tuple) + keys, value = arg + merge = dict(zip(keys, [value]*len(keys))) + dct.update(merge) + return dct + + +def _seq(s): + """Converts bytes to a sequence of integer code points.""" + return tuple(iter(s)) + + +_ENCODING = 'utf-8' + +# NOTE: the following are stored as sequences of integer code points. This simplifies dealing with inconsistencies +# between how bytes objects are handled in python 2 and 3, and simplifies logic around comparing multi-byte characters. +_WHITESPACE_NOT_NL = _seq(b' \t\v\f') +_WHITESPACE = _WHITESPACE_NOT_NL + _seq(b'\n\r') +_VALUE_TERMINATORS = _seq(b'{}[](),\"\' \t\n\r/') +_SYMBOL_TOKEN_TERMINATORS = _WHITESPACE + _seq(b'/:') +_DIGITS = _seq(b'0123456789') +_BINARY_RADIX = _seq(b'Bb') +_BINARY_DIGITS = _seq(b'01') +_HEX_RADIX = _seq(b'Xx') +_HEX_DIGITS = _DIGITS + _seq(b'abcdefABCDEF') +_DECIMAL_EXPS = _seq(b'Dd') +_FLOAT_EXPS = _seq(b'Ee') +_SIGN = _seq(b'+-') +_TIMESTAMP_YEAR_DELIMITERS = _seq(b'-T') +_TIMESTAMP_DELIMITERS = _seq(b'-:+.') +_TIMESTAMP_OFFSET_INDICATORS = _seq(b'Z+-') +_LETTERS = _seq(b'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ') +_BASE64_DIGITS = _LETTERS + _DIGITS + _seq(b'+/') +_IDENTIFIER_STARTS = _LETTERS + _seq(b'_') # Note: '$' is dealt with separately. +_IDENTIFIER_CHARACTERS = _IDENTIFIER_STARTS + _DIGITS + _seq(b'$') +_OPERATORS = _seq(b'!#%&*+-./;<=>?@^`|~') +_COMMON_ESCAPES = _seq(b'abtnfrv?0\'"/\\') +_NEWLINES = _seq(b'\r\n') + +_UNDERSCORE = ord(b'_') +_DOT = ord(b'.') +_COMMA = ord(b',') +_COLON = ord(b':') +_SLASH = ord(b'/') +_ASTERISK = ord(b'*') +_BACKSLASH = ord(b'\\') +_CARRIAGE_RETURN = ord(b'\r') +_NEWLINE = ord(b'\n') +_DOUBLE_QUOTE = ord(b'"') +_SINGLE_QUOTE = ord(b'\'') +_DOLLAR_SIGN = ord(b'$') +_PLUS = ord(b'+') +_MINUS = ord(b'-') +_HYPHEN = _MINUS +_T = ord(b'T') +_Z = ord(b'Z') +_T_LOWER = ord(b't') +_N_LOWER = ord(b'n') +_F_LOWER = ord(b'f') +_ZERO = _DIGITS[0] +_OPEN_BRACE = ord(b'{') +_OPEN_BRACKET = ord(b'[') +_OPEN_PAREN = ord(b'(') +_CLOSE_BRACE = ord(b'}') +_CLOSE_BRACKET = ord(b']') +_CLOSE_PAREN = ord(b')') +_BASE64_PAD = ord(b'=') +_QUESTION_MARK = ord(b'?') +_UNICODE_ESCAPE_2 = ord(b'x') +_UNICODE_ESCAPE_4 = ord(b'u') +_UNICODE_ESCAPE_8 = ord(b'U') + +_ESCAPED_NEWLINE = u'' # An escaped newline expands to nothing. + +_MAX_TEXT_CHAR = 0x10ffff +_MAX_CLOB_CHAR = 0x7f +_MIN_QUOTED_CHAR = 0x20 + +# The following suffixes are used for comparison when a token is found that starts with the first letter in +# the keyword. For example, when a new token starts with 't', the next three characters must match those in +# _TRUE_SUFFIX, followed by an acceptable termination character, in order for the token to match the 'true' keyword. +_TRUE_SUFFIX = _seq(b'rue') +_FALSE_SUFFIX = _seq(b'alse') +_NAN_SUFFIX = _seq(b'an') +_INF_SUFFIX = _seq(b'inf') +_IVM_PREFIX = _seq(b'$ion_') + +_IVM_EVENTS = { + TEXT_ION_1_0: ION_VERSION_MARKER_EVENT, +} + +_POS_INF = float('+inf') +_NEG_INF = float('-inf') +_NAN = float('nan') + + +def _ends_value(c): + return c in _VALUE_TERMINATORS or BufferQueue.is_eof(c) + + +class _NullSequence: + """Contains the terminal character sequence for the typed null suffix of the given IonType, starting with the first + character after the one which disambiguated the type. + + For example, SYMBOL's _NullSequence contains the characters 'mbol' because 'null.s' is ambiguous until 'y' is found, + at which point it must end in 'mbol'. + + Instances are used as leaves of the typed null prefix tree below. + """ + def __init__(self, ion_type, sequence): + self.ion_type = ion_type + self.sequence = sequence + + def __getitem__(self, item): + return self.sequence[item] + +_NULL_SUFFIX = _NullSequence(IonType.NULL, _seq(b'ull')) +_NULL_SYMBOL_SUFFIX = _NullSequence(IonType.SYMBOL, _seq(b'mbol')) +_NULL_SEXP_SUFFIX = _NullSequence(IonType.SEXP, _seq(b'xp')) +_NULL_STRING_SUFFIX = _NullSequence(IonType.STRING, _seq(b'ng')) +_NULL_STRUCT_SUFFIX = _NullSequence(IonType.STRUCT, _seq(b'ct')) +_NULL_INT_SUFFIX = _NullSequence(IonType.INT, _seq(b'nt')) +_NULL_FLOAT_SUFFIX = _NullSequence(IonType.FLOAT, _seq(b'loat')) +_NULL_DECIMAL_SUFFIX = _NullSequence(IonType.DECIMAL, _seq(b'ecimal')) +_NULL_CLOB_SUFFIX = _NullSequence(IonType.CLOB, _seq(b'lob')) +_NULL_LIST_SUFFIX = _NullSequence(IonType.LIST, _seq(b'ist')) +_NULL_BLOB_SUFFIX = _NullSequence(IonType.BLOB, _seq(b'ob')) +_NULL_BOOL_SUFFIX = _NullSequence(IonType.BOOL, _seq(b'ol')) +_NULL_TIMESTAMP_SUFFIX = _NullSequence(IonType.TIMESTAMP, _seq(b'imestamp')) + + +# The following implements a prefix tree used to determine whether a typed null keyword has been found (see +# _typed_null_handler). The leaves of the tree (enumerated above) are the terminal character sequences for the 13 +# possible suffixes to 'null.'. Any other suffix to 'null.' is an error. _NULL_STARTS is entered when 'null.' is found. + +_NULL_STR_NEXT = { + ord(b'i'): _NULL_STRING_SUFFIX, + ord(b'u'): _NULL_STRUCT_SUFFIX +} + +_NULL_ST_NEXT = { + ord(b'r'): _NULL_STR_NEXT +} + +_NULL_S_NEXT = { + ord(b'y'): _NULL_SYMBOL_SUFFIX, + ord(b'e'): _NULL_SEXP_SUFFIX, + ord(b't'): _NULL_ST_NEXT +} + +_NULL_B_NEXT = { + ord(b'l'): _NULL_BLOB_SUFFIX, + ord(b'o'): _NULL_BOOL_SUFFIX +} + +_NULL_STARTS = { + ord(b'n'): _NULL_SUFFIX, # null.null + ord(b's'): _NULL_S_NEXT, # null.string, null.symbol, null.struct, null.sexp + ord(b'i'): _NULL_INT_SUFFIX, # null.int + ord(b'f'): _NULL_FLOAT_SUFFIX, # null.float + ord(b'd'): _NULL_DECIMAL_SUFFIX, # null.decimal + ord(b'b'): _NULL_B_NEXT, # null.bool, null.blob + ord(b'c'): _NULL_CLOB_SUFFIX, # null.clob + ord(b'l'): _NULL_LIST_SUFFIX, # null.list + ord(b't'): _NULL_TIMESTAMP_SUFFIX, # null.timestamp +} + + +class _ContainerContext(NamedTuple): + """A description of an Ion container, including the container's IonType and its textual delimiter and end character, + if applicable. + + This is tracked as part of the current token's context, and is useful when certain lexing decisions depend on + which container the token is a member of. For example, ending a numeric token with ']' is not legal unless that + token is contained in a list. + + Args: + end (tuple): Tuple containing the container's end character, if any. + delimiter (tuple): Tuple containing the container's delimiter character, if any. + ion_type (Optional[IonType]): The container's IonType, if any. + is_delimited (bool): True if delimiter is not empty; otherwise, False. + """ + end: tuple + delimiter: tuple + ion_type: Optional[IonType] + is_delimited: bool + +_C_TOP_LEVEL = _ContainerContext((), (), None, False) +_C_STRUCT = _ContainerContext((_CLOSE_BRACE,), (_COMMA,), IonType.STRUCT, True) +_C_LIST = _ContainerContext((_CLOSE_BRACKET,), (_COMMA,), IonType.LIST, True) +_C_SEXP = _ContainerContext((_CLOSE_PAREN,), (), IonType.SEXP, False) + + + +class _ContextFrame(NamedTuple): + parser: Callable[[SliceableBuffer], Tuple[IonEvent, SliceableBuffer]] + ion_type: Optional[IonType] + depth: int + + +def _whitespace(byte): + return byte in bytearray(b" \n\t\r\f") + +def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: + (_, buffer) = buffer.read_while(_whitespace) + if buffer.is_eof(): + return IonEvent.STREAM_END, buffer + if + + +def _list_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: + (_, buffer) = buffer.read_while(lambda b: b in bytearray(b" \n\t\r\f")) + if buffer.is_eof(): + raise IonException("Unexpected end of file") + if buffer.read_byte() == _CLOSE_BRACKET: + # todo: i guess i need to pass the depth here too! + return IonEvent(IonEventType.CONTAINER_END, IonType.LIST, 0), buffer + + # XXX: we expect the _tlv_parser to trim trailing whitespace, I guess?S?!? + (event, buffer) = _tlv_parser(buffer) + + # if buffer is empty... durn we need to wait for comma i guess? erg. + # if comma, consume it + if buffer.read_byte() == _COMMA: + pass + elif buffer.read_byte() == _CLOSE_BRACKET: + pass + else: + error! + + +def _struct_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: + pass + +def _sexp_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: + pass + +_parser_table = [ + None, # NULL = 0 + None, # BOOL = 1 + None, # INT = 2 + None, # FLOAT = 3 + None, # DECIMAL = 4 + None, # TIMESTAMP = 5 + None, # SYMBOL = 6 + None, # STRING = 7 + None, # CLOB = 8 + None, # BLOB = 9 + _list_parser, # LIST = 10 + _sexp_parser, # SEXP = 11 + _struct_parser, # STRUCT = 12 +] + + +@coroutine +def stream_handler(): + """ + Handler for an Ion Text value-stream. + """ + buffer = SliceableBuffer.empty() + context_stack = deque([_ContextFrame(_tlv_parser, None, 0)]) + ion_event = None + skip_or_next = ReadEventType.NEXT + expect_data = False + + while True: + read_event = yield ion_event + assert read_event is not None + + # part 1: handle user's read event + if expect_data: + if read_event.type is not ReadEventType.DATA: + raise TypeError("Data expected") + buffer = buffer.extend(read_event.data) + else: + if read_event.type is ReadEventType.DATA: + raise TypeError("Next or Skip expected") + skip_or_next = read_event.type + + if skip_or_next is ReadEventType.SKIP: + raise NotImplementedError("Skip is not supported") + + # part 2: do some lexxing + (parser, ctx_type, depth) = context_stack[-1] + + (ion_event, buffer) = parser(buffer) + event_type = ion_event.type + ion_type = ion_event.ion_type + + if event_type is IonEventType.STREAM_END: + break + elif event_type is IonEventType.INCOMPLETE: + raise NotImplementedError("Incomplete is not supported") + elif event_type is IonEventType.CONTAINER_START: + parser = _parser_table[ion_type] + context_stack.append(_ContextFrame(parser, ion_type, depth + 1)) + elif event_type is IonEventType.CONTAINER_END: + assert ion_type is ctx_type + assert depth > 0 + context_stack.pop() diff --git a/amazon/ion/sliceable_buffer.py b/amazon/ion/sliceable_buffer.py index e2062fdbc..6cc378831 100644 --- a/amazon/ion/sliceable_buffer.py +++ b/amazon/ion/sliceable_buffer.py @@ -123,6 +123,47 @@ def read_slice(self, n): return memoryview(combined), SliceableBuffer(chunks[i:], remaining, size - n) + def read_while(self, pred): + """ + Read bytes while pred(byte) is True, return (bytes, new buffer). + + Raise IncompleteReadError if the buffer is empty or pred never returns False. + """ + size = self.size + chunks = self._chunks + offset = self._offset + + raise NotImplementedError("foo ya") + + def peek(self, n): + """ + Peek an n bytes, return bytes. + + Raise IncompleteReadError if the buffer is n > size. + """ + size = self.size + chunks = self._chunks + offset = self._offset + + if size < n: + raise IncompleteReadError(f'Buffer has size {size}, but {n} bytes were requested!') + + # short-circuit when we can serve full peek from first chunk + # optimizes for common case and simplifies accumulation loop + (chunk, length) = chunks[0] + if n < length: + return chunk[offset:offset + n] + elif n == length: + return chunk[offset:] + + combined = bytearray(n) + cursor = 0 + for (chunk, length) in chunks: + combined[cursor:cursor + length] = chunk + cursor += length + + return memoryview(combined) + def skip(self, n): """ Skip max(n, size) bytes, return (skipped, new buffer). From 272bd28debbbd4e44a011448a02531e4ef8d8217 Mon Sep 17 00:00:00 2001 From: Rob Marrowstone Date: Fri, 19 Apr 2024 14:21:15 -0700 Subject: [PATCH 2/8] ParserRule Hacking WIP --- amazon/.DS_Store | Bin 0 -> 8196 bytes amazon/ion/protons.py | 63 +++++++++++++++++++++++++++++++++ amazon/ion/reader_text2.py | 20 ++++++++--- amazon/ion/sliceable_buffer.py | 59 ++++++++++++++++++++++++------ 4 files changed, 128 insertions(+), 14 deletions(-) create mode 100644 amazon/.DS_Store create mode 100644 amazon/ion/protons.py diff --git a/amazon/.DS_Store b/amazon/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..094f78f4157993a9fda1b57976072efbcbe27a06 GIT binary patch literal 8196 zcmeHMTWl3Y7@lui=apGvUa2?81_i$( zn?EABKF@P<<#slA*tSX;zNFK0gRJdZ`Ov^NEmAo;V7sPQ>hKD#>4&zxQlM(8mf^J% z6D@764at_xTP7Ql6I+@aslBOfa#B;5rZ#Tt$((e?T<@%O0MSQ)Ro~{&xf|X%ez8;D zMpRVg#-%skmc7yR1;*h4eQ3}g_1s?jjFsVwd~#t}e)Xx14m zIN88zcO0SY4+hrhK(vN^FCTPhpoI%G?i|Y zlh!UQT)LwEf%T28TX%NtIr!M+>U$RPn#H<4OmT4o`w831_m^xlI5Cv8g{Q)Yv_GGxy!SWV#5Yq?>k8g%6*!Gn0QmOZuC3ZoaJy+ zlV)uX>Bc}&dd>Goo!hqS#^9(oHbyS`HE4*s)9r>aB1S%t^@Hwl%cWUO)9|5pb;fR0 z9TL)Z)3nUX)yr#_?cHY>!~ThE-s&z9(D>J4uGIf1^@oc|67jBGrX6_sNZ1n!x$PEh zLNP6d1hcBDsq|X8Dq&=OHkea^mOSJ+kd z9s8bLW52OK*k9~#_74~|EJhtxp<TU^BL0D>|_U`>-De(2rpp#W9TFIL6>%9H(#w zPvaRpi|6nXUd0=D6K~-iT*QZ%#wC1$Pw^SP#5ed6*YGob!S4zbMX690Ds@UyS*6q~ z>y<`@uJc4u%E>V-`EspnqE?jZlpN7joicK#$!ylQYLq&>QnEYrotnsI&O$!X zk?2wh{(J^5XGP_^>;pEpJH4 zmeD3D-oJslVsGAcJoh;Dlo5KXa?H(f1S;fswc`B0Yxev9Ii6KKTpWQo0&^JwRCe`t zb Parser: diff --git a/amazon/ion/reader_text2.py b/amazon/ion/reader_text2.py index fa8ca79ca..3eacf7d8a 100644 --- a/amazon/ion/reader_text2.py +++ b/amazon/ion/reader_text2.py @@ -261,6 +261,16 @@ class _ContainerContext(NamedTuple): _C_LIST = _ContainerContext((_CLOSE_BRACKET,), (_COMMA,), IonType.LIST, True) _C_SEXP = _ContainerContext((_CLOSE_PAREN,), (), IonType.SEXP, False) +def invalid_char(c: int): + raise ValueError(f"Invalid char {c}") + +def open_brace_handler(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: + # todo: handle blob/clob + + +tlv_table = [invalid_char] * 256 +tlv_table[_OPEN_BRACE] = open_brace_handler +tlv_table[_OPEN_BRACKET] = open_bracket_handler class _ContextFrame(NamedTuple): @@ -275,7 +285,7 @@ def _whitespace(byte): def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: (_, buffer) = buffer.read_while(_whitespace) if buffer.is_eof(): - return IonEvent.STREAM_END, buffer + return ION_STREAM_END_EVENT, buffer if @@ -322,13 +332,12 @@ def _sexp_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: _struct_parser, # STRUCT = 12 ] - @coroutine def stream_handler(): """ Handler for an Ion Text value-stream. """ - buffer = SliceableBuffer.empty() + buffer: SliceableBuffer = SliceableBuffer.empty() context_stack = deque([_ContextFrame(_tlv_parser, None, 0)]) ion_event = None skip_or_next = ReadEventType.NEXT @@ -359,8 +368,9 @@ def stream_handler(): ion_type = ion_event.ion_type if event_type is IonEventType.STREAM_END: - break + expect_data = True elif event_type is IonEventType.INCOMPLETE: + # todo: flushable/commit or something something raise NotImplementedError("Incomplete is not supported") elif event_type is IonEventType.CONTAINER_START: parser = _parser_table[ion_type] @@ -369,3 +379,5 @@ def stream_handler(): assert ion_type is ctx_type assert depth > 0 context_stack.pop() + + diff --git a/amazon/ion/sliceable_buffer.py b/amazon/ion/sliceable_buffer.py index 6cc378831..c993e098e 100644 --- a/amazon/ion/sliceable_buffer.py +++ b/amazon/ion/sliceable_buffer.py @@ -8,7 +8,9 @@ class SliceableBuffer: Reads return the data and a new buffer starting at the end of the read. As the reader advances past chunks (either through reads or skips), whole - chunks are dropped from the buffer. + chunks are dropped from the returned buffer. + + Once no buffers have references to chunks, those chunks will be GC'ed. Built with the assumption that chunks will be reasonably large and that relatively few (single digit) chunks will be buffered at once. @@ -127,13 +129,56 @@ def read_while(self, pred): """ Read bytes while pred(byte) is True, return (bytes, new buffer). - Raise IncompleteReadError if the buffer is empty or pred never returns False. + Raise IncompleteReadError if pred never returns False. """ size = self.size chunks = self._chunks offset = self._offset - raise NotImplementedError("foo ya") + # short-circuit when we can serve full read from first chunk + # optimizes for common case and simplifies accumulation loop + (chunk, length) = chunks[0] + for cursor in range(offset, length): + if not pred(chunk[cursor]): + # drop chunk if fully consumed + if cursor + 1 == length: + return chunk[offset:], SliceableBuffer(chunks[1:], 0, size - (length - offset)) + else: + return chunk[offset:cursor], SliceableBuffer(chunks, cursor + 1, size - (1 + cursor - offset)) + + count = length - offset + slices = [_ChunkPair(chunk[offset:], count)] + + # i is used to init the new buffer after the loop + i = 1 + for (i, pair) in enumerate(chunks[1:], start=1): + (chunk, length) = pair + for cursor in range(0, length): + if not pred(chunk[cursor]): + count += cursor + 1 + # drop chunk if fully consumed + if cursor + 1 == length: + i += 1 + cursor = 0 + slices.append(pair) + else: + slices.append(_ChunkPair(chunk[:cursor], cursor + 1)) + break + else: + count += length + slices.append(pair) + continue + break + else: + raise IncompleteReadError("pred never returned False") + + combined = bytearray(count) + offset = 0 + for (chunk, length) in slices: + combined[offset:offset + length] = chunk + offset += length + + return memoryview(combined), SliceableBuffer(chunks[i:], cursor + 1, size - count) def peek(self, n): """ @@ -156,13 +201,7 @@ def peek(self, n): elif n == length: return chunk[offset:] - combined = bytearray(n) - cursor = 0 - for (chunk, length) in chunks: - combined[cursor:cursor + length] = chunk - cursor += length - - return memoryview(combined) + raise NotImplementedError("no combiney yet!") def skip(self, n): """ From 7664db7738b2a7d468e00bf8a75b1824b004060f Mon Sep 17 00:00:00 2001 From: Rob Marrowstone Date: Fri, 19 Apr 2024 14:37:56 -0700 Subject: [PATCH 3/8] Move Old Protonic Code into amazon.ion --- amazon/ion/protons.py | 370 +++++++++++++++++++++++++++++++++---- amazon/ion/reader_text2.py | 318 ++----------------------------- tests/test_protons.py | 158 ++++++++++++++++ 3 files changed, 507 insertions(+), 339 deletions(-) create mode 100644 tests/test_protons.py diff --git a/amazon/ion/protons.py b/amazon/ion/protons.py index 30a8befe2..cd2eb2bff 100644 --- a/amazon/ion/protons.py +++ b/amazon/ion/protons.py @@ -1,63 +1,363 @@ -from typing import Callable, Tuple, Any, NamedTuple +from dataclasses import dataclass +from enum import Enum +from typing import Callable, Any -from amazon.ion.sliceable_buffer import SliceableBuffer +from amazon.protonic.data_source import DataSource -Parser = Callable[[SliceableBuffer], Tuple[Any, SliceableBuffer]] +""" +"protonic" is a parser combinator library for parsing ion and similar document +formats in python. -class IncompleteParseError(Exception): - pass +Parser combinators abstract a lot of the undifferentiated bits of parsing. They +represent a good middle ground between a direct transformation of an abstract +grammar representation and a bunch of hand-written parsing code. +The goals: +- Keep it simple: avoid unnecessary indirection, keep scope small, avoid fancy + ast manipulations. +- Support streaming: receive incremental inputs and produce incremental results +- Reasonably performant: avoid data copying, reduce call overhead and reference + counting. +- Enable good error messaging. TODO more here! +- Simple to extend: users can write their own functions and combinators. +""" -Fail = 'fail' -Eof = 'eof' +class ParseError(Exception): -class Tag(NamedTuple): - bytes: bytes + def __init__(self, message): + self.message = message -class Range(NamedTuple): - low: int - high: int +@dataclass +class ParserContext: + """ + Wraps a mutable DataSource to provide an immutable facade. + + Intended to hold other metadata information if/when the need + arises. + + FIXME: As implemented now this is super dangerous because if you + advance the underlying buffer it silently invalidates all of your + outstanding ParserContexts. We should fix this but for now I am + doing this because it gives me the api in the combinators that i + want without either a bunch of ref copying or doing something + more complex like weakrefs. + """ + + # TODO: add col/line or other context info for errors + source: DataSource + cursor: int = 0 + def avail(self): + return len(self.source) - self.cursor -class Alt(NamedTuple): - branches: list["ParserRule"] + def read(self, n): + start = self.cursor + end = start + n + return self.source[start:end] + def remaining(self, n): + return ParserContext(self.source, self.cursor + n) -class TakeWhile(NamedTuple): - byte_checker: Callable[[int], bool] + def advance(self): + """ + Drop anything from the buffer before cursor and reset cursor + to start of current buffer. + NOTE: once you have done this any and all outstanding ParserContexts + sharing the same DataSource will be invalid! + """ + self.source.advance(self.cursor) + self.cursor = 0 -class Peek(NamedTuple): - parser: "ParserRule" +class ResultType(Enum): + SUCCESS = "Success" + FAILURE = "Failure" + INCOMPLETE = "Incomplete" + DONE = "Done" -class Terminated(NamedTuple): - """ Indicates the rule must end with the terminator or the result is incomplete. - The result of ``rule`` is the result of the operation. +@dataclass +class ParseResult: + """ + base-class for parse results. """ - rule: "ParserRule" - terminal: "ParserRule" + type: ResultType + context: ParserContext -class Preceded(NamedTuple): - """ Indicates the rule may be preceded by another rule.""" - precedes: "ParserRule" - rule: "ParserRule" +@dataclass +class SuccessResult(ParseResult): + value: Any + type = ResultType.SUCCESS + def __init__(self, context, value): + self.context = context + self.value = value -class MapValue(NamedTuple): - parser: "ParserRule" - mapper: Callable[[bytes], Any] +Parser = Callable[[ParserContext], ParseResult] + + +def tag(tag: bytes) -> Parser: + """ + Match a sequence of bytes, a "tag" + """ + l = len(tag) + + def p(ctx: ParserContext): + avail = ctx.avail() + if avail < l: + # todo what happens when 0 is avail? that should be inomplete right? + if ctx.read(avail) == tag[:avail]: + return _inc_or_fail(ctx) + else: + return _failure(ctx) + + elif ctx.read(l) == tag: + return _success(ctx.remaining(l), tag) + else: + return _failure(ctx) + return p + + +def one_of(items: bytes) -> Parser: + """ + Match one of the bytes passed. + """ + def p(ctx): + if not ctx.avail(): + return _inc_or_fail(ctx) + + b = ctx.read(1) + + if b[0] in items: + return _success(ctx.remaining(1), b) + else: + return _failure(ctx) + return p + + +def terminated(item: Parser, terminal: Parser) -> Parser: + """ + checks that the value is terminated with terminal, which is not + consumed. result is that produced by item if both succeed. + """ + def p(ctx: ParserContext): + result = item(ctx) + if result.type is not ResultType.SUCCESS: + return ParseResult(result.type, ctx) + + ended = terminal(result.context) + if ended.type is not ResultType.SUCCESS: + return ParseResult(ended.type, ctx) + + return _success(ended.context, result.value) + return p + + +def delim(start: Parser, value: Parser, end: Parser) -> Parser: + """ + result of value is returned + + if start matches. + + this doesn't do any checks about incomplete vs done itself: it delegates that to it's component parsers + """ + def p(ctx): + started = start(ctx) + if started.type is not ResultType.SUCCESS: + return ParseResult(started.type, ctx) + + body = value(started.context) + if body.type is not ResultType.SUCCESS: + return ParseResult(body.type, ctx) + + ended = end(body.context) + if ended.type is not ResultType.SUCCESS: + return ParseResult(ended.type, ctx) + + return _success(ended.context, body.value) + return p + + +def take_while(pred) -> Parser: + """ + pred is fed one byte at a time. Why is it written this way? + I don't want to add the complexity in surface area to generalize + it to take a parser _and_ doing so would mean i'd need to combine + the results in a general way, _and_ ensure the parser made progress... + so this is the simplest and more performant thing i can think to do + while still having some generality. + + if the data ends before pred returns False _and_ there may be more + then it will be Incomplete. + + pred can signal an error by raising ParserError. + """ + def p(ctx: ParserContext): + initial_ctx = ctx + n = 0 + + while ctx.avail(): + result = pred(ctx.read(1)[0]) + if not result: + return _success(ctx, initial_ctx.read(n)) + + ctx = ctx.remaining(1) + n += 1 + + if ctx.source.is_complete(): + return _success(ctx, initial_ctx.read(n)) + else: + return _incomplete(initial_ctx) + return p + + +def value(parser: Parser, value: Any) -> Parser: + """ + If the parser succeeds, the resulting value will be replaced by the value. + """ + return map_value(parser, lambda _: value) + + +def map_value(parser: Parser, mapper: Callable) -> Parser: + """ + If the parser succeeds, the mapper will be called with the value + of the result and the return value will replace the value. + """ + def p(ctx): + result = parser(ctx) + if result.type is ResultType.SUCCESS: + return _success(result.context, mapper(result.value)) + else: + return result + + return p + + +def alt(*parsers: Parser) -> Parser: + """ + A switch between a variable number of parsers. + + A success or incomplete will propagate from the first that matchers. + Users are responsible for ordering parsers so that any that may complete + will come before potential incompletes. + """ + def p(ctx): + for parser in parsers: + result = parser(ctx) + if result.type is ResultType.SUCCESS or result.type is ResultType.INCOMPLETE: + return result + return _failure(ctx) + + return p + + +def is_eof() -> Parser: + """ + Results in the Successful None value when source is exhausted and marked EOF. + + Incomplete when source is exhausted but not EOF. + + Failure when there is data. + """ + def p(ctx): + if not ctx.avail(): + if ctx.source.is_complete(): + return _success(ctx, None) + else: + return _incomplete(ctx) + else: + return _failure(ctx) + return p + + +def preceded(prefix, item) -> Parser: + """ + checks that the prefix matches then returns item if it matches. + """ + def p(ctx: ParserContext): + + prior = prefix(ctx) + if prior.type is not ResultType.SUCCESS: + return prior + + result = item(prior.context) + if result.type is ResultType.SUCCESS: + return result + else: + return ParseResult(result.type, ctx) + return p + + +def succeed(value=None) -> Parser: + """ + Always succeeds. Context will be returned as-is, value will be value + """ + return lambda ctx: _success(ctx, value) + + +def fail() -> Parser: + """ + Always fails. Context will be returned as-is. + + todo: consider adding message + """ + return lambda ctx: _failure(ctx) + + +def error(message: str) -> Parser: + """ + Raises a ParseError with message + """ + def p(ctx): + raise ParseError(message) + return p + + +def ignore(parser: Parser) -> Parser: + """ + If the parser succeeds, throw away any value but keep the context. + """ + def p(ctx): + result = parser(ctx) + if result.type is ResultType.SUCCESS: + return _success(result.context, None) + else: + return result + return p + + +def peek(parser: Parser) -> Parser: + """ + Peek at the stream, result propagates but the context doesn't move forward. + """ + def p(ctx): + result = parser(ctx) + if result.type is ResultType.SUCCESS: + return _success(ctx, result.value) + else: + return result + return p + + +def _incomplete(context): + return ParseResult(ResultType.INCOMPLETE, context) + + +def _failure(context): + return ParseResult(ResultType.FAILURE, context) -class Constant(NamedTuple): - parser: "ParserRule" - value: Any +def _success(context, value): + return SuccessResult(context, value) -ParserRule = Tag | Range | Alt | TakeWhile | Peek | Terminated | Preceded | MapValue | Constant | Fail | Eof -def rule(parser: "ParserRule") -> Parser: +def _inc_or_fail(context): + if context.source.is_complete(): + return _failure(context) + else: + return _incomplete(context) diff --git a/amazon/ion/reader_text2.py b/amazon/ion/reader_text2.py index 3eacf7d8a..2e1717c51 100644 --- a/amazon/ion/reader_text2.py +++ b/amazon/ion/reader_text2.py @@ -12,325 +12,36 @@ # specific language governing permissions and limitations under the # License. -import base64 -from decimal import Decimal -from collections import defaultdict, deque -from enum import IntEnum -from functools import partial -from typing import Optional, NamedTuple, Iterator, Callable, Tuple +from collections import deque +from typing import Optional, NamedTuple, Callable, Tuple -from amazon.ion.core import Transition, ION_STREAM_INCOMPLETE_EVENT, ION_STREAM_END_EVENT, IonType, IonEvent, \ - IonEventType, IonThunkEvent, TimestampPrecision, timestamp, ION_VERSION_MARKER_EVENT -from amazon.ion.exceptions import IonException -from amazon.ion.reader import BufferQueue, reader_trampoline, ReadEventType, CodePointArray, CodePoint +from amazon.ion.core import IonType, IonEvent, \ + IonEventType +from amazon.ion.reader import ReadEventType from amazon.ion.sliceable_buffer import SliceableBuffer -from amazon.ion.symbols import SymbolToken, TEXT_ION_1_0 -from amazon.ion.util import coroutine, _next_code_point, CodePoint - - -def _illegal_character(c, ctx, message=''): - """Raises an IonException upon encountering the given illegal character in the given context. - - Args: - c (int|None): Ordinal of the illegal character. - ctx (_HandlerContext): Context in which the illegal character was encountered. - message (Optional[str]): Additional information, as necessary. - - """ - container_type = ctx.container.ion_type is None and 'top-level' or ctx.container.ion_type.name - value_type = ctx.ion_type is None and 'unknown' or ctx.ion_type.name - if c is None: - header = 'Illegal token' - else: - c = 'EOF' if BufferQueue.is_eof(c) else chr(c) - header = 'Illegal character %s' % (c,) - raise IonException('%s at position %d in %s value contained in %s. %s Pending value: %s' - % (header, ctx.queue.position, value_type, container_type, message, ctx.value)) - - -def _defaultdict(dct, fallback=_illegal_character): - """Wraps the given dictionary such that the given fallback function will be called when a nonexistent key is - accessed. - """ - out = defaultdict(lambda: fallback) - for k, v in iter(dct.items()): - out[k] = v - return out - - -def _merge_mappings(*args): - """Merges a sequence of dictionaries and/or tuples into a single dictionary. - - If a given argument is a tuple, it must have two elements, the first of which is a sequence of keys and the second - of which is a single value, which will be mapped to from each of the keys in the sequence. - """ - dct = {} - for arg in args: - if isinstance(arg, dict): - merge = arg - else: - assert isinstance(arg, tuple) - keys, value = arg - merge = dict(zip(keys, [value]*len(keys))) - dct.update(merge) - return dct - - -def _seq(s): - """Converts bytes to a sequence of integer code points.""" - return tuple(iter(s)) - - -_ENCODING = 'utf-8' - -# NOTE: the following are stored as sequences of integer code points. This simplifies dealing with inconsistencies -# between how bytes objects are handled in python 2 and 3, and simplifies logic around comparing multi-byte characters. -_WHITESPACE_NOT_NL = _seq(b' \t\v\f') -_WHITESPACE = _WHITESPACE_NOT_NL + _seq(b'\n\r') -_VALUE_TERMINATORS = _seq(b'{}[](),\"\' \t\n\r/') -_SYMBOL_TOKEN_TERMINATORS = _WHITESPACE + _seq(b'/:') -_DIGITS = _seq(b'0123456789') -_BINARY_RADIX = _seq(b'Bb') -_BINARY_DIGITS = _seq(b'01') -_HEX_RADIX = _seq(b'Xx') -_HEX_DIGITS = _DIGITS + _seq(b'abcdefABCDEF') -_DECIMAL_EXPS = _seq(b'Dd') -_FLOAT_EXPS = _seq(b'Ee') -_SIGN = _seq(b'+-') -_TIMESTAMP_YEAR_DELIMITERS = _seq(b'-T') -_TIMESTAMP_DELIMITERS = _seq(b'-:+.') -_TIMESTAMP_OFFSET_INDICATORS = _seq(b'Z+-') -_LETTERS = _seq(b'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ') -_BASE64_DIGITS = _LETTERS + _DIGITS + _seq(b'+/') -_IDENTIFIER_STARTS = _LETTERS + _seq(b'_') # Note: '$' is dealt with separately. -_IDENTIFIER_CHARACTERS = _IDENTIFIER_STARTS + _DIGITS + _seq(b'$') -_OPERATORS = _seq(b'!#%&*+-./;<=>?@^`|~') -_COMMON_ESCAPES = _seq(b'abtnfrv?0\'"/\\') -_NEWLINES = _seq(b'\r\n') - -_UNDERSCORE = ord(b'_') -_DOT = ord(b'.') -_COMMA = ord(b',') -_COLON = ord(b':') -_SLASH = ord(b'/') -_ASTERISK = ord(b'*') -_BACKSLASH = ord(b'\\') -_CARRIAGE_RETURN = ord(b'\r') -_NEWLINE = ord(b'\n') -_DOUBLE_QUOTE = ord(b'"') -_SINGLE_QUOTE = ord(b'\'') -_DOLLAR_SIGN = ord(b'$') -_PLUS = ord(b'+') -_MINUS = ord(b'-') -_HYPHEN = _MINUS -_T = ord(b'T') -_Z = ord(b'Z') -_T_LOWER = ord(b't') -_N_LOWER = ord(b'n') -_F_LOWER = ord(b'f') -_ZERO = _DIGITS[0] -_OPEN_BRACE = ord(b'{') -_OPEN_BRACKET = ord(b'[') -_OPEN_PAREN = ord(b'(') -_CLOSE_BRACE = ord(b'}') -_CLOSE_BRACKET = ord(b']') -_CLOSE_PAREN = ord(b')') -_BASE64_PAD = ord(b'=') -_QUESTION_MARK = ord(b'?') -_UNICODE_ESCAPE_2 = ord(b'x') -_UNICODE_ESCAPE_4 = ord(b'u') -_UNICODE_ESCAPE_8 = ord(b'U') - -_ESCAPED_NEWLINE = u'' # An escaped newline expands to nothing. - -_MAX_TEXT_CHAR = 0x10ffff -_MAX_CLOB_CHAR = 0x7f -_MIN_QUOTED_CHAR = 0x20 - -# The following suffixes are used for comparison when a token is found that starts with the first letter in -# the keyword. For example, when a new token starts with 't', the next three characters must match those in -# _TRUE_SUFFIX, followed by an acceptable termination character, in order for the token to match the 'true' keyword. -_TRUE_SUFFIX = _seq(b'rue') -_FALSE_SUFFIX = _seq(b'alse') -_NAN_SUFFIX = _seq(b'an') -_INF_SUFFIX = _seq(b'inf') -_IVM_PREFIX = _seq(b'$ion_') - -_IVM_EVENTS = { - TEXT_ION_1_0: ION_VERSION_MARKER_EVENT, -} - -_POS_INF = float('+inf') -_NEG_INF = float('-inf') -_NAN = float('nan') - - -def _ends_value(c): - return c in _VALUE_TERMINATORS or BufferQueue.is_eof(c) - - -class _NullSequence: - """Contains the terminal character sequence for the typed null suffix of the given IonType, starting with the first - character after the one which disambiguated the type. - - For example, SYMBOL's _NullSequence contains the characters 'mbol' because 'null.s' is ambiguous until 'y' is found, - at which point it must end in 'mbol'. - - Instances are used as leaves of the typed null prefix tree below. - """ - def __init__(self, ion_type, sequence): - self.ion_type = ion_type - self.sequence = sequence - - def __getitem__(self, item): - return self.sequence[item] - -_NULL_SUFFIX = _NullSequence(IonType.NULL, _seq(b'ull')) -_NULL_SYMBOL_SUFFIX = _NullSequence(IonType.SYMBOL, _seq(b'mbol')) -_NULL_SEXP_SUFFIX = _NullSequence(IonType.SEXP, _seq(b'xp')) -_NULL_STRING_SUFFIX = _NullSequence(IonType.STRING, _seq(b'ng')) -_NULL_STRUCT_SUFFIX = _NullSequence(IonType.STRUCT, _seq(b'ct')) -_NULL_INT_SUFFIX = _NullSequence(IonType.INT, _seq(b'nt')) -_NULL_FLOAT_SUFFIX = _NullSequence(IonType.FLOAT, _seq(b'loat')) -_NULL_DECIMAL_SUFFIX = _NullSequence(IonType.DECIMAL, _seq(b'ecimal')) -_NULL_CLOB_SUFFIX = _NullSequence(IonType.CLOB, _seq(b'lob')) -_NULL_LIST_SUFFIX = _NullSequence(IonType.LIST, _seq(b'ist')) -_NULL_BLOB_SUFFIX = _NullSequence(IonType.BLOB, _seq(b'ob')) -_NULL_BOOL_SUFFIX = _NullSequence(IonType.BOOL, _seq(b'ol')) -_NULL_TIMESTAMP_SUFFIX = _NullSequence(IonType.TIMESTAMP, _seq(b'imestamp')) - - -# The following implements a prefix tree used to determine whether a typed null keyword has been found (see -# _typed_null_handler). The leaves of the tree (enumerated above) are the terminal character sequences for the 13 -# possible suffixes to 'null.'. Any other suffix to 'null.' is an error. _NULL_STARTS is entered when 'null.' is found. - -_NULL_STR_NEXT = { - ord(b'i'): _NULL_STRING_SUFFIX, - ord(b'u'): _NULL_STRUCT_SUFFIX -} - -_NULL_ST_NEXT = { - ord(b'r'): _NULL_STR_NEXT -} - -_NULL_S_NEXT = { - ord(b'y'): _NULL_SYMBOL_SUFFIX, - ord(b'e'): _NULL_SEXP_SUFFIX, - ord(b't'): _NULL_ST_NEXT -} - -_NULL_B_NEXT = { - ord(b'l'): _NULL_BLOB_SUFFIX, - ord(b'o'): _NULL_BOOL_SUFFIX -} - -_NULL_STARTS = { - ord(b'n'): _NULL_SUFFIX, # null.null - ord(b's'): _NULL_S_NEXT, # null.string, null.symbol, null.struct, null.sexp - ord(b'i'): _NULL_INT_SUFFIX, # null.int - ord(b'f'): _NULL_FLOAT_SUFFIX, # null.float - ord(b'd'): _NULL_DECIMAL_SUFFIX, # null.decimal - ord(b'b'): _NULL_B_NEXT, # null.bool, null.blob - ord(b'c'): _NULL_CLOB_SUFFIX, # null.clob - ord(b'l'): _NULL_LIST_SUFFIX, # null.list - ord(b't'): _NULL_TIMESTAMP_SUFFIX, # null.timestamp -} - - -class _ContainerContext(NamedTuple): - """A description of an Ion container, including the container's IonType and its textual delimiter and end character, - if applicable. - - This is tracked as part of the current token's context, and is useful when certain lexing decisions depend on - which container the token is a member of. For example, ending a numeric token with ']' is not legal unless that - token is contained in a list. - - Args: - end (tuple): Tuple containing the container's end character, if any. - delimiter (tuple): Tuple containing the container's delimiter character, if any. - ion_type (Optional[IonType]): The container's IonType, if any. - is_delimited (bool): True if delimiter is not empty; otherwise, False. - """ - end: tuple - delimiter: tuple - ion_type: Optional[IonType] - is_delimited: bool - -_C_TOP_LEVEL = _ContainerContext((), (), None, False) -_C_STRUCT = _ContainerContext((_CLOSE_BRACE,), (_COMMA,), IonType.STRUCT, True) -_C_LIST = _ContainerContext((_CLOSE_BRACKET,), (_COMMA,), IonType.LIST, True) -_C_SEXP = _ContainerContext((_CLOSE_PAREN,), (), IonType.SEXP, False) - -def invalid_char(c: int): - raise ValueError(f"Invalid char {c}") - -def open_brace_handler(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: - # todo: handle blob/clob - - -tlv_table = [invalid_char] * 256 -tlv_table[_OPEN_BRACE] = open_brace_handler -tlv_table[_OPEN_BRACKET] = open_bracket_handler - - -class _ContextFrame(NamedTuple): - parser: Callable[[SliceableBuffer], Tuple[IonEvent, SliceableBuffer]] - ion_type: Optional[IonType] - depth: int +from amazon.ion.util import coroutine def _whitespace(byte): return byte in bytearray(b" \n\t\r\f") def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: - (_, buffer) = buffer.read_while(_whitespace) - if buffer.is_eof(): - return ION_STREAM_END_EVENT, buffer - if - + pass def _list_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: - (_, buffer) = buffer.read_while(lambda b: b in bytearray(b" \n\t\r\f")) - if buffer.is_eof(): - raise IonException("Unexpected end of file") - if buffer.read_byte() == _CLOSE_BRACKET: - # todo: i guess i need to pass the depth here too! - return IonEvent(IonEventType.CONTAINER_END, IonType.LIST, 0), buffer - - # XXX: we expect the _tlv_parser to trim trailing whitespace, I guess?S?!? - (event, buffer) = _tlv_parser(buffer) - - # if buffer is empty... durn we need to wait for comma i guess? erg. - # if comma, consume it - if buffer.read_byte() == _COMMA: - pass - elif buffer.read_byte() == _CLOSE_BRACKET: - pass - else: - error! - - + pass def _struct_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: pass def _sexp_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: pass -_parser_table = [ - None, # NULL = 0 - None, # BOOL = 1 - None, # INT = 2 - None, # FLOAT = 3 - None, # DECIMAL = 4 - None, # TIMESTAMP = 5 - None, # SYMBOL = 6 - None, # STRING = 7 - None, # CLOB = 8 - None, # BLOB = 9 - _list_parser, # LIST = 10 - _sexp_parser, # SEXP = 11 - _struct_parser, # STRUCT = 12 -] + +class _ContextFrame(NamedTuple): + parser: Callable[[SliceableBuffer], Tuple[IonEvent, SliceableBuffer]] + ion_type: Optional[IonType] + depth: int + @coroutine def stream_handler(): @@ -380,4 +91,3 @@ def stream_handler(): assert depth > 0 context_stack.pop() - diff --git a/tests/test_protons.py b/tests/test_protons.py new file mode 100644 index 000000000..032f32145 --- /dev/null +++ b/tests/test_protons.py @@ -0,0 +1,158 @@ +from typing import Tuple, List + +import pytest + +from amazon.protonic.protons import * +from amazon.protonic.protons import _failure, _success + + +def expect_value(v, next=None): + """ + If next is None expects that the result context is exhausted. + Otherwise expects next to match the first bytes in the buffer. + """ + def expect(_, result): + assert result.type is ResultType.SUCCESS + assert v == result.value + if next: + n = len(next) + assert result.context.read(n) == next + else: + assert not result.context.avail() + return expect + + +def expect_value_if_done(v): + """ + Expects v if the source is complete, otherwise incomplete. + """ + def expect(ctx, result): + if ctx.source.is_complete(): + expect_value(v)(ctx, result) + else: + expect_incomplete()(ctx, result) + return expect + + +def expect_failure(): + def expect(ctx, result): + assert result.type is ResultType.FAILURE + assert ctx.avail() == result.context.avail() + return expect + + +def expect_incomplete(): + def expect(ctx, result): + assert result.type is ResultType.INCOMPLETE + assert ctx.avail() == result.context.avail() + return expect + + +def expect_inc_or_fail(): + def expect(ctx, result): + if ctx.source.is_complete(): + assert result.type is ResultType.FAILURE + else: + assert result.type is ResultType.INCOMPLETE + assert ctx.avail() == result.context.avail() + return expect + + +def parameterify(*tests: Tuple[Parser, List]): + return (["rule", "data", "expect"], + [(rule, data, expect) for (rule, ts) in tests for (data, expect) in ts]) + + +@pytest.mark.parametrize(*parameterify( + (tag(b"spam"), [ + ("spam", expect_value(b"spam")), + ("spam musubi", expect_value(b"spam", next=b" ")), + ("eggs", expect_failure()), + ("", expect_inc_or_fail()), + ("spa", expect_inc_or_fail()) + ]), + (one_of(b"abc"), [ + ("b", expect_value(b"b")), + ("abc", expect_value(b"a", next=b"b")), + ("d", expect_failure()), + ("", expect_inc_or_fail()) + ]), + (value(tag(b"spam"), "eggs"), [ + ("spam", expect_value("eggs")), + ("spam musubi", expect_value("eggs", next=b" ")), + ("beef", expect_failure()) + ]), + (delim(tag(b"{"), tag(b" "), tag(b"}")), [ + ("{ }", expect_value(b" ")), + ("{ };", expect_value(b" ", next=b";")), + ("{}", expect_failure()), + ("{", expect_inc_or_fail()), + ("{ ", expect_inc_or_fail()), + ("", expect_inc_or_fail()), + ("[]", expect_failure()), + (" }", expect_failure()), + ("{bad}", expect_failure()) + ]), + (take_while(lambda b: ord(b'a') <= b <= ord(b'c')), [ + ("abc", expect_value_if_done(b"abc")), + ("abc123", expect_value(b"abc", next=b"1")), + ("", expect_value_if_done(b"")), + ("123", expect_value(b"", next=b"1")) + ]), + (terminated(tag(b"foo"), tag(b";")), [ + ("foo;", expect_value(b"foo")), + ("foo|", expect_failure()), + ("foo", expect_inc_or_fail()), + ("qux", expect_failure()), + ("", expect_inc_or_fail()) + ]), + (preceded(tag(b"> "), tag(b"spam")), [ + ("> spam", expect_value(b"spam")), + ("$ spam", expect_failure()), + ("> eggs", expect_failure()), + ("", expect_inc_or_fail()), + ("> ", expect_inc_or_fail()) + ]), + (alt(tag(b"spam"), tag(b"spa"), tag(b"eggs")), [ + ("spa", expect_value_if_done(b"spa")), + ("eggs", expect_value(b"eggs")), + ("ham", expect_failure()) + ]), + (is_eof(), [ + ("", expect_value_if_done(None)), + ("a", expect_failure()) + ]) +)) +def test_rule(rule, data, expect): + """ + Tests rule against both complete and incomplete data sources. + """ + ctx = context_from(data) + result = rule(ctx) + expect(ctx, result) + + ctx = context_from(data, True) + result = rule(ctx) + expect(ctx, result) + + +def context_from(strdata, end=False): + source = DataSource() + source.extend(bytes(strdata, "utf-8")) + if end: + source.eof() + + return ParserContext(source) + + +def expect_next(next) -> Parser: + def p(c: ParserContext): + if not next: + assert not c.avail() + elif not c.avail(): + return _failure(c) + else: + # bytes() is not necessary but improves failure message + assert next == bytes(c.read(1)) + return _success(c, b"") + return p From 6cf712c95216ed405ba53e77d5abf2927e502fb8 Mon Sep 17 00:00:00 2001 From: Rob Marrowstone Date: Mon, 22 Apr 2024 13:52:13 -0700 Subject: [PATCH 4/8] Updated protons to work with SliceableBuffer This change updates the parser combinators to use the SliceableBuffer instead of the BufferContext as before. We're starting with just the minimal set to get off the ground with the text parser. --- amazon/ion/protons.py | 403 ++++++++++++++------------------- amazon/ion/reader_text2.py | 15 +- amazon/ion/sliceable_buffer.py | 9 +- tests/simple_harness.py | 51 +++++ tests/simple_write_harness.py | 44 ++++ tests/test_protons.py | 129 ++++++----- 6 files changed, 353 insertions(+), 298 deletions(-) create mode 100755 tests/simple_harness.py create mode 100755 tests/simple_write_harness.py diff --git a/amazon/ion/protons.py b/amazon/ion/protons.py index cd2eb2bff..abbc3cf40 100644 --- a/amazon/ion/protons.py +++ b/amazon/ion/protons.py @@ -2,7 +2,7 @@ from enum import Enum from typing import Callable, Any -from amazon.protonic.data_source import DataSource +from amazon.ion.sliceable_buffer import SliceableBuffer """ "protonic" is a parser combinator library for parsing ion and similar document @@ -29,49 +29,6 @@ def __init__(self, message): self.message = message -@dataclass -class ParserContext: - """ - Wraps a mutable DataSource to provide an immutable facade. - - Intended to hold other metadata information if/when the need - arises. - - FIXME: As implemented now this is super dangerous because if you - advance the underlying buffer it silently invalidates all of your - outstanding ParserContexts. We should fix this but for now I am - doing this because it gives me the api in the combinators that i - want without either a bunch of ref copying or doing something - more complex like weakrefs. - """ - - # TODO: add col/line or other context info for errors - source: DataSource - cursor: int = 0 - - def avail(self): - return len(self.source) - self.cursor - - def read(self, n): - start = self.cursor - end = start + n - return self.source[start:end] - - def remaining(self, n): - return ParserContext(self.source, self.cursor + n) - - def advance(self): - """ - Drop anything from the buffer before cursor and reset cursor - to start of current buffer. - - NOTE: once you have done this any and all outstanding ParserContexts - sharing the same DataSource will be invalid! - """ - self.source.advance(self.cursor) - self.cursor = 0 - - class ResultType(Enum): SUCCESS = "Success" FAILURE = "Failure" @@ -85,142 +42,142 @@ class ParseResult: base-class for parse results. """ type: ResultType - context: ParserContext - - -@dataclass -class SuccessResult(ParseResult): - value: Any - type = ResultType.SUCCESS + buffer: SliceableBuffer + value: Any = None - def __init__(self, context, value): - self.context = context - self.value = value - -Parser = Callable[[ParserContext], ParseResult] +Parser = Callable[[SliceableBuffer], ParseResult] def tag(tag: bytes) -> Parser: """ - Match a sequence of bytes, a "tag" - """ - l = len(tag) - - def p(ctx: ParserContext): - avail = ctx.avail() - if avail < l: - # todo what happens when 0 is avail? that should be inomplete right? - if ctx.read(avail) == tag[:avail]: - return _inc_or_fail(ctx) - else: - return _failure(ctx) + Match a sequence of bytes, a "tag". - elif ctx.read(l) == tag: - return _success(ctx.remaining(l), tag) - else: - return _failure(ctx) - return p - -def one_of(items: bytes) -> Parser: - """ - Match one of the bytes passed. """ - def p(ctx): - if not ctx.avail(): - return _inc_or_fail(ctx) + length = len(tag) + + def p(buffer: SliceableBuffer): + avail = buffer.size + + if avail >= length: + (data, buffer) = buffer.read_slice(length) + if data == tag: + return ParseResult(ResultType.SUCCESS, buffer, tag) + else: + return ParseResult(ResultType.FAILURE, buffer) - b = ctx.read(1) + if avail: + (data, buffer) = buffer.read_slice(avail) + else: + data = b"" - if b[0] in items: - return _success(ctx.remaining(1), b) + if data == tag[:avail] and not buffer.is_eof(): + return ParseResult(ResultType.INCOMPLETE, buffer) else: - return _failure(ctx) + return ParseResult(ResultType.FAILURE, buffer) + return p +# def one_of(items: bytes) -> Parser: +# """ +# Match one of the bytes passed. +# """ +# def p(ctx): +# if not ctx.avail(): +# return _inc_or_fail(ctx) +# +# b = ctx.read(1) +# +# if b[0] in items: +# return _success(ctx.remaining(1), b) +# else: +# return _failure(ctx) +# return p +# + def terminated(item: Parser, terminal: Parser) -> Parser: """ checks that the value is terminated with terminal, which is not consumed. result is that produced by item if both succeed. """ - def p(ctx: ParserContext): - result = item(ctx) - if result.type is not ResultType.SUCCESS: - return ParseResult(result.type, ctx) - - ended = terminal(result.context) - if ended.type is not ResultType.SUCCESS: - return ParseResult(ended.type, ctx) - - return _success(ended.context, result.value) - return p - - -def delim(start: Parser, value: Parser, end: Parser) -> Parser: - """ - result of value is returned - - if start matches. - - this doesn't do any checks about incomplete vs done itself: it delegates that to it's component parsers - """ - def p(ctx): - started = start(ctx) - if started.type is not ResultType.SUCCESS: - return ParseResult(started.type, ctx) - - body = value(started.context) + def p(buffer: SliceableBuffer): + body = item(buffer) if body.type is not ResultType.SUCCESS: - return ParseResult(body.type, ctx) + return body - ended = end(body.context) - if ended.type is not ResultType.SUCCESS: - return ParseResult(ended.type, ctx) + term = terminal(body.buffer) + if term.type is not ResultType.SUCCESS: + return term - return _success(ended.context, body.value) + return ParseResult(ResultType.SUCCESS, term.buffer, body.value) return p -def take_while(pred) -> Parser: - """ - pred is fed one byte at a time. Why is it written this way? - I don't want to add the complexity in surface area to generalize - it to take a parser _and_ doing so would mean i'd need to combine - the results in a general way, _and_ ensure the parser made progress... - so this is the simplest and more performant thing i can think to do - while still having some generality. - - if the data ends before pred returns False _and_ there may be more - then it will be Incomplete. - - pred can signal an error by raising ParserError. - """ - def p(ctx: ParserContext): - initial_ctx = ctx - n = 0 - - while ctx.avail(): - result = pred(ctx.read(1)[0]) - if not result: - return _success(ctx, initial_ctx.read(n)) - - ctx = ctx.remaining(1) - n += 1 - - if ctx.source.is_complete(): - return _success(ctx, initial_ctx.read(n)) - else: - return _incomplete(initial_ctx) - return p - - -def value(parser: Parser, value: Any) -> Parser: - """ - If the parser succeeds, the resulting value will be replaced by the value. - """ - return map_value(parser, lambda _: value) +# def delim(start: Parser, value: Parser, end: Parser) -> Parser: +# """ +# result of value is returned +# +# if start matches. +# +# this doesn't do any checks about incomplete vs done itself: it delegates that to it's component parsers +# """ +# def p(ctx): +# started = start(ctx) +# if started.type is not ResultType.SUCCESS: +# return ParseResult(started.type, ctx) +# +# body = value(started.context) +# if body.type is not ResultType.SUCCESS: +# return ParseResult(body.type, ctx) +# +# ended = end(body.context) +# if ended.type is not ResultType.SUCCESS: +# return ParseResult(ended.type, ctx) +# +# return _success(ended.context, body.value) +# return p + + +# def take_while(pred) -> Parser: +# """ +# pred is fed one byte at a time. Why is it written this way? +# I don't want to add the complexity in surface area to generalize +# it to take a parser _and_ doing so would mean i'd need to combine +# the results in a general way, _and_ ensure the parser made progress... +# so this is the simplest and more performant thing i can think to do +# while still having some generality. +# +# if the data ends before pred returns False _and_ there may be more +# then it will be Incomplete. +# +# pred can signal an error by raising ParserError. +# """ +# def p(buffer: ParserContext): +# initial_ctx = ctx +# n = 0 +# +# while ctx.avail(): +# result = pred(ctx.read(1)[0]) +# if not result: +# return _success(ctx, initial_ctx.read(n)) +# +# ctx = ctx.remaining(1) +# n += 1 +# +# if ctx.source.is_complete(): +# return _success(ctx, initial_ctx.read(n)) +# else: +# return _incomplete(initial_ctx) +# return p + + +def constant(parser: Parser, constant: Any) -> Parser: + """ + If the parser succeeds, the resulting value will be replaced by the constant. + """ + return map_value(parser, lambda _: constant) def map_value(parser: Parser, mapper: Callable) -> Parser: @@ -228,10 +185,10 @@ def map_value(parser: Parser, mapper: Callable) -> Parser: If the parser succeeds, the mapper will be called with the value of the result and the return value will replace the value. """ - def p(ctx): - result = parser(ctx) + def p(buffer: SliceableBuffer): + result = parser(buffer) if result.type is ResultType.SUCCESS: - return _success(result.context, mapper(result.value)) + return ParseResult(ResultType.SUCCESS, result.buffer, mapper(result.value)) else: return result @@ -246,12 +203,12 @@ def alt(*parsers: Parser) -> Parser: Users are responsible for ordering parsers so that any that may complete will come before potential incompletes. """ - def p(ctx): + def p(buffer: SliceableBuffer): for parser in parsers: - result = parser(ctx) + result = parser(buffer) if result.type is ResultType.SUCCESS or result.type is ResultType.INCOMPLETE: return result - return _failure(ctx) + return ParseResult(ResultType.FAILURE, buffer) return p @@ -264,100 +221,88 @@ def is_eof() -> Parser: Failure when there is data. """ - def p(ctx): - if not ctx.avail(): - if ctx.source.is_complete(): - return _success(ctx, None) + def p(buffer: SliceableBuffer): + if not buffer.size: + if buffer.is_eof(): + return ParseResult(ResultType.SUCCESS, buffer) else: - return _incomplete(ctx) + return ParseResult(ResultType.INCOMPLETE, buffer) else: - return _failure(ctx) + return ParseResult(ResultType.FAILURE, buffer) return p -def preceded(prefix, item) -> Parser: +def preceded(prefix: Parser, item: Parser) -> Parser: """ checks that the prefix matches then returns item if it matches. + + if both match then the buffer returned from item is returned """ - def p(ctx: ParserContext): + def p(buffer: SliceableBuffer): - prior = prefix(ctx) + prior = prefix(buffer) if prior.type is not ResultType.SUCCESS: return prior - result = item(prior.context) - if result.type is ResultType.SUCCESS: - return result - else: - return ParseResult(result.type, ctx) - return p - - -def succeed(value=None) -> Parser: - """ - Always succeeds. Context will be returned as-is, value will be value - """ - return lambda ctx: _success(ctx, value) - - -def fail() -> Parser: - """ - Always fails. Context will be returned as-is. - - todo: consider adding message - """ - return lambda ctx: _failure(ctx) - - -def error(message: str) -> Parser: - """ - Raises a ParseError with message - """ - def p(ctx): - raise ParseError(message) + return item(prior.buffer) return p -def ignore(parser: Parser) -> Parser: - """ - If the parser succeeds, throw away any value but keep the context. - """ - def p(ctx): - result = parser(ctx) - if result.type is ResultType.SUCCESS: - return _success(result.context, None) - else: - return result - return p +# def succeed(value=None) -> Parser: +# """ +# Always succeeds. Context will be returned as-is, value will be value +# """ +# return lambda ctx: _success(ctx, value) +# +# +# def fail() -> Parser: +# """ +# Always fails. Context will be returned as-is. +# +# todo: consider adding message +# """ +# return lambda ctx: _failure(ctx) +# +# +# def error(message: str) -> Parser: +# """ +# Raises a ParseError with message +# """ +# def p(ctx): +# raise ParseError(message) +# return p + + +#def ignore(parser: Parser) -> Parser: +# """ +# If the parser succeeds, throw away any value but keep the context. +# """ +# def p(ctx): +# result = parser(ctx) +# if result.type is ResultType.SUCCESS: +# return _success(result.context, None) +# else: +# return result +# return p def peek(parser: Parser) -> Parser: """ Peek at the stream, result propagates but the context doesn't move forward. """ - def p(ctx): - result = parser(ctx) - if result.type is ResultType.SUCCESS: - return _success(ctx, result.value) - else: - return result + def p(buffer: SliceableBuffer): + result = parser(buffer) + return ParseResult(result.type, buffer, result.value) return p -def _incomplete(context): - return ParseResult(ResultType.INCOMPLETE, context) - - -def _failure(context): - return ParseResult(ResultType.FAILURE, context) +def _incomplete(buffer: SliceableBuffer) -> ParseResult: + raise NotImplementedError("todo") -def _success(context, value): - return SuccessResult(context, value) +def _failure(buffer: SliceableBuffer) -> ParseResult: + raise NotImplementedError("todo") -def _inc_or_fail(context): - if context.source.is_complete(): - return _failure(context) - else: - return _incomplete(context) +def _success(buffer: SliceableBuffer, value: Any) -> ParseResult: + raise NotImplementedError("todo") diff --git a/amazon/ion/reader_text2.py b/amazon/ion/reader_text2.py index 2e1717c51..eafca3b9e 100644 --- a/amazon/ion/reader_text2.py +++ b/amazon/ion/reader_text2.py @@ -25,17 +25,27 @@ def _whitespace(byte): return byte in bytearray(b" \n\t\r\f") + def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: pass + def _list_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: pass + + def _struct_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: pass def _sexp_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: - pass + raise NotImplementedError("todo") + +_container_parsers = [ + _list_parser, + _sexp_parser, + _struct_parser, +] class _ContextFrame(NamedTuple): parser: Callable[[SliceableBuffer], Tuple[IonEvent, SliceableBuffer]] @@ -84,10 +94,9 @@ def stream_handler(): # todo: flushable/commit or something something raise NotImplementedError("Incomplete is not supported") elif event_type is IonEventType.CONTAINER_START: - parser = _parser_table[ion_type] + parser = _container_parsers[ion_type - IonType.LIST] context_stack.append(_ContextFrame(parser, ion_type, depth + 1)) elif event_type is IonEventType.CONTAINER_END: assert ion_type is ctx_type assert depth > 0 context_stack.pop() - diff --git a/amazon/ion/sliceable_buffer.py b/amazon/ion/sliceable_buffer.py index c993e098e..7fef693a9 100644 --- a/amazon/ion/sliceable_buffer.py +++ b/amazon/ion/sliceable_buffer.py @@ -23,7 +23,7 @@ def empty(): """ return SliceableBuffer([]) - def __init__(self, chunks, offset=0, size=0): + def __init__(self, chunks, offset=0, size=0, eof=False): """ *Class internal usage only.* @@ -35,6 +35,7 @@ def __init__(self, chunks, offset=0, size=0): # on each read or skip. self._offset = offset self.size = size + self._eof = eof def extend(self, chunk): """ @@ -240,6 +241,12 @@ def __len__(self): """ return self.size + def is_eof(self): + return self._eof + + def eof(self): + return SliceableBuffer(self._chunks, self._offset, self.size, True) + class IncompleteReadError(IndexError): pass diff --git a/tests/simple_harness.py b/tests/simple_harness.py new file mode 100755 index 000000000..e85c668ec --- /dev/null +++ b/tests/simple_harness.py @@ -0,0 +1,51 @@ +import time +from pstats import SortKey + +from amazon.ion.reader import blocking_reader, NEXT_EVENT, reader_trampoline +from amazon.ion.core import IonEvent, IonEventType +from amazon.ion.reader_binary import binary_reader +from amazon.ion.reader_managed import managed_reader + +from amazon.ion import simpleion + + +def ionc_load(fp): + iter = simpleion.load(fp, single_value=False, parse_eagerly=False) + ct = 0 + for v in iter: + # print(v) + ct += 1 + print(ct) + +def do_it(fp): + g = blocking_reader(managed_reader(binary_reader()), fp) + + while 1: + e: IonEvent = g.send(NEXT_EVENT) + print(f"event: {e}") + if e.event_type is IonEventType.STREAM_END: + print("Done!") + break + + +if __name__ == '__main__': + + import cProfile + + # fp = open('/Users/calhounr/python_bm.ion') + # g = blocking_reader(managed_reader(text_reader(is_unicode=True), None), fp) + # todo: wrap with skipping and symbol table mgmt + for i in range(0, 1): + fp = open('service_log_legacy.i0n', 'rb') + ionc_load(fp) + + # with cProfile.Profile() as p: + # # do_it(fp) + # ct = 0 + # iter = simpleion.load(fp, single_value=False, parse_eagerly=False) + # for v in iter: + # #print(v) + # ct += 1 + # print(ct) + # + # p.print_stats(sort=SortKey.TIME) diff --git a/tests/simple_write_harness.py b/tests/simple_write_harness.py new file mode 100755 index 000000000..1caad04a2 --- /dev/null +++ b/tests/simple_write_harness.py @@ -0,0 +1,44 @@ +import time +from pstats import SortKey + +from amazon.ion.reader import blocking_reader, NEXT_EVENT, reader_trampoline +from amazon.ion.core import IonEvent, IonEventType +from amazon.ion.reader_binary import binary_reader +from amazon.ion.reader_managed import managed_reader + +from amazon.ion import simpleion + + +def ionc_load(fp): + iter = simpleion.load(fp, single_value=False, parse_eagerly=False) + data = [] + for v in iter: + data.append(v) + if len(data) >= 10000: + break + return data + + +if __name__ == '__main__': + + import cProfile + + # fp = open('/Users/calhounr/python_bm.ion') + # g = blocking_reader(managed_reader(text_reader(is_unicode=True), None), fp) + # todo: wrap with skipping and symbol table mgmt + fp = open('service_log_legacy.i0n', 'rb') + data = ionc_load(fp) + + wfp = open('out.i0n', 'wb') + simpleion.dump(data, wfp, sequence_as_stream=True) + + # with cProfile.Profile() as p: + # # do_it(fp) + # ct = 0 + # iter = simpleion.load(fp, single_value=False, parse_eagerly=False) + # for v in iter: + # #print(v) + # ct += 1 + # print(ct) + # + # p.print_stats(sort=SortKey.TIME) diff --git a/tests/test_protons.py b/tests/test_protons.py index 032f32145..df07421e0 100644 --- a/tests/test_protons.py +++ b/tests/test_protons.py @@ -2,8 +2,7 @@ import pytest -from amazon.protonic.protons import * -from amazon.protonic.protons import _failure, _success +from amazon.ion.protons import * def expect_value(v, next=None): @@ -11,14 +10,15 @@ def expect_value(v, next=None): If next is None expects that the result context is exhausted. Otherwise expects next to match the first bytes in the buffer. """ - def expect(_, result): + def expect(result: ParseResult): assert result.type is ResultType.SUCCESS assert v == result.value if next: n = len(next) - assert result.context.read(n) == next + (data, _) = result.buffer.read_slice(n) + assert data == next else: - assert not result.context.avail() + assert not result.buffer.size return expect @@ -26,35 +26,32 @@ def expect_value_if_done(v): """ Expects v if the source is complete, otherwise incomplete. """ - def expect(ctx, result): - if ctx.source.is_complete(): - expect_value(v)(ctx, result) + def expect(result: ParseResult): + if not result.buffer.size and result.buffer.is_eof(): + expect_value(v)(result) else: - expect_incomplete()(ctx, result) + expect_incomplete()(result) return expect def expect_failure(): - def expect(ctx, result): + def expect(result: ParseResult): assert result.type is ResultType.FAILURE - assert ctx.avail() == result.context.avail() return expect def expect_incomplete(): - def expect(ctx, result): + def expect(result: ParseResult): assert result.type is ResultType.INCOMPLETE - assert ctx.avail() == result.context.avail() return expect def expect_inc_or_fail(): - def expect(ctx, result): - if ctx.source.is_complete(): + def expect(result: ParseResult): + if not result.buffer.size and result.buffer.is_eof(): assert result.type is ResultType.FAILURE else: assert result.type is ResultType.INCOMPLETE - assert ctx.avail() == result.context.avail() return expect @@ -71,34 +68,34 @@ def parameterify(*tests: Tuple[Parser, List]): ("", expect_inc_or_fail()), ("spa", expect_inc_or_fail()) ]), - (one_of(b"abc"), [ - ("b", expect_value(b"b")), - ("abc", expect_value(b"a", next=b"b")), - ("d", expect_failure()), - ("", expect_inc_or_fail()) - ]), - (value(tag(b"spam"), "eggs"), [ + # (one_of(b"abc"), [ + # ("b", expect_value(b"b")), + # ("abc", expect_value(b"a", next=b"b")), + # ("d", expect_failure()), + # ("", expect_inc_or_fail()) + # ]), + (constant(tag(b"spam"), "eggs"), [ ("spam", expect_value("eggs")), ("spam musubi", expect_value("eggs", next=b" ")), ("beef", expect_failure()) ]), - (delim(tag(b"{"), tag(b" "), tag(b"}")), [ - ("{ }", expect_value(b" ")), - ("{ };", expect_value(b" ", next=b";")), - ("{}", expect_failure()), - ("{", expect_inc_or_fail()), - ("{ ", expect_inc_or_fail()), - ("", expect_inc_or_fail()), - ("[]", expect_failure()), - (" }", expect_failure()), - ("{bad}", expect_failure()) - ]), - (take_while(lambda b: ord(b'a') <= b <= ord(b'c')), [ - ("abc", expect_value_if_done(b"abc")), - ("abc123", expect_value(b"abc", next=b"1")), - ("", expect_value_if_done(b"")), - ("123", expect_value(b"", next=b"1")) - ]), + # (delim(tag(b"{"), tag(b" "), tag(b"}")), [ + # ("{ }", expect_value(b" ")), + # ("{ };", expect_value(b" ", next=b";")), + # ("{}", expect_failure()), + # ("{", expect_inc_or_fail()), + # ("{ ", expect_inc_or_fail()), + # ("", expect_inc_or_fail()), + # ("[]", expect_failure()), + # (" }", expect_failure()), + # ("{bad}", expect_failure()) + # ]), + # (take_while(lambda b: ord(b'a') <= b <= ord(b'c')), [ + # ("abc", expect_value_if_done(b"abc")), + # ("abc123", expect_value(b"abc", next=b"1")), + # ("", expect_value_if_done(b"")), + # ("123", expect_value(b"", next=b"1")) + # ]), (terminated(tag(b"foo"), tag(b";")), [ ("foo;", expect_value(b"foo")), ("foo|", expect_failure()), @@ -127,32 +124,34 @@ def test_rule(rule, data, expect): """ Tests rule against both complete and incomplete data sources. """ - ctx = context_from(data) - result = rule(ctx) - expect(ctx, result) + buffer = buffer_from(data) + result = rule(buffer) + expect(result) - ctx = context_from(data, True) - result = rule(ctx) - expect(ctx, result) + buffer = buffer_from(data, True) + result = rule(buffer) + expect(result) -def context_from(strdata, end=False): - source = DataSource() - source.extend(bytes(strdata, "utf-8")) +def buffer_from(strdata, end=False): + source = SliceableBuffer.empty() + if len(strdata): + source = source.extend(bytes(strdata, "utf-8")) if end: - source.eof() - - return ParserContext(source) - - -def expect_next(next) -> Parser: - def p(c: ParserContext): - if not next: - assert not c.avail() - elif not c.avail(): - return _failure(c) - else: - # bytes() is not necessary but improves failure message - assert next == bytes(c.read(1)) - return _success(c, b"") - return p + source = source.eof() + + return source + + +# def expect_next(next) -> Parser: +# def p(c: ParserContext): +# if not next: +# assert not c.avail() +# elif not c.avail(): +# return _failure(c) +# else: +# # bytes() is not necessary but improves failure message +# assert next == bytes(c.read(1)) +# return _success(c, b"") +# return p +# \ No newline at end of file From 0d5c6566dc0a2b55e428d63af19e47fad5c0b1c9 Mon Sep 17 00:00:00 2001 From: Rob Marrowstone Date: Tue, 23 Apr 2024 10:49:10 -0700 Subject: [PATCH 5/8] Working Parsing of Bools, Null (untyped), and Whitespace Some fairly savage hacking at both the SliceableBuffer and protons abstractions. It works and I _think_ it's the right contracts for the components but not totally sure. I think at a minimum I need to fix the EOF marking so that it is clearly correct for all cases. That means an actual flag. But I don't want to have the parsers or the buffer have to track that, it should just be in the parser. I need to track depth as context anyway, so we can have the parse methods in the reader track that as well. --- amazon/ion/protons.py | 96 ++++++++++++++++------------------ amazon/ion/reader_text2.py | 63 +++++++++++++++++++--- amazon/ion/sliceable_buffer.py | 12 ++--- tests/reader_util.py | 4 ++ tests/test_protons.py | 25 ++++----- tests/test_reader_text.py | 94 ++++++++++++++++++--------------- 6 files changed, 175 insertions(+), 119 deletions(-) diff --git a/amazon/ion/protons.py b/amazon/ion/protons.py index abbc3cf40..adc8a7895 100644 --- a/amazon/ion/protons.py +++ b/amazon/ion/protons.py @@ -2,7 +2,7 @@ from enum import Enum from typing import Callable, Any -from amazon.ion.sliceable_buffer import SliceableBuffer +from amazon.ion.sliceable_buffer import SliceableBuffer, IncompleteReadError """ "protonic" is a parser combinator library for parsing ion and similar document @@ -72,7 +72,7 @@ def p(buffer: SliceableBuffer): else: data = b"" - if data == tag[:avail] and not buffer.is_eof(): + if data == tag[:avail]: return ParseResult(ResultType.INCOMPLETE, buffer) else: return ParseResult(ResultType.FAILURE, buffer) @@ -80,22 +80,23 @@ def p(buffer: SliceableBuffer): return p -# def one_of(items: bytes) -> Parser: -# """ -# Match one of the bytes passed. -# """ -# def p(ctx): -# if not ctx.avail(): -# return _inc_or_fail(ctx) -# -# b = ctx.read(1) -# -# if b[0] in items: -# return _success(ctx.remaining(1), b) -# else: -# return _failure(ctx) -# return p -# +def one_of(items: bytes) -> Parser: + """ + Match one of the bytes passed. + """ + def p(buffer: SliceableBuffer): + if not buffer.size: + result_type = ResultType.FAILURE if buffer.is_eof() else ResultType.INCOMPLETE + return ParseResult(result_type, buffer) + + (b, buffer) = buffer.read_byte() + + if b in items: + return ParseResult(ResultType.SUCCESS, buffer, b) + else: + return ParseResult(ResultType.FAILURE, buffer) + return p + def terminated(item: Parser, terminal: Parser) -> Parser: """ @@ -140,37 +141,25 @@ def p(buffer: SliceableBuffer): # return p -# def take_while(pred) -> Parser: -# """ -# pred is fed one byte at a time. Why is it written this way? -# I don't want to add the complexity in surface area to generalize -# it to take a parser _and_ doing so would mean i'd need to combine -# the results in a general way, _and_ ensure the parser made progress... -# so this is the simplest and more performant thing i can think to do -# while still having some generality. -# -# if the data ends before pred returns False _and_ there may be more -# then it will be Incomplete. -# -# pred can signal an error by raising ParserError. -# """ -# def p(buffer: ParserContext): -# initial_ctx = ctx -# n = 0 -# -# while ctx.avail(): -# result = pred(ctx.read(1)[0]) -# if not result: -# return _success(ctx, initial_ctx.read(n)) -# -# ctx = ctx.remaining(1) -# n += 1 -# -# if ctx.source.is_complete(): -# return _success(ctx, initial_ctx.read(n)) -# else: -# return _incomplete(initial_ctx) -# return p +def take_while(pred) -> Parser: + """ + pred is fed one byte at a time. Why is it written this way? + I don't want to add the complexity in surface area to generalize + it to take a parser _and_ doing so would mean i'd need to combine + the results in a general way, _and_ ensure the parser made progress... + so this is the simplest and more performant thing i can think to do + while still having some generality. + + if the data ends before pred returns False _and_ there may be more + then it will be Incomplete. + + pred can signal an error by raising ParserError. + """ + def p(buffer: SliceableBuffer): + (data, buffer) = buffer.read_while(pred) + return ParseResult(ResultType.SUCCESS, buffer, data) + + return p def constant(parser: Parser, constant: Any) -> Parser: @@ -242,9 +231,14 @@ def p(buffer: SliceableBuffer): prior = prefix(buffer) if prior.type is not ResultType.SUCCESS: - return prior + return ParseResult(prior.type, buffer) + + value = item(prior.buffer) + if value.type is not ResultType.SUCCESS: + return ParseResult(value.type, buffer) + else: + return value - return item(prior.buffer) return p diff --git a/amazon/ion/reader_text2.py b/amazon/ion/reader_text2.py index eafca3b9e..ebb4981e9 100644 --- a/amazon/ion/reader_text2.py +++ b/amazon/ion/reader_text2.py @@ -16,7 +16,8 @@ from typing import Optional, NamedTuple, Callable, Tuple from amazon.ion.core import IonType, IonEvent, \ - IonEventType + IonEventType, ION_STREAM_INCOMPLETE_EVENT, ION_STREAM_END_EVENT +from amazon.ion.protons import * from amazon.ion.reader import ReadEventType from amazon.ion.sliceable_buffer import SliceableBuffer from amazon.ion.util import coroutine @@ -26,9 +27,45 @@ def _whitespace(byte): return byte in bytearray(b" \n\t\r\f") +_stop = peek(one_of(b" \n\t\r\f{}[](),")) + + +def tag_stop(tag_value): + return terminated(tag(tag_value), _stop) + + +def is_empty(buffer: SliceableBuffer): + if buffer.size: + return ParseResult(ResultType.FAILURE, buffer, None) + else: + return ParseResult(ResultType.SUCCESS, buffer, None) + + +_tlv_parsec = preceded( + take_while(_whitespace), + alt( + # constant(is_empty, ION_STREAM_END_EVENT), + constant(tag_stop(b"nan"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, "Nan")), + constant(tag_stop(b"null"), IonEvent(IonEventType.SCALAR, IonType.NULL, None)), + constant(tag_stop(b"true"), IonEvent(IonEventType.SCALAR, IonType.BOOL, True)), + constant(tag_stop(b"false"), IonEvent(IonEventType.SCALAR, IonType.BOOL, False)), + constant(tag(b"{"), IonEvent(IonEventType.CONTAINER_START, IonType.STRUCT)), + constant(tag(b"["), IonEvent(IonEventType.CONTAINER_START, IonType.LIST)))) + +_stream_end = preceded(take_while(_whitespace), is_empty) + def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: - pass + result = _tlv_parsec(buffer) + if result.type is ResultType.SUCCESS: + return result.value.derive_depth(0), result.buffer + if result.type is ResultType.INCOMPLETE: + stream_end = _stream_end(buffer) + if stream_end.type is ResultType.SUCCESS: + return ION_STREAM_END_EVENT, stream_end.buffer + + return ION_STREAM_INCOMPLETE_EVENT, buffer + raise ValueError("parse failed on _____") def _list_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: pass @@ -47,6 +84,7 @@ def _sexp_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: _struct_parser, ] + class _ContextFrame(NamedTuple): parser: Callable[[SliceableBuffer], Tuple[IonEvent, SliceableBuffer]] ion_type: Optional[IonType] @@ -54,7 +92,7 @@ class _ContextFrame(NamedTuple): @coroutine -def stream_handler(): +def text_stream_handler(): """ Handler for an Ion Text value-stream. """ @@ -63,6 +101,7 @@ def stream_handler(): ion_event = None skip_or_next = ReadEventType.NEXT expect_data = False + incomplete = False while True: read_event = yield ion_event @@ -71,8 +110,13 @@ def stream_handler(): # part 1: handle user's read event if expect_data: if read_event.type is not ReadEventType.DATA: - raise TypeError("Data expected") - buffer = buffer.extend(read_event.data) + # flush it + if incomplete: + buffer = buffer.extend(b"\t") + else: + raise TypeError("Data expected") + else: + buffer = buffer.extend(read_event.data) else: if read_event.type is ReadEventType.DATA: raise TypeError("Next or Skip expected") @@ -85,14 +129,17 @@ def stream_handler(): (parser, ctx_type, depth) = context_stack[-1] (ion_event, buffer) = parser(buffer) - event_type = ion_event.type + + event_type = ion_event.event_type ion_type = ion_event.ion_type + expect_data = False + incomplete = False if event_type is IonEventType.STREAM_END: expect_data = True elif event_type is IonEventType.INCOMPLETE: - # todo: flushable/commit or something something - raise NotImplementedError("Incomplete is not supported") + expect_data = True + incomplete = True elif event_type is IonEventType.CONTAINER_START: parser = _container_parsers[ion_type - IonType.LIST] context_stack.append(_ContextFrame(parser, ion_type, depth + 1)) diff --git a/amazon/ion/sliceable_buffer.py b/amazon/ion/sliceable_buffer.py index 7fef693a9..018b74e9b 100644 --- a/amazon/ion/sliceable_buffer.py +++ b/amazon/ion/sliceable_buffer.py @@ -129,28 +129,30 @@ def read_slice(self, n): def read_while(self, pred): """ Read bytes while pred(byte) is True, return (bytes, new buffer). - - Raise IncompleteReadError if pred never returns False. """ size = self.size chunks = self._chunks offset = self._offset + if not size: + return b"", self + # short-circuit when we can serve full read from first chunk # optimizes for common case and simplifies accumulation loop (chunk, length) = chunks[0] for cursor in range(offset, length): if not pred(chunk[cursor]): # drop chunk if fully consumed - if cursor + 1 == length: + if cursor == length: return chunk[offset:], SliceableBuffer(chunks[1:], 0, size - (length - offset)) else: - return chunk[offset:cursor], SliceableBuffer(chunks, cursor + 1, size - (1 + cursor - offset)) + return chunk[offset:cursor], SliceableBuffer(chunks, cursor, size - (cursor - offset)) count = length - offset slices = [_ChunkPair(chunk[offset:], count)] # i is used to init the new buffer after the loop + # todo: this probably has some off-by-one issues i = 1 for (i, pair) in enumerate(chunks[1:], start=1): (chunk, length) = pair @@ -170,8 +172,6 @@ def read_while(self, pred): slices.append(pair) continue break - else: - raise IncompleteReadError("pred never returned False") combined = bytearray(count) offset = 0 diff --git a/tests/reader_util.py b/tests/reader_util.py index 757e71675..e0cb6dc0b 100644 --- a/tests/reader_util.py +++ b/tests/reader_util.py @@ -55,11 +55,15 @@ def reader_scaffold(reader, event_pairs): input_events = (e for e, _ in event_pairs) output_events = add_depths(e for _, e in event_pairs) for read_event, expected in zip(input_events, output_events): + print(f"sending: {read_event}") if is_exception(expected): with raises(expected): reader.send(read_event).value # Forces evaluation of all value thunks. else: + print(f"expecting: {expected}") actual = reader.send(read_event) + print(f"received: {actual}") + assert expected == actual diff --git a/tests/test_protons.py b/tests/test_protons.py index df07421e0..f5aec7736 100644 --- a/tests/test_protons.py +++ b/tests/test_protons.py @@ -68,12 +68,12 @@ def parameterify(*tests: Tuple[Parser, List]): ("", expect_inc_or_fail()), ("spa", expect_inc_or_fail()) ]), - # (one_of(b"abc"), [ - # ("b", expect_value(b"b")), - # ("abc", expect_value(b"a", next=b"b")), - # ("d", expect_failure()), - # ("", expect_inc_or_fail()) - # ]), + (one_of(b"abc"), [ + ("b", expect_value(b'b'[0])), + ("abc", expect_value(b"a"[0], next=b"b")), + ("d", expect_failure()), + ("", expect_inc_or_fail()) + ]), (constant(tag(b"spam"), "eggs"), [ ("spam", expect_value("eggs")), ("spam musubi", expect_value("eggs", next=b" ")), @@ -90,12 +90,13 @@ def parameterify(*tests: Tuple[Parser, List]): # (" }", expect_failure()), # ("{bad}", expect_failure()) # ]), - # (take_while(lambda b: ord(b'a') <= b <= ord(b'c')), [ - # ("abc", expect_value_if_done(b"abc")), - # ("abc123", expect_value(b"abc", next=b"1")), - # ("", expect_value_if_done(b"")), - # ("123", expect_value(b"", next=b"1")) - # ]), + (take_while(lambda b: ord(b'a') <= b <= ord(b'c')), [ + ("abc", expect_value_if_done(b"abc")), + ("abc123", expect_value(b"abc", next=b"1")), + ("", expect_incomplete()), + ("123", expect_value(b"", next=b"1")) + ]), + # todo: peek (terminated(tag(b"foo"), tag(b";")), [ ("foo;", expect_value(b"foo")), ("foo|", expect_failure()), diff --git a/tests/test_reader_text.py b/tests/test_reader_text.py index f21ac1def..9c819602f 100644 --- a/tests/test_reader_text.py +++ b/tests/test_reader_text.py @@ -19,6 +19,7 @@ from amazon.ion.exceptions import IonException from amazon.ion.reader import ReadEventType from amazon.ion.reader_text import reader, _POS_INF, _NEG_INF, _NAN +from amazon.ion.reader_text2 import text_stream_handler from amazon.ion.symbols import SymbolToken from amazon.ion.util import coroutine from tests import listify, parametrize @@ -1227,48 +1228,57 @@ def _paired_params(params, desc, top_level=True): _good_unicode_params = partial(_basic_params, _end, 'GOOD - UNICODE', u'') +TEXT2_GOOD = ( + (b'null ', e_null()), + (b'false ', e_bool(False)), + (b'falsey', e_symbol("falsey")), + (b'true ', e_bool(True))) + @parametrize(*chain( - _good_params(_GOOD), - _bad_grammar_params(_BAD_GRAMMAR), - _bad_value_params(_BAD_VALUE), - _incomplete_params(_INCOMPLETE), - _good_unicode_params(_GOOD_UNICODE), - _good_unicode_params(_GOOD_ESCAPES_FROM_UNICODE), - _good_params(_GOOD_ESCAPES_FROM_BYTES), - _bad_unicode_params(_BAD_UNICODE), - _bad_unicode_params(_BAD_ESCAPES_FROM_UNICODE), - _bad_grammar_params(_BAD_ESCAPES_FROM_BYTES), - _paired_params(_INCOMPLETE_ESCAPES, 'INCOMPLETE ESCAPES'), - _good_params(_UNSPACED_SEXPS), - _paired_params(_SKIP, 'SKIP'), - _paired_params(_GOOD_FLUSH, 'GOOD FLUSH'), - _paired_params(_BAD_FLUSH, 'BAD FLUSH'), - # All top-level values as individual data events, space-delimited. - _top_level_value_params(), - # All top-level values as one data event, space-delimited. - all_top_level_as_one_stream_params(_scalar_iter, (b' ', False)), - # All top-level values as one data event, block comment-delimited. - all_top_level_as_one_stream_params(_scalar_iter, (b'/*foo*/', False)), - # All top-level values as one data event, line comment-delimited. - all_top_level_as_one_stream_params(_scalar_iter, (b'//foo\n', False)), - # All annotated top-level values, space-delimited. - _annotate_params(_top_level_value_params(is_delegate=True)), - # All annotated top-level values, comment-delimited. - _annotate_params(_top_level_value_params(b'//foo\n/*bar*/', is_delegate=True)), - _annotate_params(_good_params(_UNSPACED_SEXPS, is_delegate=True)), - # All values, each as the only value within a container. - _containerize_params(_scalar_params()), - _containerize_params(_containerize_params(_scalar_params(), is_delegate=True, top_level=False), with_skip=False), - # All values, annotated, each as the only value within a container. - _containerize_params(_annotate_params(_scalar_params(), is_delegate=True)), - # All values within a single container. - _containerize_params(_all_scalars_in_one_container_params()), - # Annotated containers. - _containerize_params(_annotate_params(_all_scalars_in_one_container_params(), is_delegate=True)), - # All unspaced sexps, annotated, in containers. - _containerize_params(_annotate_params(_incomplete_params( - _UNSPACED_SEXPS, is_delegate=True, top_level=False), is_delegate=True - )), +# _good_params(_GOOD), +# _bad_grammar_params(_BAD_GRAMMAR), +# _bad_value_params(_BAD_VALUE), +# _incomplete_params(_INCOMPLETE), +# _good_unicode_params(_GOOD_UNICODE), +# _good_unicode_params(_GOOD_ESCAPES_FROM_UNICODE), +# _good_params(_GOOD_ESCAPES_FROM_BYTES), +# _bad_unicode_params(_BAD_UNICODE), +# _bad_unicode_params(_BAD_ESCAPES_FROM_UNICODE), +# _bad_grammar_params(_BAD_ESCAPES_FROM_BYTES), +# _paired_params(_INCOMPLETE_ESCAPES, 'INCOMPLETE ESCAPES'), +# _good_params(_UNSPACED_SEXPS), +# _paired_params(_SKIP, 'SKIP'), +# _paired_params(_GOOD_FLUSH, 'GOOD FLUSH'), +# _paired_params(_BAD_FLUSH, 'BAD FLUSH'), +# # All top-level values as individual data events, space-delimited. +# _top_level_value_params(), +# # All top-level values as one data event, space-delimited. +# all_top_level_as_one_stream_params(_scalar_iter, (b' ', False)), +# # All top-level values as one data event, block comment-delimited. +# all_top_level_as_one_stream_params(_scalar_iter, (b'/*foo*/', False)), +# # All top-level values as one data event, line comment-delimited. +# all_top_level_as_one_stream_params(_scalar_iter, (b'//foo\n', False)), +# # All annotated top-level values, space-delimited. +# _annotate_params(_top_level_value_params(is_delegate=True)), +# # All annotated top-level values, comment-delimited. +# _annotate_params(_top_level_value_params(b'//foo\n/*bar*/', is_delegate=True)), +# _annotate_params(_good_params(_UNSPACED_SEXPS, is_delegate=True)), +# # All values, each as the only value within a container. +# _containerize_params(_scalar_params()), +# _containerize_params(_containerize_params(_scalar_params(), is_delegate=True, top_level=False), with_skip=False), +# # All values, annotated, each as the only value within a container. +# _containerize_params(_annotate_params(_scalar_params(), is_delegate=True)), +# # All values within a single container. +# _containerize_params(_all_scalars_in_one_container_params()), +# # Annotated containers. +# _containerize_params(_annotate_params(_all_scalars_in_one_container_params(), is_delegate=True)), +# # All unspaced sexps, annotated, in containers. +# _containerize_params(_annotate_params(_incomplete_params( +# _UNSPACED_SEXPS, is_delegate=True, top_level=False), is_delegate=True +# )), + _good_params(TEXT2_GOOD), + _paired_params(_GOOD_FLUSH, 'GOOD FLUSH') )) def test_raw_reader(p): - reader_scaffold(reader(is_unicode=p.is_unicode), p.event_pairs) + #reader_scaffold(reader(is_unicode=p.is_unicode), p.event_pairs) + reader_scaffold(text_stream_handler(), p.event_pairs) From 3cc2b7cc5966aabfd58b4d7805910952d0c3517f Mon Sep 17 00:00:00 2001 From: Rob Marrowstone Date: Wed, 24 Apr 2024 14:22:40 -0700 Subject: [PATCH 6/8] More than half of 88 basic "Good" tests passing Pretty savage hacking at SliceableBuffer and protons abstractions. Many todos, but have basics working and some momentum. Known todos: * Annotations * Comments * Underscores in Ints * Decimals and Floats * Timestamps * Long quoted strings * Blobs/Clobs * Operator parsing in Sexps * Typed nulls --- amazon/ion/protons.py | 147 ++++++++++---------------- amazon/ion/reader_text2.py | 185 ++++++++++++++++++++++++++------- amazon/ion/sliceable_buffer.py | 86 +++++---------- tests/reader_util.py | 1 + tests/test_reader_text.py | 14 ++- 5 files changed, 237 insertions(+), 196 deletions(-) diff --git a/amazon/ion/protons.py b/amazon/ion/protons.py index adc8a7895..39a25caaa 100644 --- a/amazon/ion/protons.py +++ b/amazon/ion/protons.py @@ -52,8 +52,6 @@ class ParseResult: def tag(tag: bytes) -> Parser: """ Match a sequence of bytes, a "tag". - - """ length = len(tag) @@ -86,8 +84,7 @@ def one_of(items: bytes) -> Parser: """ def p(buffer: SliceableBuffer): if not buffer.size: - result_type = ResultType.FAILURE if buffer.is_eof() else ResultType.INCOMPLETE - return ParseResult(result_type, buffer) + return ParseResult(ResultType.INCOMPLETE, buffer) (b, buffer) = buffer.read_byte() @@ -115,30 +112,43 @@ def p(buffer: SliceableBuffer): return ParseResult(ResultType.SUCCESS, term.buffer, body.value) return p +def debug(parser: Parser) -> Parser: + """ + prints the result of the parser to stdout. + """ + def p(buffer: SliceableBuffer): + print(f"input {buffer}") + result = parser(buffer) + print(f"result {result}") + if type(result.value) is memoryview: + print(f"buffer {bytes(result.value)}") + return result + return p + + +def delim(start: Parser, value: Parser, end: Parser) -> Parser: + """ + result of value is returned + + if start matches. + + this doesn't do any checks about incomplete vs done itself: it delegates that to it's component parsers + """ + def p(buffer: SliceableBuffer): + started = start(buffer) + if started.type is not ResultType.SUCCESS: + return ParseResult(started.type, buffer) -# def delim(start: Parser, value: Parser, end: Parser) -> Parser: -# """ -# result of value is returned -# -# if start matches. -# -# this doesn't do any checks about incomplete vs done itself: it delegates that to it's component parsers -# """ -# def p(ctx): -# started = start(ctx) -# if started.type is not ResultType.SUCCESS: -# return ParseResult(started.type, ctx) -# -# body = value(started.context) -# if body.type is not ResultType.SUCCESS: -# return ParseResult(body.type, ctx) -# -# ended = end(body.context) -# if ended.type is not ResultType.SUCCESS: -# return ParseResult(ended.type, ctx) -# -# return _success(ended.context, body.value) -# return p + body = value(started.buffer) + if body.type is not ResultType.SUCCESS: + return ParseResult(body.type, buffer) + + ended = end(body.buffer) + if ended.type is not ResultType.SUCCESS: + return ParseResult(ended.type, buffer) + + return ParseResult(ResultType.SUCCESS, ended.buffer, body.value) + return p def take_while(pred) -> Parser: @@ -202,25 +212,6 @@ def p(buffer: SliceableBuffer): return p -def is_eof() -> Parser: - """ - Results in the Successful None value when source is exhausted and marked EOF. - - Incomplete when source is exhausted but not EOF. - - Failure when there is data. - """ - def p(buffer: SliceableBuffer): - if not buffer.size: - if buffer.is_eof(): - return ParseResult(ResultType.SUCCESS, buffer) - else: - return ParseResult(ResultType.INCOMPLETE, buffer) - else: - return ParseResult(ResultType.FAILURE, buffer) - return p - - def preceded(prefix: Parser, item: Parser) -> Parser: """ checks that the prefix matches then returns item if it matches. @@ -242,42 +233,22 @@ def p(buffer: SliceableBuffer): return p -# def succeed(value=None) -> Parser: -# """ -# Always succeeds. Context will be returned as-is, value will be value -# """ -# return lambda ctx: _success(ctx, value) -# -# -# def fail() -> Parser: -# """ -# Always fails. Context will be returned as-is. -# -# todo: consider adding message -# """ -# return lambda ctx: _failure(ctx) -# -# -# def error(message: str) -> Parser: -# """ -# Raises a ParseError with message -# """ -# def p(ctx): -# raise ParseError(message) -# return p - - -#def ignore(parser: Parser) -> Parser: -# """ -# If the parser succeeds, throw away any value but keep the context. -# """ -# def p(ctx): -# result = parser(ctx) -# if result.type is ResultType.SUCCESS: -# return _success(result.context, None) -# else: -# return result -# return p +def pair(left: Parser, right: Parser) -> Parser: + """ + returns a tuple of the left and right values + """ + def p(buffer: SliceableBuffer): + l = left(buffer) + if l.type is not ResultType.SUCCESS: + return ParseResult(l.type, buffer) + + r = right(l.buffer) + if r.type is not ResultType.SUCCESS: + return ParseResult(r.type, buffer) + + return ParseResult(ResultType.SUCCESS, r.buffer, (l.value, r.value)) + + return p def peek(parser: Parser) -> Parser: @@ -288,15 +259,3 @@ def p(buffer: SliceableBuffer): result = parser(buffer) return ParseResult(result.type, buffer, result.value) return p - - -def _incomplete(buffer: SliceableBuffer) -> ParseResult: - raise NotImplementedError("todo") - - -def _failure(buffer: SliceableBuffer) -> ParseResult: - raise NotImplementedError("todo") - - -def _success(buffer: SliceableBuffer, value: Any) -> ParseResult: - raise NotImplementedError("todo") diff --git a/amazon/ion/reader_text2.py b/amazon/ion/reader_text2.py index ebb4981e9..25f0e9211 100644 --- a/amazon/ion/reader_text2.py +++ b/amazon/ion/reader_text2.py @@ -16,18 +16,19 @@ from typing import Optional, NamedTuple, Callable, Tuple from amazon.ion.core import IonType, IonEvent, \ - IonEventType, ION_STREAM_INCOMPLETE_EVENT, ION_STREAM_END_EVENT + IonEventType, ION_STREAM_INCOMPLETE_EVENT, ION_STREAM_END_EVENT, IonThunkEvent +from amazon.ion.exceptions import IonException from amazon.ion.protons import * from amazon.ion.reader import ReadEventType from amazon.ion.sliceable_buffer import SliceableBuffer +from amazon.ion.symbols import SymbolToken from amazon.ion.util import coroutine - def _whitespace(byte): return byte in bytearray(b" \n\t\r\f") -_stop = peek(one_of(b" \n\t\r\f{}[](),")) +_stop = peek(one_of(b" \n\t\r\f{}[](),\"'")) def tag_stop(tag_value): @@ -40,42 +41,144 @@ def is_empty(buffer: SliceableBuffer): else: return ParseResult(ResultType.SUCCESS, buffer, None) - -_tlv_parsec = preceded( - take_while(_whitespace), +# todo: not nan, null, true or false for field keys and annotations +_identifier_symbol = terminated( + preceded( + peek(one_of(b"$_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")), + take_while(lambda b: b in bytearray(b"$_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"))), + _stop) + +_quoted_symbol = delim( + tag(b"'"), + take_while(lambda b: b != ord(b"'")), + tag(b"'")) + +_quoted_string = delim( + tag(b'"'), + take_while(lambda b: b != ord(b'"')), + tag(b'"')) + +# todo: negative numbers +_integer = terminated( alt( + tag(b"0"), + preceded( + peek(one_of(b"123456789")), + take_while(lambda b: b in bytearray(b"0123456789")))), + _stop) + +_tlv_parsec = alt( # constant(is_empty, ION_STREAM_END_EVENT), - constant(tag_stop(b"nan"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, "Nan")), + constant(tag_stop(b"nan"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, float("nan"))), + constant(tag_stop(b"+inf"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, float("+inf"))), + constant(tag_stop(b"-inf"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, float("-inf"))), + constant(tag_stop(b"null"), IonEvent(IonEventType.SCALAR, IonType.NULL, None)), constant(tag_stop(b"true"), IonEvent(IonEventType.SCALAR, IonType.BOOL, True)), constant(tag_stop(b"false"), IonEvent(IonEventType.SCALAR, IonType.BOOL, False)), + + map_value(_integer, lambda v: IonThunkEvent(IonEventType.SCALAR, IonType.INT, lambda: int(bytes(v)))), + + + # must rule out annotation before doing symbol values, uggh. + + # must come after the above + map_value(alt(_identifier_symbol, _quoted_symbol), + lambda v: IonThunkEvent(IonEventType.SCALAR, IonType.SYMBOL, lambda: SymbolToken(bytes(v).decode('utf-8'), None))), + + map_value(_quoted_string, + lambda v: IonThunkEvent(IonEventType.SCALAR, IonType.STRING, + lambda: bytes(v).decode('utf-8'), None)), + constant(tag(b"{"), IonEvent(IonEventType.CONTAINER_START, IonType.STRUCT)), - constant(tag(b"["), IonEvent(IonEventType.CONTAINER_START, IonType.LIST)))) + constant(tag(b"["), IonEvent(IonEventType.CONTAINER_START, IonType.LIST)), + constant(tag(b"("), IonEvent(IonEventType.CONTAINER_START, IonType.SEXP)), -_stream_end = preceded(take_while(_whitespace), is_empty) + constant(tag(b"}"), IonEvent(IonEventType.CONTAINER_END, IonType.STRUCT)), + constant(tag(b"]"), IonEvent(IonEventType.CONTAINER_END, IonType.LIST)), + constant(tag(b")"), IonEvent(IonEventType.CONTAINER_END, IonType.SEXP)), +) -def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: - result = _tlv_parsec(buffer) +_take_while_whitespace = take_while(_whitespace) + +def _trim_whitespace(buffer: SliceableBuffer): + result = _take_while_whitespace(buffer) + return result.buffer + + +def _map_result(result: ParseResult, buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: if result.type is ResultType.SUCCESS: - return result.value.derive_depth(0), result.buffer + return result.value, result.buffer if result.type is ResultType.INCOMPLETE: - stream_end = _stream_end(buffer) - if stream_end.type is ResultType.SUCCESS: - return ION_STREAM_END_EVENT, stream_end.buffer + if buffer.size: + return ION_STREAM_INCOMPLETE_EVENT, buffer + else: + return ION_STREAM_END_EVENT, buffer + if result.type is ResultType.FAILURE: + raise IonException("Parse failed on _____") - return ION_STREAM_INCOMPLETE_EVENT, buffer +def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: + buffer = _trim_whitespace(buffer) + result = _tlv_parsec(buffer) - raise ValueError("parse failed on _____") + return _map_result(result, buffer) -def _list_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: - pass +_list_parsec = alt( + constant(tag(b"]"), IonEvent(IonEventType.CONTAINER_END, IonType.LIST)), + terminated( + preceded(_take_while_whitespace, _tlv_parsec), + preceded( + _take_while_whitespace, + alt(tag(b','), peek(tag(b']')))))) + +def _list_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: + buffer = _trim_whitespace(buffer) + result = _list_parsec(buffer) + + return _map_result(result, buffer) + +_field_name = alt( + _identifier_symbol, + _quoted_symbol, + # _quoted_string, + # todo: long quoted string, because why!?!?!? +) + +_struct_parsec = alt( + constant(tag(b"}"), IonEvent(IonEventType.CONTAINER_END, IonType.STRUCT)), + map_value( + terminated( + pair( + _field_name, + preceded( + preceded(_take_while_whitespace, tag(b':')), + preceded(_take_while_whitespace, _tlv_parsec))), + preceded(_take_while_whitespace, alt(tag(b','), peek(tag(b'}'))))), + # todo: lazy field name + lambda pair: pair[1].derive_field_name(SymbolToken(bytes(pair[0]).decode("utf-8"), None)))) def _struct_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: - pass + buffer = _trim_whitespace(buffer) + result = _struct_parsec(buffer) + + return _map_result(result, buffer) + +# todo this is far from complete +_operators = b"!#%&*+-./;<=>?@^`|~" +_whitespace_or_operator = alt(one_of(b" "), one_of(_operators)) + +_sexp_parsec = alt( + constant(tag(b")"), IonEvent(IonEventType.CONTAINER_END, IonType.SEXP)), + terminated( + preceded(_take_while_whitespace, _tlv_parsec), + alt(_whitespace_or_operator, peek(tag(b')'))))) def _sexp_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: - raise NotImplementedError("todo") + buffer = _trim_whitespace(buffer) + result = _sexp_parsec(buffer) + + return _map_result(result, buffer) _container_parsers = [ @@ -112,11 +215,15 @@ def text_stream_handler(): if read_event.type is not ReadEventType.DATA: # flush it if incomplete: + # todo: a cleaner way to do this is to set a flush flag + # and pass it to the parser as context... buffer = buffer.extend(b"\t") else: raise TypeError("Data expected") else: - buffer = buffer.extend(read_event.data) + # todo: this doesn't seem great. + data = read_event.data.encode("utf-8") if type(read_event.data) is str else read_event.data + buffer = buffer.extend(data) else: if read_event.type is ReadEventType.DATA: raise TypeError("Next or Skip expected") @@ -127,23 +234,27 @@ def text_stream_handler(): # part 2: do some lexxing (parser, ctx_type, depth) = context_stack[-1] - (ion_event, buffer) = parser(buffer) + # part 3: mutate state event_type = ion_event.event_type - ion_type = ion_event.ion_type - expect_data = False - incomplete = False - if event_type is IonEventType.STREAM_END: + if event_type.is_stream_signal: expect_data = True - elif event_type is IonEventType.INCOMPLETE: - expect_data = True - incomplete = True - elif event_type is IonEventType.CONTAINER_START: - parser = _container_parsers[ion_type - IonType.LIST] - context_stack.append(_ContextFrame(parser, ion_type, depth + 1)) - elif event_type is IonEventType.CONTAINER_END: - assert ion_type is ctx_type - assert depth > 0 - context_stack.pop() + incomplete = event_type is IonEventType.INCOMPLETE + else: + expect_data = False + incomplete = False + ion_type = ion_event.ion_type + + if event_type is IonEventType.CONTAINER_START: + parser = _container_parsers[ion_type - IonType.LIST] + context_stack.append(_ContextFrame(parser, ion_type, depth + 1)) + elif event_type is IonEventType.CONTAINER_END: + assert ion_type is ctx_type + assert depth > 0 + depth -= 1 + context_stack.pop() + + ion_event = ion_event.derive_depth(depth) + diff --git a/amazon/ion/sliceable_buffer.py b/amazon/ion/sliceable_buffer.py index 018b74e9b..409fe8f4b 100644 --- a/amazon/ion/sliceable_buffer.py +++ b/amazon/ion/sliceable_buffer.py @@ -23,7 +23,7 @@ def empty(): """ return SliceableBuffer([]) - def __init__(self, chunks, offset=0, size=0, eof=False): + def __init__(self, chunks, offset=0, size=0): """ *Class internal usage only.* @@ -35,7 +35,6 @@ def __init__(self, chunks, offset=0, size=0, eof=False): # on each read or skip. self._offset = offset self.size = size - self._eof = eof def extend(self, chunk): """ @@ -137,49 +136,26 @@ def read_while(self, pred): if not size: return b"", self - # short-circuit when we can serve full read from first chunk - # optimizes for common case and simplifies accumulation loop - (chunk, length) = chunks[0] - for cursor in range(offset, length): - if not pred(chunk[cursor]): - # drop chunk if fully consumed - if cursor == length: - return chunk[offset:], SliceableBuffer(chunks[1:], 0, size - (length - offset)) - else: - return chunk[offset:cursor], SliceableBuffer(chunks, cursor, size - (cursor - offset)) - - count = length - offset - slices = [_ChunkPair(chunk[offset:], count)] - - # i is used to init the new buffer after the loop - # todo: this probably has some off-by-one issues - i = 1 - for (i, pair) in enumerate(chunks[1:], start=1): - (chunk, length) = pair - for cursor in range(0, length): + n = 0 + end = False + for (chunk, length) in chunks: + for cursor in range(offset, length): if not pred(chunk[cursor]): - count += cursor + 1 - # drop chunk if fully consumed - if cursor + 1 == length: - i += 1 - cursor = 0 - slices.append(pair) - else: - slices.append(_ChunkPair(chunk[:cursor], cursor + 1)) + end = True + n += cursor - offset break else: - count += length - slices.append(pair) - continue - break + n += length - offset - combined = bytearray(count) - offset = 0 - for (chunk, length) in slices: - combined[offset:offset + length] = chunk - offset += length + if end: + break + + offset = 0 - return memoryview(combined), SliceableBuffer(chunks[i:], cursor + 1, size - count) + if n == 0: + return b"", self + else: + return self.read_slice(n) def peek(self, n): """ @@ -187,22 +163,7 @@ def peek(self, n): Raise IncompleteReadError if the buffer is n > size. """ - size = self.size - chunks = self._chunks - offset = self._offset - - if size < n: - raise IncompleteReadError(f'Buffer has size {size}, but {n} bytes were requested!') - - # short-circuit when we can serve full peek from first chunk - # optimizes for common case and simplifies accumulation loop - (chunk, length) = chunks[0] - if n < length: - return chunk[offset:offset + n] - elif n == length: - return chunk[offset:] - - raise NotImplementedError("no combiney yet!") + return self.read_slice(n)[0] def skip(self, n): """ @@ -241,12 +202,13 @@ def __len__(self): """ return self.size - def is_eof(self): - return self._eof - - def eof(self): - return SliceableBuffer(self._chunks, self._offset, self.size, True) - + def __repr__(self): + if self.size == 0: + return "SliceableBuffer(size=0, data=[])" + elif self.size < 5: + return f"SliceableBuffer(size={self.size}, data=[{bytes(self.peek(self.size))}])" + else: + return f"SliceableBuffer(size={self.size}, data=[{bytes(self.peek(5))}...])" class IncompleteReadError(IndexError): pass diff --git a/tests/reader_util.py b/tests/reader_util.py index e0cb6dc0b..6d7e47c75 100644 --- a/tests/reader_util.py +++ b/tests/reader_util.py @@ -62,6 +62,7 @@ def reader_scaffold(reader, event_pairs): else: print(f"expecting: {expected}") actual = reader.send(read_event) + actual.value # Forces evaluation of all value thunks. print(f"received: {actual}") assert expected == actual diff --git a/tests/test_reader_text.py b/tests/test_reader_text.py index 9c819602f..e68bd1383 100644 --- a/tests/test_reader_text.py +++ b/tests/test_reader_text.py @@ -1230,12 +1230,20 @@ def _paired_params(params, desc, top_level=True): TEXT2_GOOD = ( (b'null ', e_null()), + (b' null ', e_null()), (b'false ', e_bool(False)), - (b'falsey', e_symbol("falsey")), - (b'true ', e_bool(True))) + (b'"foo"', e_string("foo")), + (b'falsey ', e_symbol(SymbolToken("falsey", None))), + (b'spam{}', e_symbol(SymbolToken("spam", None)), e_start_struct(), e_end_struct()), + (b'true ', e_bool(True)), + (b'[]', e_start_list(), e_end_list()), + (b'{}', e_start_struct(), e_end_struct()), + (b'[null, false , ]', e_start_list(), e_null(), e_bool(False), e_end_list()), + (b"{ 'foo' : bar }", e_start_struct(), e_symbol(_st("bar"), field_name=_st(u'foo')), e_end_struct()), +) @parametrize(*chain( -# _good_params(_GOOD), + _good_params(_GOOD), # _bad_grammar_params(_BAD_GRAMMAR), # _bad_value_params(_BAD_VALUE), # _incomplete_params(_INCOMPLETE), From c870df9175b32fb4d74c0e35656ba9520ba23eac Mon Sep 17 00:00:00 2001 From: Rob Marrowstone Date: Fri, 26 Apr 2024 09:42:19 -0700 Subject: [PATCH 7/8] Cleaned up protons * Added back eof() as it just makes completeness handling clearer * Fixed protons I'd broken * Added and uncommented tests * Made sure reader_text2 works with changes --- amazon/ion/protons.py | 126 ++++++++++++++++++++++++++------- amazon/ion/reader_text2.py | 59 +++++++++------ amazon/ion/sliceable_buffer.py | 34 ++++++--- tests/test_protons.py | 40 ++++++----- 4 files changed, 187 insertions(+), 72 deletions(-) diff --git a/amazon/ion/protons.py b/amazon/ion/protons.py index 39a25caaa..5f9a0b113 100644 --- a/amazon/ion/protons.py +++ b/amazon/ion/protons.py @@ -2,7 +2,7 @@ from enum import Enum from typing import Callable, Any -from amazon.ion.sliceable_buffer import SliceableBuffer, IncompleteReadError +from amazon.ion.sliceable_buffer import SliceableBuffer """ "protonic" is a parser combinator library for parsing ion and similar document @@ -15,11 +15,11 @@ The goals: - Keep it simple: avoid unnecessary indirection, keep scope small, avoid fancy ast manipulations. -- Support streaming: receive incremental inputs and produce incremental results +- Support streaming: receive incremental inputs and produce possibly incomplete results - Reasonably performant: avoid data copying, reduce call overhead and reference counting. -- Enable good error messaging. TODO more here! -- Simple to extend: users can write their own functions and combinators. +- Enable good error messaging. TODO: more here! +- Simple to extend: users can easily mix their own functions and combinators. """ @@ -54,24 +54,29 @@ def tag(tag: bytes) -> Parser: Match a sequence of bytes, a "tag". """ length = len(tag) + if not length: + raise ValueError("tag must not be empty") def p(buffer: SliceableBuffer): avail = buffer.size - if avail >= length: - (data, buffer) = buffer.read_slice(length) - if data == tag: - return ParseResult(ResultType.SUCCESS, buffer, tag) - else: + if avail < length: + if buffer.is_eof(): return ParseResult(ResultType.FAILURE, buffer) - if avail: - (data, buffer) = buffer.read_slice(avail) - else: - data = b"" + if avail > 0: + (data, buffer) = buffer.read_slice(avail) + else: + data = b"" - if data == tag[:avail]: - return ParseResult(ResultType.INCOMPLETE, buffer) + if data == tag[:avail]: + return ParseResult(ResultType.INCOMPLETE, buffer) + else: + return ParseResult(ResultType.FAILURE, buffer) + + (data, buffer) = buffer.read_slice(length) + if data == tag: + return ParseResult(ResultType.SUCCESS, buffer, tag) else: return ParseResult(ResultType.FAILURE, buffer) @@ -80,11 +85,14 @@ def p(buffer: SliceableBuffer): def one_of(items: bytes) -> Parser: """ - Match one of the bytes passed. + Match the next byte to one of the bytes passed. """ def p(buffer: SliceableBuffer): if not buffer.size: - return ParseResult(ResultType.INCOMPLETE, buffer) + if buffer.is_eof(): + return ParseResult(ResultType.FAILURE, buffer) + else: + return ParseResult(ResultType.INCOMPLETE, buffer) (b, buffer) = buffer.read_byte() @@ -112,16 +120,18 @@ def p(buffer: SliceableBuffer): return ParseResult(ResultType.SUCCESS, term.buffer, body.value) return p + def debug(parser: Parser) -> Parser: """ - prints the result of the parser to stdout. + prints the input buffer and result of ``parser`` to stdout. + + also provides a nice place to put a debug breakpoint. """ def p(buffer: SliceableBuffer): print(f"input {buffer}") result = parser(buffer) print(f"result {result}") - if type(result.value) is memoryview: - print(f"buffer {bytes(result.value)}") + return result return p @@ -151,7 +161,7 @@ def p(buffer: SliceableBuffer): return p -def take_while(pred) -> Parser: +def take_while(pred: Callable[[int], bool]) -> Parser: """ pred is fed one byte at a time. Why is it written this way? I don't want to add the complexity in surface area to generalize @@ -172,11 +182,38 @@ def p(buffer: SliceableBuffer): return p -def constant(parser: Parser, constant: Any) -> Parser: +def take_while_n(n: int, pred: Callable[[int], bool]) -> Parser: + """ + same as take_while but expects to, and only reads n bytes + """ + def p(buffer: SliceableBuffer): + ct = 0 + + def wrapped_pred(byte): + nonlocal ct + if ct > n: + return False + ct += 1 + return pred(byte) + + avail = buffer.size + (data, buffer) = buffer.read_while(wrapped_pred) + + if len(data) == n: + return ParseResult(ResultType.SUCCESS, buffer, data) + elif len(data) == avail: + return ParseResult(ResultType.INCOMPLETE, buffer) + else: + return ParseResult(ResultType.FAILURE, buffer) + + return p + + +def constant(parser: Parser, value: Any) -> Parser: """ If the parser succeeds, the resulting value will be replaced by the constant. """ - return map_value(parser, lambda _: constant) + return map_value(parser, lambda _: value) def map_value(parser: Parser, mapper: Callable) -> Parser: @@ -214,7 +251,7 @@ def p(buffer: SliceableBuffer): def preceded(prefix: Parser, item: Parser) -> Parser: """ - checks that the prefix matches then returns item if it matches. + check that the prefix matches then returns item if it matches. if both match then the buffer returned from item is returned """ @@ -251,6 +288,28 @@ def p(buffer: SliceableBuffer): return p +def delim_pair(left: Parser, delim: Parser, right: Parser) -> Parser: + """ + returns a tuple of the left and right values + """ + def p(buffer: SliceableBuffer): + l = left(buffer) + if l.type is not ResultType.SUCCESS: + return ParseResult(l.type, buffer) + + d = delim(l.buffer) + if d.type is not ResultType.SUCCESS: + return ParseResult(d.type, buffer) + + r = right(d.buffer) + if r.type is not ResultType.SUCCESS: + return ParseResult(r.type, buffer) + + return ParseResult(ResultType.SUCCESS, r.buffer, (l.value, r.value)) + + return p + + def peek(parser: Parser) -> Parser: """ Peek at the stream, result propagates but the context doesn't move forward. @@ -259,3 +318,22 @@ def p(buffer: SliceableBuffer): result = parser(buffer) return ParseResult(result.type, buffer, result.value) return p + +def is_eof() -> Parser: + """ + returns Success if the buffer is empty and marked EOF + + Should this return Incomplete if the buffer is empty but not EOF? + It does for now, I guess. + """ + + def p(buffer: SliceableBuffer): + if not buffer.size: + if buffer.is_eof(): + return ParseResult(ResultType.SUCCESS, buffer) + else: + return ParseResult(ResultType.INCOMPLETE, buffer) + else: + return ParseResult(ResultType.FAILURE, buffer) + + return p diff --git a/amazon/ion/reader_text2.py b/amazon/ion/reader_text2.py index 25f0e9211..8dd070f76 100644 --- a/amazon/ion/reader_text2.py +++ b/amazon/ion/reader_text2.py @@ -13,10 +13,10 @@ # License. from collections import deque -from typing import Optional, NamedTuple, Callable, Tuple +from typing import Optional, NamedTuple, Tuple from amazon.ion.core import IonType, IonEvent, \ - IonEventType, ION_STREAM_INCOMPLETE_EVENT, ION_STREAM_END_EVENT, IonThunkEvent + IonEventType, ION_STREAM_INCOMPLETE_EVENT, ION_STREAM_END_EVENT, IonThunkEvent, ION_VERSION_MARKER_EVENT from amazon.ion.exceptions import IonException from amazon.ion.protons import * from amazon.ion.reader import ReadEventType @@ -25,10 +25,13 @@ from amazon.ion.util import coroutine def _whitespace(byte): - return byte in bytearray(b" \n\t\r\f") + return byte in bytearray(b" \t\n\r\v\f") -_stop = peek(one_of(b" \n\t\r\f{}[](),\"'")) +_stop = peek( + alt( + one_of(b"{}[](),\"\' \t\n\r\v\f"), + is_eof())) def tag_stop(tag_value): @@ -41,6 +44,7 @@ def is_empty(buffer: SliceableBuffer): else: return ParseResult(ResultType.SUCCESS, buffer, None) + # todo: not nan, null, true or false for field keys and annotations _identifier_symbol = terminated( preceded( @@ -67,7 +71,12 @@ def is_empty(buffer: SliceableBuffer): take_while(lambda b: b in bytearray(b"0123456789")))), _stop) -_tlv_parsec = alt( + +_timestamp = alt( + +) + +_value_parsec = debug(alt( # constant(is_empty, ION_STREAM_END_EVENT), constant(tag_stop(b"nan"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, float("nan"))), constant(tag_stop(b"+inf"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, float("+inf"))), @@ -77,6 +86,8 @@ def is_empty(buffer: SliceableBuffer): constant(tag_stop(b"true"), IonEvent(IonEventType.SCALAR, IonType.BOOL, True)), constant(tag_stop(b"false"), IonEvent(IonEventType.SCALAR, IonType.BOOL, False)), + # ignore(delim(tag_stop(b"//"), take_while(lambda b: b != ord(b"\n")), ), + map_value(_integer, lambda v: IonThunkEvent(IonEventType.SCALAR, IonType.INT, lambda: int(bytes(v)))), @@ -97,26 +108,35 @@ def is_empty(buffer: SliceableBuffer): constant(tag(b"}"), IonEvent(IonEventType.CONTAINER_END, IonType.STRUCT)), constant(tag(b"]"), IonEvent(IonEventType.CONTAINER_END, IonType.LIST)), constant(tag(b")"), IonEvent(IonEventType.CONTAINER_END, IonType.SEXP)), -) +)) + +_tlv_parsec = alt(constant(tag_stop(b"$ion_1_0"), ION_VERSION_MARKER_EVENT), _value_parsec) _take_while_whitespace = take_while(_whitespace) + def _trim_whitespace(buffer: SliceableBuffer): result = _take_while_whitespace(buffer) return result.buffer +def whitespace_then(parser: Parser) -> Parser: + return preceded(_take_while_whitespace, parser) + + def _map_result(result: ParseResult, buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: if result.type is ResultType.SUCCESS: return result.value, result.buffer if result.type is ResultType.INCOMPLETE: - if buffer.size: - return ION_STREAM_INCOMPLETE_EVENT, buffer - else: + if not buffer.size: return ION_STREAM_END_EVENT, buffer + return ION_STREAM_INCOMPLETE_EVENT, buffer if result.type is ResultType.FAILURE: + if not buffer.size and buffer.is_eof(): + return ION_STREAM_END_EVENT, buffer raise IonException("Parse failed on _____") + def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: buffer = _trim_whitespace(buffer) result = _tlv_parsec(buffer) @@ -127,10 +147,9 @@ def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: _list_parsec = alt( constant(tag(b"]"), IonEvent(IonEventType.CONTAINER_END, IonType.LIST)), terminated( - preceded(_take_while_whitespace, _tlv_parsec), - preceded( - _take_while_whitespace, - alt(tag(b','), peek(tag(b']')))))) + _value_parsec, + whitespace_then(alt(tag(b','), peek(tag(b']')))))) + def _list_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: buffer = _trim_whitespace(buffer) @@ -149,12 +168,11 @@ def _list_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: constant(tag(b"}"), IonEvent(IonEventType.CONTAINER_END, IonType.STRUCT)), map_value( terminated( - pair( + delim_pair( _field_name, - preceded( - preceded(_take_while_whitespace, tag(b':')), - preceded(_take_while_whitespace, _tlv_parsec))), - preceded(_take_while_whitespace, alt(tag(b','), peek(tag(b'}'))))), + whitespace_then(tag(b':')), + whitespace_then(_value_parsec)), + whitespace_then(alt(tag(b','), peek(tag(b'}'))))), # todo: lazy field name lambda pair: pair[1].derive_field_name(SymbolToken(bytes(pair[0]).decode("utf-8"), None)))) @@ -171,7 +189,7 @@ def _struct_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: _sexp_parsec = alt( constant(tag(b")"), IonEvent(IonEventType.CONTAINER_END, IonType.SEXP)), terminated( - preceded(_take_while_whitespace, _tlv_parsec), + whitespace_then(_value_parsec), alt(_whitespace_or_operator, peek(tag(b')'))))) def _sexp_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: @@ -217,7 +235,7 @@ def text_stream_handler(): if incomplete: # todo: a cleaner way to do this is to set a flush flag # and pass it to the parser as context... - buffer = buffer.extend(b"\t") + buffer = buffer.eof() else: raise TypeError("Data expected") else: @@ -257,4 +275,3 @@ def text_stream_handler(): context_stack.pop() ion_event = ion_event.derive_depth(depth) - diff --git a/amazon/ion/sliceable_buffer.py b/amazon/ion/sliceable_buffer.py index 409fe8f4b..a8503a0b6 100644 --- a/amazon/ion/sliceable_buffer.py +++ b/amazon/ion/sliceable_buffer.py @@ -23,7 +23,7 @@ def empty(): """ return SliceableBuffer([]) - def __init__(self, chunks, offset=0, size=0): + def __init__(self, chunks, offset=0, size=0, eof=False): """ *Class internal usage only.* @@ -34,11 +34,14 @@ def __init__(self, chunks, offset=0, size=0): # chunks which is more efficient than slicing and copying the chunks # on each read or skip. self._offset = offset + self._eof = eof self.size = size def extend(self, chunk): """ Return a new buffer with the chunk appended. + + The new buffer is not marked EOF even if self is. """ if not chunk: raise ValueError("Chunk must be not None and non-empty!") @@ -67,9 +70,9 @@ def read_byte(self): raise IncompleteReadError("Buffer is empty!") if length == offset + 1: - return chunk[offset], SliceableBuffer(chunks[1:], 0, size - 1) + return chunk[offset], SliceableBuffer(chunks[1:], 0, size - 1, self._eof) else: - return chunk[offset], SliceableBuffer(chunks, offset + 1, size - 1), + return chunk[offset], SliceableBuffer(chunks, offset + 1, size - 1, self._eof) def read_slice(self, n): """ @@ -95,9 +98,9 @@ def read_slice(self, n): # optimizes for common case and simplifies accumulation loop (chunk, length) = chunks[0] if endpos < length: - return chunk[offset:endpos], SliceableBuffer(chunks, offset + n, size - n) + return chunk[offset:endpos], SliceableBuffer(chunks, offset + n, size - n, self._eof) elif endpos == length: - return chunk[offset:], SliceableBuffer(chunks[1:], 0, size - n) + return chunk[offset:], SliceableBuffer(chunks[1:], 0, size - n, self._eof) slices = [_ChunkPair(chunk[offset:], length - offset)] @@ -123,7 +126,7 @@ def read_slice(self, n): combined[cursor:cursor + length] = chunk cursor += length - return memoryview(combined), SliceableBuffer(chunks[i:], remaining, size - n) + return memoryview(combined), SliceableBuffer(chunks[i:], remaining, size - n, self._eof) def read_while(self, pred): """ @@ -194,7 +197,7 @@ def skip(self, n): i += 1 break - return n, SliceableBuffer(chunks[i:], remaining, size - n) + return n, SliceableBuffer(chunks[i:], remaining, size - n, self._eof) def __len__(self): """ @@ -204,11 +207,20 @@ def __len__(self): def __repr__(self): if self.size == 0: - return "SliceableBuffer(size=0, data=[])" - elif self.size < 5: - return f"SliceableBuffer(size={self.size}, data=[{bytes(self.peek(self.size))}])" + data = "" + elif self.size <= 5: + data = f"{bytes(self.peek(self.size))}" else: - return f"SliceableBuffer(size={self.size}, data=[{bytes(self.peek(5))}...])" + data = f"{bytes(self.peek(5))}..." + + return f"SliceableBuffer(size={self.size}, data=[{data}], eof={self._eof})" + + def eof(self): + return SliceableBuffer(self._chunks, self._offset, self.size, True) + + def is_eof(self): + return self._eof + class IncompleteReadError(IndexError): pass diff --git a/tests/test_protons.py b/tests/test_protons.py index f5aec7736..652fe7c7b 100644 --- a/tests/test_protons.py +++ b/tests/test_protons.py @@ -12,7 +12,7 @@ def expect_value(v, next=None): """ def expect(result: ParseResult): assert result.type is ResultType.SUCCESS - assert v == result.value + assert result.value == v if next: n = len(next) (data, _) = result.buffer.read_slice(n) @@ -48,7 +48,7 @@ def expect(result: ParseResult): def expect_inc_or_fail(): def expect(result: ParseResult): - if not result.buffer.size and result.buffer.is_eof(): + if result.buffer.is_eof(): assert result.type is ResultType.FAILURE else: assert result.type is ResultType.INCOMPLETE @@ -79,24 +79,23 @@ def parameterify(*tests: Tuple[Parser, List]): ("spam musubi", expect_value("eggs", next=b" ")), ("beef", expect_failure()) ]), - # (delim(tag(b"{"), tag(b" "), tag(b"}")), [ - # ("{ }", expect_value(b" ")), - # ("{ };", expect_value(b" ", next=b";")), - # ("{}", expect_failure()), - # ("{", expect_inc_or_fail()), - # ("{ ", expect_inc_or_fail()), - # ("", expect_inc_or_fail()), - # ("[]", expect_failure()), - # (" }", expect_failure()), - # ("{bad}", expect_failure()) - # ]), + (delim(tag(b"{"), tag(b" "), tag(b"}")), [ + ("{ }", expect_value(b" ")), + ("{ };", expect_value(b" ", next=b";")), + ("{}", expect_failure()), + ("{", expect_inc_or_fail()), + ("{ ", expect_inc_or_fail()), + ("", expect_inc_or_fail()), + ("[]", expect_failure()), + (" }", expect_failure()), + ("{bad}", expect_failure()) + ]), (take_while(lambda b: ord(b'a') <= b <= ord(b'c')), [ - ("abc", expect_value_if_done(b"abc")), + ("abc", expect_value(b"abc")), ("abc123", expect_value(b"abc", next=b"1")), - ("", expect_incomplete()), + ("", expect_value(b"")), ("123", expect_value(b"", next=b"1")) ]), - # todo: peek (terminated(tag(b"foo"), tag(b";")), [ ("foo;", expect_value(b"foo")), ("foo|", expect_failure()), @@ -119,7 +118,16 @@ def parameterify(*tests: Tuple[Parser, List]): (is_eof(), [ ("", expect_value_if_done(None)), ("a", expect_failure()) + ]), + (take_while_n(3, lambda b: b in b"bar"), [ + ("", expect_incomplete()), + ("B", expect_failure()), + ("ba", expect_incomplete()), + ("bar", expect_value(b"bar")), + ("baD", expect_failure()), + ("barD", expect_value(b"bar", next=b"D")) ]) + # todo: peek, pair )) def test_rule(rule, data, expect): """ From c574592eba6ae13c1d68b5cd0c43609af8216d92 Mon Sep 17 00:00:00 2001 From: Rob Marrowstone Date: Mon, 29 Apr 2024 12:43:34 -0700 Subject: [PATCH 8/8] Changes for Text Parser Refactor I'm putting this on the shelf for a bit. Basically: I did some stuff to move this along, I ran it against my own "hkc" dataset and the results were not great. Specifically: the existing pure-python text parser does about .3 ops/second whereas this code does .4. And it's not complete. Struct parsing is broken becuase as of now each of the container parsers is stateless, but the parsers themselves are not re-entrant. So you effectively can't have containers inside of containers. It took me about 3 - 4 heads down days to get here and I suspect it would take about as much to productionize it. Given what I saw from messing around, simply changing out the main value alt for a table (or reading only once and passing the data?) would likely have a significant impact on performance. Changes: * Added up to day precision Timestamp parsing * Broken Semantically, but "working" Decimal Parsing * Untested formally but attempted "table" proton * Hacked up ability to run Refactor against tests --- amazon/ion/protons.py | 56 ++++++++++-- amazon/ion/reader_text2.py | 125 ++++++++++++++++++++++----- amazon/ion/simpleion.py | 33 +++++++ amazon/ion/sliceable_buffer.py | 17 ++++ amazon/ionbenchmark/ion_load_dump.py | 3 +- 5 files changed, 205 insertions(+), 29 deletions(-) diff --git a/amazon/ion/protons.py b/amazon/ion/protons.py index 5f9a0b113..b8c748861 100644 --- a/amazon/ion/protons.py +++ b/amazon/ion/protons.py @@ -1,6 +1,7 @@ +from collections import namedtuple from dataclasses import dataclass from enum import Enum -from typing import Callable, Any +from typing import Callable, Any, NamedTuple from amazon.ion.sliceable_buffer import SliceableBuffer @@ -121,16 +122,16 @@ def p(buffer: SliceableBuffer): return p -def debug(parser: Parser) -> Parser: +def debug(name: str, parser: Parser) -> Parser: """ prints the input buffer and result of ``parser`` to stdout. also provides a nice place to put a debug breakpoint. """ def p(buffer: SliceableBuffer): - print(f"input {buffer}") + print(f"{name} input {buffer}") result = parser(buffer) - print(f"result {result}") + print(f"{name} result {result}") return result return p @@ -177,6 +178,9 @@ def take_while(pred: Callable[[int], bool]) -> Parser: """ def p(buffer: SliceableBuffer): (data, buffer) = buffer.read_while(pred) + if not buffer.size and not buffer.is_eof(): + return ParseResult(ResultType.INCOMPLETE, buffer) + return ParseResult(ResultType.SUCCESS, buffer, data) return p @@ -249,6 +253,48 @@ def p(buffer: SliceableBuffer): return p +class Exact(NamedTuple): + value: int + +class Range(NamedTuple): + low: int + high: int + + +SwitchRule = Exact | Range + + +def table(*mappings: tuple[SwitchRule, Parser]) -> Parser: + """The parsers should expect to "re-parse" the byte, at least for now. + """ + + table: list[Parser | None] = [None] * 256 + for rule, parser in mappings: + if type(rule) is Exact: + keys = [rule.value] + else: + keys = [i for i in range(rule.low, rule.high + 1)] + for key in keys: + if table[key]: + raise ValueError(f"mapping for int value: {key} already present!") + table[key] = parser + + def p(buffer: SliceableBuffer): + if not buffer.size: + if buffer.is_eof(): + return ParseResult(ResultType.FAILURE, buffer) + else: + return ParseResult(ResultType.INCOMPLETE, buffer) + byte = buffer.peek_byte() + parser = table[byte] + if not parser: + return ParseResult(ResultType.FAILURE, buffer) + else: + return parser(buffer) + + return p + + def preceded(prefix: Parser, item: Parser) -> Parser: """ check that the prefix matches then returns item if it matches. @@ -287,7 +333,6 @@ def p(buffer: SliceableBuffer): return p - def delim_pair(left: Parser, delim: Parser, right: Parser) -> Parser: """ returns a tuple of the left and right values @@ -319,6 +364,7 @@ def p(buffer: SliceableBuffer): return ParseResult(result.type, buffer, result.value) return p + def is_eof() -> Parser: """ returns Success if the buffer is empty and marked EOF diff --git a/amazon/ion/reader_text2.py b/amazon/ion/reader_text2.py index 8dd070f76..70d940768 100644 --- a/amazon/ion/reader_text2.py +++ b/amazon/ion/reader_text2.py @@ -13,10 +13,11 @@ # License. from collections import deque +from decimal import Decimal from typing import Optional, NamedTuple, Tuple from amazon.ion.core import IonType, IonEvent, \ - IonEventType, ION_STREAM_INCOMPLETE_EVENT, ION_STREAM_END_EVENT, IonThunkEvent, ION_VERSION_MARKER_EVENT + IonEventType, ION_STREAM_INCOMPLETE_EVENT, ION_STREAM_END_EVENT, IonThunkEvent, ION_VERSION_MARKER_EVENT, Timestamp from amazon.ion.exceptions import IonException from amazon.ion.protons import * from amazon.ion.reader import ReadEventType @@ -46,11 +47,10 @@ def is_empty(buffer: SliceableBuffer): # todo: not nan, null, true or false for field keys and annotations -_identifier_symbol = terminated( +_identifier_symbol = \ preceded( peek(one_of(b"$_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")), - take_while(lambda b: b in bytearray(b"$_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"))), - _stop) + take_while(lambda b: b in bytearray(b"$_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"))) _quoted_symbol = delim( tag(b"'"), @@ -62,22 +62,71 @@ def is_empty(buffer: SliceableBuffer): take_while(lambda b: b != ord(b'"')), tag(b'"')) +def _is_digit(byte): + return byte in bytearray(b"0123456789") + # todo: negative numbers _integer = terminated( alt( tag(b"0"), preceded( peek(one_of(b"123456789")), - take_while(lambda b: b in bytearray(b"0123456789")))), + take_while(_is_digit))), + _stop) + +_timestamp = pair( + take_while_n(4, _is_digit), + alt( + tag_stop(b"T"), + preceded( + tag(b"-"), + pair( + take_while_n(2, _is_digit), + alt( + tag_stop(b"T"), + preceded( + tag(b"-"), + pair( + take_while_n(2, _is_digit), + alt( + tag_stop(b"T"), + _stop)))))))) + +# todo: lazy +def _to_timestamp_event(value): + (year, rem1) = value + if type(rem1) is tuple: + (month, rem2) = rem1 + if type(rem2) is tuple: + (day, rem3) = rem2 + return IonEvent(IonEventType.SCALAR, IonType.TIMESTAMP, Timestamp(int(bytes(year)), int(bytes(month)), int(bytes(day)))) + else: + return IonEvent(IonEventType.SCALAR, IonType.TIMESTAMP, Timestamp(int(bytes(year)), int(bytes(month)))) + else: + return IonEvent(IonEventType.SCALAR, IonType.TIMESTAMP, Timestamp(int(bytes(year)),)) + +_decimal = terminated( + delim_pair( + alt( + tag(b"0"), + preceded( + peek(one_of(b"123456789")), + take_while(_is_digit))), + # todo: this is a huge hack, but it works for my hkc dataset + alt(tag(b"."), tag(b"d")), + take_while(_is_digit)), _stop) -_timestamp = alt( - -) +# todo: lazy +def _to_decimal_event(value): + (integer, fraction) = value + dec_str = bytes(integer) + b"." + bytes(fraction) + #print(dec_str) + return IonEvent(IonEventType.SCALAR, IonType.DECIMAL, Decimal(dec_str.decode("utf-8"))) -_value_parsec = debug(alt( - # constant(is_empty, ION_STREAM_END_EVENT), +_value_parsec = alt( + constant(is_empty, ION_STREAM_END_EVENT), constant(tag_stop(b"nan"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, float("nan"))), constant(tag_stop(b"+inf"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, float("+inf"))), constant(tag_stop(b"-inf"), IonEvent(IonEventType.SCALAR, IonType.FLOAT, float("-inf"))), @@ -87,6 +136,8 @@ def is_empty(buffer: SliceableBuffer): constant(tag_stop(b"false"), IonEvent(IonEventType.SCALAR, IonType.BOOL, False)), # ignore(delim(tag_stop(b"//"), take_while(lambda b: b != ord(b"\n")), ), + map_value(_timestamp, _to_timestamp_event), + map_value(_decimal, _to_decimal_event), map_value(_integer, lambda v: IonThunkEvent(IonEventType.SCALAR, IonType.INT, lambda: int(bytes(v)))), @@ -101,14 +152,14 @@ def is_empty(buffer: SliceableBuffer): lambda v: IonThunkEvent(IonEventType.SCALAR, IonType.STRING, lambda: bytes(v).decode('utf-8'), None)), - constant(tag(b"{"), IonEvent(IonEventType.CONTAINER_START, IonType.STRUCT)), + constant(peek(tag(b"{")), IonEvent(IonEventType.CONTAINER_START, IonType.STRUCT)), constant(tag(b"["), IonEvent(IonEventType.CONTAINER_START, IonType.LIST)), constant(tag(b"("), IonEvent(IonEventType.CONTAINER_START, IonType.SEXP)), constant(tag(b"}"), IonEvent(IonEventType.CONTAINER_END, IonType.STRUCT)), constant(tag(b"]"), IonEvent(IonEventType.CONTAINER_END, IonType.LIST)), constant(tag(b")"), IonEvent(IonEventType.CONTAINER_END, IonType.SEXP)), -)) +) _tlv_parsec = alt(constant(tag_stop(b"$ion_1_0"), ION_VERSION_MARKER_EVENT), _value_parsec) @@ -134,7 +185,7 @@ def _map_result(result: ParseResult, buffer: SliceableBuffer) -> Tuple[IonEvent, if result.type is ResultType.FAILURE: if not buffer.size and buffer.is_eof(): return ION_STREAM_END_EVENT, buffer - raise IonException("Parse failed on _____") + raise IonException(f"Parse failed on {buffer}") def _tlv_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: @@ -164,17 +215,45 @@ def _list_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: # todo: long quoted string, because why!?!?!? ) +def opt(parser: Parser) -> Parser: + def p(buffer: SliceableBuffer): + result = parser(buffer) + if result.type is ResultType.SUCCESS: + return result + if result.type is ResultType.FAILURE: + return ParseResult(ResultType.SUCCESS, result.buffer) + return result + return p + +# todo: this is all eff'ed, basically it doesn't work when the _value_parsec does not fully consume the +# buffer. I didn't build this for these to be re-entrant so it's both an issue with expecting a comma or brace after a value +# and an issue when we close the container, then we're at a place where we have a whitespace and comma from a previous invocation. +# _struct_parsec = alt( +# constant(tag(b"}"), IonEvent(IonEventType.CONTAINER_END, IonType.STRUCT)), +# map_value( +# terminated( +# delim_pair( +# _field_name, +# whitespace_then(tag(b':')), +# whitespace_then(_value_parsec)), +# # todo: i think there's a bug here with whitespace after a trailing comma +# debug("struct end", whitespace_then(alt(tag(b','), peek(tag(b'}')))))), +# # todo: lazy field name +# lambda pair: pair[1].derive_field_name(SymbolToken(bytes(pair[0]).decode("utf-8"), None)))) + +# todo: empty structs don't work, because they're not parsed as a container_end event _struct_parsec = alt( - constant(tag(b"}"), IonEvent(IonEventType.CONTAINER_END, IonType.STRUCT)), - map_value( - terminated( - delim_pair( - _field_name, - whitespace_then(tag(b':')), - whitespace_then(_value_parsec)), - whitespace_then(alt(tag(b','), peek(tag(b'}'))))), - # todo: lazy field name - lambda pair: pair[1].derive_field_name(SymbolToken(bytes(pair[0]).decode("utf-8"), None)))) + whitespace_then(constant(tag(b"}"), IonEvent(IonEventType.CONTAINER_END, IonType.STRUCT))), + preceded( + whitespace_then(alt(tag(b"{"), tag(b","))), + map_value( + delim_pair( + whitespace_then(_field_name), + whitespace_then(tag(b':')), + whitespace_then(_value_parsec)), + # todo: lazy field name + lambda pair: pair[1].derive_field_name(SymbolToken(bytes(pair[0]).decode("utf-8"), None))))) + def _struct_parser(buffer: SliceableBuffer) -> Tuple[IonEvent, SliceableBuffer]: buffer = _trim_whitespace(buffer) diff --git a/amazon/ion/simpleion.py b/amazon/ion/simpleion.py index 4e29a568c..87232e2ca 100644 --- a/amazon/ion/simpleion.py +++ b/amazon/ion/simpleion.py @@ -70,6 +70,7 @@ from .reader import blocking_reader, NEXT_EVENT from .reader_binary import binary_reader from .reader_managed import managed_reader +from .reader_text2 import text_stream_handler from .simple_types import IonPyList, IonPyDict, IonPyNull, IonPyBool, IonPyInt, IonPyFloat, IonPyDecimal, \ IonPyTimestamp, IonPyText, IonPyBytes, IonPySymbol, is_null from .symbols import SymbolToken @@ -412,6 +413,38 @@ def load_python(fp, catalog=None, single_value=True, parse_eagerly=True): return result return out +def load_python_r2(fp, catalog=None, single_value=True, parse_eagerly=True): + """'pure' Python implementation. Users should prefer to call ``load``.""" + if isinstance(fp, _TEXT_TYPES): + raw_reader = text_stream_handler() + else: + pos = fp.tell() + maybe_ivm = fp.read(4) + fp.seek(pos) + if maybe_ivm == _IVM: + raw_reader = binary_reader() + else: + raw_reader = text_stream_handler() + reader = blocking_reader(managed_reader(raw_reader, catalog), fp) + if parse_eagerly: + out = [] # top-level + _load(out, reader) + if single_value: + if len(out) != 1: + raise IonException('Stream contained %d values; expected a single value.' % (len(out),)) + return out[0] + return out + else: + out = _load_iteratively(reader) + if single_value: + result = next(out) + try: + next(out) + raise IonException('Stream contained more than 1 values; expected a single value.') + except StopIteration: + return result + return out + _FROM_ION_TYPE = [ IonPyNull, diff --git a/amazon/ion/sliceable_buffer.py b/amazon/ion/sliceable_buffer.py index a8503a0b6..d381ac226 100644 --- a/amazon/ion/sliceable_buffer.py +++ b/amazon/ion/sliceable_buffer.py @@ -74,6 +74,23 @@ def read_byte(self): else: return chunk[offset], SliceableBuffer(chunks, offset + 1, size - 1, self._eof) + + def peek_byte(self): + """ + Peek at the next byte from the buffer, return it. + + Raise IncompleteReadError if the buffer is empty. + """ + chunks = self._chunks + offset = self._offset + + try: + # assume that we have data, and that chunks are non-empty + (chunk, length) = chunks[0] + return chunk[offset] + except IndexError: + raise IncompleteReadError("Buffer is empty!") + def read_slice(self, n): """ Read a slice of the buffer, return (slice, new buffer). diff --git a/amazon/ionbenchmark/ion_load_dump.py b/amazon/ionbenchmark/ion_load_dump.py index 92202ee87..4603363ae 100644 --- a/amazon/ionbenchmark/ion_load_dump.py +++ b/amazon/ionbenchmark/ion_load_dump.py @@ -22,11 +22,12 @@ def __init__(self, binary, c_ext=True, value_model=ion.IonPyValueModel.ION_PY): def loads(self, s): ion.c_ext = self._c_ext + print("loads?") return ion.loads(s, single_value=self._single_value, value_model=self.value_model) def load(self, fp): ion.c_ext = self._c_ext - it = ion.load(fp, parse_eagerly=False, single_value=False, value_model=self.value_model) + it = ion.load_python_r2(fp, parse_eagerly=False, single_value=False) while True: try: yield next(it)