Source code for bytesparse.io

# Copyright (c) 2020-2024, Andrea Zoppi.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

r"""Streaming utilities."""

import io
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Union

from . import Memory as _Memory
from .base import Address
from .base import AnyBytes
from .base import EllipsisType
from .base import ImmutableMemory
from .base import MutableMemory

# Not all the platforms support sparse files natively, thus they do not provide
# os.SEEK_DATA and os.SEEK_HOLE by themselves; we do it here!

SEEK_SET: int = 0
r"""Seek from the beginning of the stream (position 0)."""

SEEK_CUR: int = 1
r"""Seek from the current stream position (:meth:`MemoryIO.tell`)."""

SEEK_END: int = 2
r"""Seek from the end of the underlying memory object (:meth:`ImmutableMemory.endex`)."""

SEEK_DATA: int = 3
r"""Seek to the next data block."""

SEEK_HOLE: int = 4
r"""Seek to the next memory hole."""


[docs] class MemoryIO(io.BufferedIOBase): r"""Buffered I/O wrapper. This class wraps an :obj:`ImmutableMemory` object so that the latter can be accessed like a typical Python I/O read-only stream. A memory stream is writable if, on its creation (:meth:`__init__`), its underlying memory object can be written an empty byte string at its start address, without raising any exceptions. Any operations executed on a closed stream may fail, raising an exception. The stream position (the result of :meth:`tell`) indicates the address currently pointed by the stream. It is just a number, and as such it is allowed to fall outside the actual memory bounds. The stream position always refers to absolute address 0; in no way it ever refers to the :attr:`ImmutableMemory.start` of the underlying wrapped memory object. Arguments: memory (:obj:`ImmutableMemory`): The memory object to wrap. If ``None``, it assigns a new empty :obj:`bytesparse.Memory`. seek (int): If ``Ellipsis``, :meth:`seek` to :attr:`memory.start`. If not ``None``, :meth:`seek` to the absolute address `seek`. Attributes: _memory (:obj:`ImmutableMemory`): The underlying wrapped memory object. It is set to ``None`` when :attr:`closed`. _position (int): The current stream position. It always refers to absolute address 0. _writable (bool): The stream is writable on creation. See Also: :class:`bytesparse.base.ImmutableMemory` :class:`bytesparse.base.MutableMemory` :meth:`seek` :meth:`writable` Examples: >>> from bytesparse import Memory, MemoryIO >>> stream = MemoryIO(seek=3) >>> stream.write(b'Hello') 5 >>> str(stream.memory) "<[[3, b'Hello']]>" >>> stream.seek(10) 10 >>> stream.write(b'World!') 6 >>> str(stream.memory) "<[[3, b'Hello'], [10, b'World!']]>" >>> memory = Memory.from_bytes(b'Hello, World!') >>> stream = MemoryIO(memory, seek=7) >>> stream.read() b'World!' >>> stream.tell() 13 >>> stream.seek(-6, SEEK_END) 7 >>> stream.write(b'Human') 5 >>> bytes(memory) b'Hello, Human!' >>> stream.truncate(5) 5 >>> stream.seek(3, SEEK_CUR) 8 >>> stream.write(b'World') 5 >>> memory.to_blocks() [[0, b'Hello'], [8, b'World']] >>> stream.close() >>> blocks = [[3, b'Hello'], [10, b'World!']] >>> memory = Memory.from_blocks(blocks) >>> stream = MemoryIO(memory, seek=...) >>> stream.tell() 3 >>> stream.read() b'Hello' >>> blocks = [[3, b'Hello\nWorld!'], [20, b'Bye\n'], [28, b'Bye!']] >>> with MemoryIO(Memory.from_blocks(blocks)) as stream: ... lines = stream.readlines() >>> lines [b'Hello\n', b'World!', b'Bye\n', b'Bye!'] >>> stream.seek(0) Traceback (most recent call last): ... ValueError: I/O operation on closed stream. """
[docs] def __del__(self) -> None: r"""Prepares the object for destruction. It makes sure the stream is closed upon object destruction. """ self.close()
[docs] def __enter__(self) -> 'MemoryIO': r"""Context manager enter function. Returns: :obj:`MemoryIO`: The stream object itself. Examples: >>> from bytesparse import Memory, MemoryIO >>> with MemoryIO(Memory.from_bytes(b'Hello, World!')) as stream: ... data = stream.read() >>> data b'Hello, World!' """ self._check_closed() return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb) -> None: r"""Context manager exit function. It makes sure the stream is closed upon context exit. Examples: >>> from bytesparse import Memory, MemoryIO >>> with MemoryIO(Memory.from_bytes(b'Hello, World!')) as stream: ... print(stream.closed) False >>> print(stream.closed) True """ self.close()
[docs] def __init__( self, memory: Optional[Union[ImmutableMemory, MutableMemory]] = None, seek: Optional[Union[Address, EllipsisType]] = None, ): if memory is None: memory = _Memory() writable = True else: start = memory.start try: memory.write(start, b'') except Exception: writable = False else: writable = True self._memory: Optional[Union[ImmutableMemory, MutableMemory]] = memory self._writable: bool = writable self._position: Address = 0 if seek is Ellipsis: self.seek(memory.start) elif seek is not None: self.seek(seek)
[docs] def __iter__(self) -> Iterator[bytes]: r"""Iterates over lines. Repeatedly calls :meth:`readline`, as long as it returns byte strings. Yields the values returned by such calls. Yields: bytes: Single line; terminator included. See Also: :meth:`readline` Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello\nWorld!'], [20, b'Bye\n'], [28, b'Bye!']] >>> with MemoryIO(Memory.from_blocks(blocks)) as stream: ... lines = [line for line in stream] >>> lines [b'Hello\n', b'World!', b'Bye\n', b'Bye!'] """ while 1: line = self.readline() try: line < 0 except TypeError: if line: yield line else: break
[docs] def __next__(self) -> bytes: r"""Next iterated line. Calls :meth:`readline` once, returning the value. Returns: bytes or int: Line read from the stream, or the negative gap size. See Also: :meth:`readline` Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello\nWorld!'], [20, b'Bye\n'], [28, b'Bye!']] >>> with MemoryIO(Memory.from_blocks(blocks), seek=9) as stream: ... print(next(stream)) b'World!' """ return self.readline()
[docs] def _check_closed(self) -> None: r"""Checks if the stream is closed. In case the stream is :attr:`closed`, it raises :obj:`ValueError`. Raises: ValueError: The stream is closed. Examples: >>> from bytesparse import Memory, MemoryIO >>> with MemoryIO(Memory.from_bytes(b'ABC')) as stream: ... stream._check_closed() >>> stream._check_closed() Traceback (most recent call last): ... ValueError: I/O operation on closed stream. """ if self.closed: raise ValueError('I/O operation on closed stream.')
[docs] def close(self) -> None: r"""Closes the stream. Any subsequent operations on the closed stream may fail, and some properties may change state. The stream no more links to an underlying memory object. See Also: :attr:`closed` Examples: >>> from bytesparse import Memory, MemoryIO >>> stream = MemoryIO(Memory.from_bytes(b'ABC')) >>> stream.closed False >>> stream.memory is None False >>> stream.readable() True >>> stream.close() >>> stream.closed True >>> stream.memory is None True >>> stream.readable() Traceback (most recent call last): ... ValueError: I/O operation on closed stream. """ self._memory = None self._writable = False self._position = 0
@property def closed(self) -> bool: r"""Closed stream. Returns: bool: Closed stream. See Also: :meth:`close` Examples: >>> from bytesparse import Memory, MemoryIO >>> stream = MemoryIO(Memory.from_bytes(b'ABC')) >>> stream.closed False >>> stream.close() >>> stream.closed True >>> with MemoryIO(Memory.from_bytes(b'ABC')) as stream: ... print(stream.closed) False >>> print(stream.closed) True """ return self._memory is None
[docs] def detach(self) -> None: r"""Detaches the underlying raw stream. Warnings: It always raises :exc:`io.UnsupportedOperation`. This method is present only for API compatibility. No actual underlying stream is present for this object. Raises: :exc:`io.UnsupportedOperation`: No underlying raw stream. """ raise io.UnsupportedOperation('detach')
[docs] def fileno(self) -> int: r"""File descriptor identifier. Warnings: It always raises :exc:`io.UnsupportedOperation`. This method is present only for API compatibility. No actual file descriptor is associated to this object. Raises: OSError: Not a file stream. """ raise io.UnsupportedOperation('fileno')
[docs] def flush(self) -> None: r"""Flushes buffered data into the underlying raw steam. Notes: Since no underlying stream is associated, this method does nothing. """ pass
[docs] def getbuffer(self) -> memoryview: r"""Memory view of the underlying memory object. Warnings: This method may fail when the underlying memory object has gaps within data. Returns: memoryview: Memory view over the underlying memory object. See Also: :meth:`ImmutableMemory.view` Examples: >>> from bytesparse import Memory, MemoryIO >>> with MemoryIO(Memory.from_bytes(b'Hello, World!')) as stream: ... with stream.getbuffer() as buffer: ... print(type(buffer), '=', bytes(buffer)) <class 'memoryview'> = b'Hello, World!' >>> blocks = [[3, b'Hello'], [10, b'World!']] >>> with MemoryIO(Memory.from_blocks(blocks)) as stream: ... stream.getbuffer() Traceback (most recent call last): ... ValueError: non-contiguous data within range """ self._check_closed() return self._memory.view()
[docs] def getvalue(self) -> bytes: r"""Byte string copy of the underlying memory object. Warnings: This method may fail when the underlying memory object has gaps within data. Returns: bytes: Byte string copy of the underlying memory object. See Also: :meth:`ImmutableMemory.to_bytes` Examples: >>> from bytesparse import Memory, MemoryIO >>> with MemoryIO(Memory.from_bytes(b'Hello, World!')) as stream: ... value = stream.getvalue() ... print(type(value), '=', bytes(value)) <class 'bytes'> = b'Hello, World!' >>> blocks = [[3, b'Hello'], [10, b'World!']] >>> with MemoryIO(Memory.from_blocks(blocks)) as stream: ... stream.getvalue() Traceback (most recent call last): ... ValueError: non-contiguous data within range """ self._check_closed() return self._memory.to_bytes()
[docs] def isatty(self) -> bool: r"""Interactive console stream. Returns: bool: ``False``, not an interactive console stream. """ return False
@property def memory(self) -> Optional[ImmutableMemory]: r""":obj:`ImmutableMemory`: Underlying memory object. ``None`` when :attr:`closed`. Examples: >>> from bytesparse import Memory, MemoryIO >>> memory = Memory.from_bytes(b'Hello, World!') >>> with MemoryIO(memory) as stream: ... print(stream.memory is memory) True >>> print(stream.memory is memory) False >>> print(stream.memory is None) True """ return self._memory
[docs] def peek( self, size: Optional[Address] = 0, asmemview: bool = False, ) -> Union[bytes, memoryview, Address]: r"""Previews the next chunk of bytes. Similar to :meth:`read`, without moving the stream position instead. This method can be used to preview the next chunk of bytes, without affecting the stream itself. The number of returned bytes may be different from `size`, which acts as a mere hint. If the current stream position lies within a memory gap, this method returns the negative amount of bytes to reach the next data block. If the current stream position is after the end of memory data, this method returns an empty byte string. Arguments: size (int): Number of bytes to read. If negative or ``None``, read as many bytes as possible. asmemview (bool): Return a :obj:`memoryview` instead of :obj:`bytes`. Returns: bytes: Chunk of bytes. See Also: :meth:`read` Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello'], [10, b'World!']] >>> memory = Memory.from_blocks(blocks) >>> stream = MemoryIO(memory, seek=4) >>> stream.peek() b'' >>> stream.peek(1) b'e' >>> stream.peek(11) b'ello' >>> stream.peek(None) b'ello' >>> stream.tell() 4 >>> memview = stream.peek(-1, asmemview=True) >>> type(memview) <class 'memoryview'> >>> bytes(memview) b'ello' >>> stream.seek(8) 8 >>> stream.peek() -2 """ if size is None: size = -1 self._check_closed() start = self._position memory = self._memory _, block_endex, block_value = memory.block_span(start) if block_value is None: return start - block_endex if size < 0: endex = block_endex else: endex = start + size if endex > block_endex: endex = block_endex if asmemview: chunk = memory.view(start=start, endex=endex) else: chunk = memory.to_bytes(start=start, endex=endex) return chunk
[docs] def read( self, size: Optional[Address] = -1, asmemview: bool = False, ) -> Union[bytes, memoryview, Address]: r"""Reads a chunk of bytes. Starting from the current stream position, this method tries to read up to `size` bytes (or as much as possible if negative or ``None``). The number of bytes can be less than `size` in the case a memory hole or the end are encountered. If the current stream position lies within a memory gap, this method returns the negative amount of bytes to reach the next data block. If the current stream position is after the end of memory data, this method returns an empty byte string. Arguments: size (int): Number of bytes to read. If negative or ``None``, read as many bytes as possible. asmemview (bool): Return a :obj:`memoryview` instead of :obj:`bytes`. Returns: bytes: Chunk of up to `size` bytes. Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello'], [10, b'World!']] >>> memory = Memory.from_blocks(blocks) >>> stream = MemoryIO(memory, seek=4) >>> stream.read(1) b'e' >>> stream.tell() 5 >>> stream.read(99) b'llo' >>> stream.tell() 8 >>> stream.read() -2 >>> stream.tell() 10 >>> memview = stream.read(None, asmemview=True) >>> type(memview) <class 'memoryview'> >>> bytes(memview) b'World!' >>> stream.tell() 16 >>> stream.read() b'' >>> stream.tell() 16 """ if size is None: size = -1 self._check_closed() start = self._position memory = self._memory _, block_endex, block_value = memory.block_span(start) if block_value is None: if block_endex is None: return b'' else: self._position = block_endex return start - block_endex if size < 0: endex = block_endex else: endex = start + size if endex > block_endex: endex = block_endex if asmemview: chunk = memory.view(start=start, endex=endex) else: chunk = memory.to_bytes(start=start, endex=endex) self._position = endex return chunk
read1 = read
[docs] def readable(self) -> bool: r"""Stream is readable. Returns: bool: ``True``, stream is always readable. """ self._check_closed() return True
[docs] def readline( self, size: Optional[Address] = -1, skipgaps: bool = True, asmemview: bool = False, ) -> Union[bytes, memoryview, Address]: r"""Reads a line. A standard line is a sequence of bytes terminating with a ``b'\n'`` newline character. If `size` is provided (not ``None`` nor negative), the current line ends there, without a trailing newline character. If the stream is pointing after the memory end, an empty byte string is returned. If a memory hole (gap) is encountered, the current line ends there without a trailing newline character. The stream is always positioned after the gap. If the stream points within a memory hole, it returns the negative number of bytes until the next data block. The stream is always positioned after the gap. Arguments: size (int): Maximum number of bytes for the line to read. If ``None`` or negative, no limit is set. skipgaps (bool): If false, the negative size of the pointed memory hole. asmemview (bool): If true, the returned object is a :obj:`memoryview` instead of :obj:`bytes`. Returns: bytes or int: Line read from the stream, or the negative gap size. See Also: :meth:`read` :meth:`readlines` Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello\nWorld!'], [20, b'Bye\n'], [28, b'Bye!']] >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.readline() b'Hello\n' >>> stream.tell() 9 >>> stream.readline(None) b'World!' >>> stream.tell() 15 >>> stream.readline(99) b'Bye\n' >>> stream.tell() 24 >>> stream.readline(99) b'Bye!' >>> stream.tell() 32 >>> stream.readline() b'' >>> stream.tell() 32 >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.readline(4) b'Hell' >>> stream.tell() 7 >>> stream.readline(4) b'o\n' >>> stream.tell() 9 >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> view = stream.readline(asmemview=True) >>> type(view) is memoryview True >>> bytes(view) b'Hello\n' >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> # Emulating stream.readlines(skipgaps=False) >>> lines = [] >>> line = True >>> while line: ... line = stream.readline(skipgaps=False) ... lines.append(line) >>> lines [-3, b'Hello\n', b'World!', -5, b'Bye\n', -4, b'Bye!'] >>> stream.tell() 32 >>> stream.readline(skipgaps=False) b'' >>> stream.tell() 32 """ if size is None: size = -1 self._check_closed() memory = self._memory start = self._position try: block_start, block_endex = next(memory.intervals(start=start)) except StopIteration: return b'' if start < block_start: if not skipgaps: self._position = block_start return start - block_start start = block_start self._position = start if size < 0: endex = block_endex else: endex = start + size if endex > block_endex: endex = block_endex try: endex = memory.index(b'\n', start=start, endex=endex) + 1 except ValueError: pass if asmemview: chunk = memory.view(start=start, endex=endex) else: chunk = memory.to_bytes(start=start, endex=endex) self._position = endex return chunk
[docs] def readlines( self, hint: Optional[Address] = -1, skipgaps: bool = True, asmemview: bool = False, ) -> List[Union[bytes, Address]]: r"""Reads a list of lines. It repeatedly calls :meth:`readline`, collecting the returned values into a list, until the total number of bytes read reaches `hint`. If a memory hole (gap) is encountered, the current line ends there without a trailing newline character, and the stream is positioned after the gap. If `skipgaps` is false, the list is appended the negative size of each encountered memory hole. Arguments: hint (int): Number of bytes after which line reading stops. If ``None`` or negative, no limit is set. skipgaps (bool): If false, the list hosts the negative size of each memory hole. asmemview (bool): If true, the returned objects are memory views instead of byte strings. Returns: list of bytes or int: List of lines and gaps read from the stream. See Also: :meth:`__iter__` :meth:`read` :meth:`readline` Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello\nWorld!'], [20, b'Bye\n'], [28, b'Bye!']] >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.readlines() [b'Hello\n', b'World!', b'Bye\n', b'Bye!'] >>> stream.tell() 32 >>> stream.readlines() [] >>> stream.tell() 32 >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.readlines(hint=10) [b'Hello\n', b'World!'] >>> stream.tell() 15 >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> views = stream.readlines(asmemview=True) >>> all(type(view) is memoryview for view in views) True >>> [bytes(view) for view in views] [b'Hello\n', b'World!', b'Bye\n', b'Bye!'] >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.readlines(skipgaps=False) [-3, b'Hello\n', b'World!', -5, b'Bye\n', -4, b'Bye!'] >>> stream.tell() 32 >>> stream.readlines(skipgaps=False) [] >>> stream.tell() 32 """ if hint is not None and hint < 0: hint = None total = 0 lines: List[bytes] = [] while hint is None or total < hint: line = self.readline(skipgaps=skipgaps, asmemview=asmemview) try: line < 0 except TypeError: if line: lines.append(line) total += len(line) else: break else: # skipgaps lines.append(line) return lines
[docs] def readinto( self, buffer: Union[bytearray, memoryview, MutableMemory], skipgaps: bool = True, ) -> Address: r"""Reads data into a byte buffer. If the stream is pointing after the memory end, no bytes are read. If pointing within a memory hole (gap), the negative number of bytes until the next data block is returned. The stream is always positioned after the gap. If a memory hole (gap) is encountered after reading some bytes, the reading stops there, and the number of bytes read is returned. The stream is always positioned after the gap. Standard operation reads data until `buffer` is full, or encountering the memory end. It returns the number of bytes read. Arguments: buffer (bytearray): A pre-allocated byte array to fill with bytes read from the stream. skipgaps (bool): If false, it stops reading when a memory hole (gap) is encountered. Returns: int: Number of bytes read, or the negative gap size. Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello'], [10, b'World!']] >>> memory = Memory.from_blocks(blocks) >>> stream = MemoryIO(memory, seek=4) >>> buffer = bytearray(b'.' * 8) >>> stream.readinto(buffer, skipgaps=True) 8 >>> buffer bytearray(b'elloWorl') >>> stream.tell() 14 >>> stream.readinto(buffer, skipgaps=True) 2 >>> buffer bytearray(b'd!loWorl') >>> stream.tell() 16 >>> stream.readinto(buffer, skipgaps=True) 0 >>> buffer bytearray(b'd!loWorl') >>> stream.tell() 16 >>> stream = MemoryIO(memory, seek=4) >>> buffer = bytearray(b'.' * 8) >>> stream.readinto(buffer, skipgaps=False) 4 >>> buffer bytearray(b'ello....') >>> stream.tell() 8 >>> stream.readinto(buffer, skipgaps=False) -2 >>> stream.tell() 10 >>> stream.readinto(buffer, skipgaps=False) 6 >>> buffer bytearray(b'World!..') >>> stream.tell() 16 >>> stream.readinto(buffer, skipgaps=False) 0 >>> buffer bytearray(b'World!..') >>> stream.tell() 16 """ self._check_closed() memory = self._memory start = self._position size = len(buffer) pending = size offset = 0 for block_start, block_endex in memory.intervals(start=start): if start < block_start: if not skipgaps: if offset: return offset else: self._position = block_start return start - block_start start = block_start self._position = start endex = start + pending if endex > block_endex: endex = block_endex size = endex - start with memory.view(start=start, endex=endex) as view: buffer[offset:(offset + size)] = view start += size self._position = start offset += size pending -= size if pending <= 0: break return offset
readinto1 = readinto
[docs] def seek( self, offset: Address, whence: int = SEEK_SET, ) -> Address: r"""Changes the current stream position. It performs the classic ``seek()`` I/O operation. The `whence` can be any of: * :const:`SEEK_SET` (``0`` or ``None``): referring to the absolute address 0. * :const:`SEEK_CUR` (``1``): referring to the current stream position (:meth:`tell`). * :const:`SEEK_END` (``2``): referring to the memory end (:attr:`ImmutableMemory.endex`). * :const:`SEEK_DATA` (``3``): if the current stream position lies within a memory hole, it moves to the beginning of the next data block; no operation is performed otherwise. * :const:`SEEK_HOLE` (``4``): if the current stream position lies within a data block, it moves to the beginning of the next memory hole (note: the end of the stream is considered as a memory hole); no operation is performed otherwise. Arguments: offset (int): Position offset to apply. whence (int): Where the offset is referred. It can be any of the standard ``SEEK_*`` values. By default, it refers to the beginning of the stream. Returns: int: The updated stream position. Notes: Stream position is just a number, not related to memory ranges. Examples: >>> from bytesparse import * >>> blocks = [[3, b'Hello'], [12, b'World!']] >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.seek(5) 5 >>> stream.seek(-3, SEEK_END) 15 >>> stream.seek(2, SEEK_CUR) 17 >>> stream.seek(1, SEEK_SET) 1 >>> stream.seek(stream.tell(), SEEK_HOLE) 1 >>> stream.seek(stream.tell(), SEEK_DATA) 3 >>> stream.seek(stream.tell(), SEEK_HOLE) 8 >>> stream.seek(stream.tell(), SEEK_DATA) 12 >>> stream.seek(stream.tell(), SEEK_HOLE) # EOF 18 >>> stream.seek(stream.tell(), SEEK_DATA) # EOF 18 >>> stream.seek(22) # after 22 >>> stream.seek(0) # before 0 """ self._check_closed() if whence == SEEK_SET: self._position = offset elif whence == SEEK_CUR: self._position += offset elif whence == SEEK_END: self._position = self._memory.endex + offset elif whence == SEEK_DATA: _, block_endex, block_value = self._memory.block_span(offset) if block_value is None and block_endex is not None: self._position = block_endex elif whence == SEEK_HOLE: _, block_endex, block_value = self._memory.block_span(offset) if block_value is not None: self._position = block_endex else: raise ValueError('invalid whence') return self._position
[docs] def seekable(self) -> bool: r"""Stream is seekable. Returns: bool: ``True``, stream is always seekable. """ self._check_closed() return True
[docs] def skip_data(self) -> Address: r"""Skips a data block. It moves the current stream position after the end of the currently pointed data block. No action is performed if the current stream position lies within a memory hole (gap). Returns: int: Updated stream position. See Also: :meth:`seek` Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello'], [12, b'World!']] >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.skip_data() 0 >>> stream.seek(6) 6 >>> stream.skip_data() 8 >>> stream.skip_data() 8 >>> stream.seek(12) 12 >>> stream.skip_data() 18 >>> stream.skip_data() 18 >>> stream.seek(20) 20 >>> stream.skip_data() 20 """ _, block_endex, block_value = self._memory.block_span(self._position) if block_value is not None: self._position = block_endex return self._position
[docs] def skip_hole(self) -> Address: r"""Skips a memory hole. It moves the current stream position after the end of the currently pointed memory hole (gap). No action is performed if the current stream position lies within a data block. Returns: int: Updated stream position. See Also: :meth:`seek` Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello'], [12, b'World!']] >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.skip_hole() 3 >>> stream.skip_hole() 3 >>> stream.seek(9) 9 >>> stream.skip_hole() 12 >>> stream.skip_hole() 12 >>> stream.seek(20) 20 >>> stream.skip_hole() 20 """ _, block_endex, block_value = self._memory.block_span(self._position) if block_value is None and block_endex is not None: self._position = block_endex return self._position
[docs] def tell(self) -> Address: r"""Current stream position. Returns: int: Current stream position. Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello'], [12, b'World!']] >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.tell() 0 >>> stream.skip_hole() 3 >>> stream.tell() 3 >>> stream.read(5) b'Hello' >>> stream.tell() 8 >>> stream.skip_hole() 12 >>> stream.read() b'World!' >>> stream.tell() 18 >>> stream.seek(20) 20 >>> stream.tell() 20 """ self._check_closed() return self._position
[docs] def truncate( self, size: Optional[Address] = None, ) -> Address: r"""Truncates stream. If `size` is provided, it moves the current stream position to it. Any data after the updated stream position are deleted from the underlying memory object. The updated stream position can lie outside the actual memory bounds (i.e. extending after the memory). No filling is performed, only the stream position is moved there. Arguments: size (int): If not ``None``, the stream is positioned there. Returns: int: Updated stream position. Raises: :exc:`io.UnsupportedOperation`: Stream not writable. Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello'], [12, b'World!']] >>> stream = MemoryIO(Memory.from_blocks(blocks)) >>> stream.seek(7) 7 >>> stream.truncate() 7 >>> stream.tell() 7 >>> stream.memory.to_blocks() [[3, b'Hell']] >>> stream.truncate(10) 10 >>> stream.tell() 10 >>> memory = Memory.from_bytes(b'Hello, World!') >>> setattr(memory, 'write', None) # exception on write() >>> stream = MemoryIO(memory) >>> stream.seek(7) 7 >>> stream.truncate() Traceback (most recent call last): ... io.UnsupportedOperation: truncate """ self._check_closed() if self._writable: if size is None: size = self._position else: if size < 0: raise ValueError('negative size value') self._memory.delete(start=size) self._position = size return size else: raise io.UnsupportedOperation('truncate')
[docs] def writable(self) -> bool: r"""Stream is writable. Returns: bool: Stream is writable. Examples: >>> from bytesparse import Memory, MemoryIO >>> memory = Memory.from_bytes(b'Hello, World!') >>> with MemoryIO(memory) as stream: ... print(stream.writable()) True >>> setattr(memory, 'write', None) # exception on write() >>> with MemoryIO(memory) as stream: ... print(stream.writable()) False """ self._check_closed() return self._writable
[docs] def write( self, buffer: Union[AnyBytes, ImmutableMemory, int], ) -> Address: r"""Writes data into the stream. The behaviour depends on the nature of `buffer`: byte-like or integer. Byte-like data are written into the underlying memory object via its :meth:`bytesparse.base.MutableMemory.write` method, at the current stream position (i.e. :meth:`tell`). The stream position is always incremented by the size of `buffer`, regardless of the actual number of bytes written into the underlying memory object (e.g. when cropped by existing :attr:`bytesparse.base.MutableMemory.bounds_span` settings). If `buffer` is a positive integer, that is the amount of bytes to :meth:`bytesparse.base.MutableMemory.clear` from the current stream position onwards. The stream position is incremented by `buffer` bytes. It returns `buffer` as a positive number. If `buffer` is a negative integer, that is the amount of bytes to :meth:`bytesparse.base.MutableMemory.delete` from the current stream position onwards. The stream position is not changed. It returns `buffer` as a positive number. Notes: `buffer` is considered an integer if the execution of ``buffer.__index__()`` does not raise an :exc:`Exception`. Arguments: buffer (bytes): Byte data to write at the current stream position. Returns: int: Size of the written `buffer`. Raises: :exc:`io.UnsupportedOperation`: Stream not writable. See Also: :meth:`bytesparse.base.MutableMemory.clear` :meth:`bytesparse.base.MutableMemory.delete` :meth:`bytesparse.base.MutableMemory.write` Examples: >>> from bytesparse import Memory, MemoryIO >>> blocks = [[3, b'Hello'], [10, b'World!']] >>> memory = Memory.from_blocks(blocks) >>> stream = MemoryIO(memory, seek=10) >>> stream.write(b'Human') 5 >>> memory.to_blocks() [[3, b'Hello'], [10, b'Human!']] >>> stream.tell() 15 >>> stream.seek(7) 7 >>> stream.write(5) # clear 5 bytes 5 >>> memory.to_blocks() [[3, b'Hell'], [12, b'man!']] >>> stream.tell() 12 >>> stream.seek(7) 7 >>> stream.write(-5) # delete 5 bytes 5 >>> memory.to_blocks() [[3, b'Hellman!']] >>> stream.tell() 7 >>> memory = Memory.from_bytes(b'Hello, World!') >>> setattr(memory, 'write', None) # exception on write() >>> stream = MemoryIO(memory, seek=7) >>> stream.write(b'Human') Traceback (most recent call last): ... io.UnsupportedOperation: not writable """ self._check_closed() if self._writable: start = self._position try: size = buffer.__index__() except Exception: size = len(buffer) self._memory.write(start, buffer) self._position = start + size else: if size < 0: size = -size endex = start + size self._memory.delete(start=start, endex=endex) else: endex = start + size self._memory.clear(start=start, endex=endex) self._position = endex return size else: raise io.UnsupportedOperation('not writable')
[docs] def writelines( self, lines: Iterable[Union[AnyBytes, int]], ) -> None: r""" Writes lines to the stream. Line separators are not added, so it is usual for each of the lines provided to have a line separator at the end. If a `line` is an integer, its behavior is as per :meth:`write` (positive: clear, negative: delete). Arguments: lines (list of bytes): List of byte strings to write. See Also: :meth:`bytesparse.base.MutableMemory.clear` :meth:`bytesparse.base.MutableMemory.delete` :meth:`write` Examples: >>> from bytesparse import Memory, MemoryIO >>> lines = [3, b'Hello\n', b'World!', 5, b'Bye\n', 4, b'Bye!'] >>> stream = MemoryIO() >>> stream.writelines(lines) >>> stream.memory.to_blocks() [[3, b'Hello\nWorld!'], [20, b'Bye\n'], [28, b'Bye!']] """ for line in lines: self.write(line)