# Copyright (c) 2020-2024, Andrea Zoppi.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
r"""In-place implementation.
This implementation in pure Python uses the basic :class:`bytearray` data type
to hold block data, which allows mutable in-place operations.
"""
import io
import sys
from itertools import count as _count
from itertools import islice as _islice
from itertools import repeat as _repeat
from itertools import zip_longest as _zip_longest
from typing import Any
from typing import ByteString
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
from .base import HUMAN_ASCII
from .base import STR_MAX_CONTENT_SIZE
from .base import Address
from .base import AddressValueMapping
from .base import AnyBytes
from .base import Block
from .base import BlockIndex
from .base import BlockIterable
from .base import BlockList
from .base import BlockSequence
from .base import ClosedInterval
from .base import EllipsisType
from .base import ImmutableMemory
from .base import MutableBytesparse
from .base import MutableMemory
from .base import OpenInterval
from .base import Value
def _repeat2(
pattern: Optional[ByteString],
offset: Address,
size: Optional[Address],
) -> Iterator[Value]:
r"""Pattern repetition.
Arguments:
pattern (list of int):
The pattern to repeat, made of byte integers, or ``None``.
offset (int):
Index of the first value within the pattern. Wraparound supported.
size (int):
Size of the repeated pattern; ``None`` for infinite repetition.
Yields:
int: Repeated pattern values.
"""
if pattern is None:
if size is None:
yield from _repeat(None)
elif 0 < size:
yield from _repeat(None, size)
else:
pattern_size = len(pattern)
if offset:
offset %= pattern_size
if size is None:
while 1:
yield from _islice(pattern, offset, pattern_size)
yield from _islice(pattern, offset)
else:
for _ in range(size // pattern_size):
yield from _islice(pattern, offset, pattern_size)
yield from _islice(pattern, offset)
size %= pattern_size
chunk_size = pattern_size - offset
if size < chunk_size:
chunk_size = size
yield from _islice(pattern, offset, offset + chunk_size)
yield from _islice(pattern, size - chunk_size)
else:
if size is None:
while 1:
yield from pattern
else:
for _ in range(size // pattern_size):
yield from pattern
yield from _islice(pattern, size % pattern_size)
[docs]
class Memory(MutableMemory):
r"""Virtual memory.
This class is a handy wrapper around `blocks`, so that it can behave mostly
like a :obj:`bytearray`, but on sparse chunks of data.
Please look at examples of each method to get a glimpse of the features of
this class.
See Also:
:obj:`ImmutableMemory`
:obj:`MutableMemory`
Attributes:
_blocks (list of blocks):
A sequence of spaced blocks, sorted by address.
_bound_start (int):
Memory bounds start address. Any data before this address is
automatically discarded; disabled if ``None``.
_bound_endex (int):
Memory bounds exclusive end address. Any data at or after this
address is automatically discarded; disabled if ``None``.
"""
__doc__ += ImmutableMemory.__doc__[ImmutableMemory.__doc__.index('Arguments:'):
ImmutableMemory.__doc__.index('Method Groups:')]
[docs]
def __add__(
self,
value: Union[AnyBytes, ImmutableMemory],
) -> 'Memory':
memory = self.from_memory(self, validate=False)
memory.extend(value)
return memory
[docs]
def __bool__(
self,
) -> bool:
return bool(self._blocks)
[docs]
def __bytes__(
self,
) -> bytes:
return bytes(self.view())
[docs]
def __contains__(
self,
item: Union[AnyBytes, Value],
) -> bool:
return any(item in block_data for _, block_data in self._blocks)
[docs]
def __copy__(
self,
) -> 'Memory':
return self.from_memory(self, start=self._bound_start, endex=self._bound_endex, copy=False)
[docs]
def __deepcopy__(
self,
) -> 'Memory':
return self.from_memory(self, start=self._bound_start, endex=self._bound_endex, copy=True)
[docs]
def __delitem__(
self,
key: Union[Address, slice],
) -> None:
if self._blocks:
if isinstance(key, slice):
start = key.start
if start is None:
start = self.start
endex = key.stop
if endex is None:
endex = self.endex
if start < endex:
step = key.step
if step is None or step == 1:
self._erase(start, endex, True) # delete
elif step > 1:
for address in reversed(range(start, endex, step)):
self._erase(address, address + 1, True) # delete
else:
address = key.__index__()
self._erase(address, address + 1, True) # delete
[docs]
def __eq__(
self,
other: Any,
) -> bool:
if isinstance(other, Memory):
return self._blocks == other._blocks
elif isinstance(other, ImmutableMemory):
zipping = _zip_longest(self._blocks, other.blocks(), fillvalue=(0, b''))
return all(b1 == b2 for b1, b2 in zipping)
elif isinstance(other, (bytes, bytearray, memoryview)):
blocks = self._blocks
block_count = len(blocks)
if block_count > 1:
return False
elif block_count:
return blocks[0][1] == other
else:
return len(other) == 0
else:
iter_self = self.values()
iter_other = iter(other)
return all(a == b for a, b in _zip_longest(iter_self, iter_other, fillvalue=None))
[docs]
def __getitem__(
self,
key: Union[Address, slice],
) -> Any:
if isinstance(key, slice):
start = key.start
if start is None:
start = self.start
endex = key.stop
if endex is None:
endex = self.endex
step = key.step
if isinstance(step, Value):
if step >= 1:
return self.extract(start=start, endex=endex, step=step)
else:
return self.__class__() # empty
else:
return self.extract(start=start, endex=endex, pattern=step)
else:
return self.peek(key.__index__())
[docs]
def __iadd__(
self,
value: Union[AnyBytes, ImmutableMemory],
) -> 'Memory':
self.extend(value)
return self
[docs]
def __imul__(
self,
times: int,
) -> 'Memory':
times = int(times)
if times < 0:
times = 0
blocks = self._blocks
if times and blocks:
start = self.start
size = self.endex - start
offset = size
memory = self.from_memory(self, validate=False)
for time in range(times - 1):
self.write(offset, memory, clear=True)
offset += size
else:
blocks.clear()
return self
[docs]
def __init__(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
):
if start is not None:
start = Address(start)
if endex is not None:
endex = Address(endex)
if start is not None and endex < start:
endex = start
self._blocks: BlockList = []
self._bound_start: Optional[Address] = start
self._bound_endex: Optional[Address] = endex
[docs]
def __ior__(
self,
value: Union[AnyBytes, ImmutableMemory],
) -> 'Memory':
self.write(0, value)
return self
[docs]
def __iter__(
self,
) -> Iterator[Optional[Value]]:
yield from self.values(start=self.start, endex=self.endex)
[docs]
def __len__(
self,
) -> Address:
return self.endex - self.start
[docs]
def __mul__(
self,
times: int,
) -> 'Memory':
times = int(times)
if times < 0:
times = 0
blocks = self._blocks
if times and blocks:
start = self.start
size = self.endex - start
offset = size # adjust first write
memory = self.from_memory(self, validate=False)
for time in range(times - 1):
memory.write(offset, self)
offset += size
return memory
else:
return self.__class__()
[docs]
def __or__(
self,
value: Union[AnyBytes, ImmutableMemory],
) -> 'Memory':
memory = self.from_memory(self, validate=False)
memory.write(0, value)
return memory
[docs]
def __repr__(
self,
) -> str:
return f'<{self.__class__.__name__}[0x{self.start:X}:0x{self.endex:X}]@0x{id(self):X}>'
[docs]
def __reversed__(
self,
) -> Iterator[Optional[Value]]:
yield from self.rvalues(start=self.start, endex=self.endex)
[docs]
def __setitem__(
self,
key: Union[Address, slice],
value: Optional[Union[AnyBytes, Value]],
) -> None:
if isinstance(key, slice):
start = key.start
if start is None:
start = self.start
endex = key.stop
if endex is None:
endex = self.endex
if endex < start:
endex = start
step = key.step
if step is None or step == 1:
step = None
elif step < 1:
return # empty range
if value is None:
# Clear range
if step is None:
self._erase(start, endex, False) # clear
else:
for address in range(start, endex, step):
self._erase(address, address + 1, False) # clear
return # nothing to write
slice_size = endex - start
if step is not None:
slice_size = (slice_size + step - 1) // step
if isinstance(value, Value):
value = bytearray((value,))
value *= slice_size
value_size = len(value)
if value_size < slice_size:
# Shrink: remove excess, overwrite existing
if step is None:
del_start = start + value_size
del_endex = del_start + (slice_size - value_size)
self._erase(del_start, del_endex, True) # delete
self.write(start, value)
else:
raise ValueError(f'attempt to assign bytes of size {value_size}'
f' to extended slice of size {slice_size}')
elif slice_size < value_size:
# Enlarge: insert excess, overwrite existing
if step is None:
self.insert(endex, value[slice_size:])
self.write(start, value[:slice_size])
else:
raise ValueError(f'attempt to assign bytes of size {value_size}'
f' to extended slice of size {slice_size}')
else:
# Same size: overwrite existing
if step is None:
self.write(start, value)
else:
for offset, item in enumerate(value):
self.poke(start + (step * offset), item)
else:
self.poke(key.__index__(), value)
[docs]
def __str__(
self,
) -> str:
if self.content_size < STR_MAX_CONTENT_SIZE:
bound_start = '' if self._bound_start is None else f'{self._bound_start}, '
bound_endex = '' if self._bound_endex is None else f', {self._bound_endex}'
inner = ', '.join(f'[{block_start}, b{block_data.decode()!r}]'
for block_start, block_data in self._blocks)
return f'<{bound_start}[{inner}]{bound_endex}>'
else:
return repr(self)
[docs]
def _block_index_at(
self,
address: Address,
) -> Optional[BlockIndex]:
blocks = self._blocks
if blocks:
if address < blocks[0][0]: # before first block
return None
block_start, block_data = blocks[-1]
if block_start + len(block_data) <= address: # after last block
return None
else:
return None
# Dichotomic search
left = 0
right = len(blocks)
while left <= right:
center = (left + right) >> 1
block_start, block_data = blocks[center]
if block_start + len(block_data) <= address: # after center block
left = center + 1
elif address < block_start: # before center block
right = center - 1
else: # within center block
return center
return None
[docs]
def _block_index_endex(
self,
address: Address,
) -> BlockIndex:
blocks = self._blocks
if blocks:
if address < blocks[0][0]: # before first block
return 0
block_start, block_data = blocks[-1]
if block_start + len(block_data) <= address: # after last block
return len(blocks)
else:
return 0
# Dichotomic search
left = 0
right = len(blocks)
while left <= right:
center = (left + right) >> 1
block_start, block_data = blocks[center]
if block_start + len(block_data) <= address: # after center block
left = center + 1
elif address < block_start: # before center block
right = center - 1
else: # within center block
return center + 1
return right + 1
[docs]
def _block_index_start(
self,
address: Address,
) -> BlockIndex:
blocks = self._blocks
if blocks:
if address <= blocks[0][0]: # before first block
return 0
block_start, block_data = blocks[-1]
if block_start + len(block_data) <= address: # after last block
return len(blocks)
else:
return 0
# Dichotomic search
left = 0
right = len(blocks)
while left <= right:
center = (left + right) >> 1
block_start, block_data = blocks[center]
if block_start + len(block_data) <= address: # after center block
left = center + 1
elif address < block_start: # before center block
right = center - 1
else: # within center block
return center
return left
[docs]
def _erase(
self,
start: Address,
endex: Address,
shift_after: bool,
) -> None:
r"""Erases an address range.
Low-level method to erase data within the underlying data structure.
Arguments:
start (int):
Start address of the erasure range.
endex (int):
Exclusive end address of the erasure range.
shift_after (bool):
Shifts addresses of blocks after the end of the range,
subtracting the size of the range itself.
If data blocks before and after the address range are
contiguous after erasure, merge the two blocks together.
"""
size = endex - start
if size > 0:
blocks = self._blocks
block_index = self._block_index_start(start)
# Delete final/inner part of deletion start block
if block_index < len(blocks):
block_start, block_data = blocks[block_index]
if start > block_start:
if shift_after:
del block_data[(start - block_start):(endex - block_start)]
else:
block_data = block_data[:(start - block_start)]
blocks.insert(block_index, [block_start, block_data])
block_index += 1 # skip this from inner part
# Delete initial part of deletion end block
inner_start = block_index
for block_index in range(block_index, len(blocks)):
block_start, block_data = blocks[block_index]
if endex <= block_start:
break # inner ends before here
block_endex = block_start + len(block_data)
if endex < block_endex:
offset = endex - block_start
del block_data[:offset]
blocks[block_index][0] += offset # update address
break # inner ends before here
else:
block_index = len(blocks)
inner_endex = block_index
if shift_after:
# Check if inner deletion can be merged
if inner_start and inner_endex < len(blocks):
block_start, block_data = blocks[inner_start - 1]
block_endex = block_start + len(block_data)
block_start2, block_data2 = blocks[inner_endex]
if block_endex + size == block_start2:
block_data += block_data2 # merge deletion boundaries
inner_endex += 1 # add to inner deletion
block_index += 1 # skip address update
# Shift blocks after deletion
for block_index in range(block_index, len(blocks)):
blocks[block_index][0] -= size # update address
# Delete inner full blocks
if inner_start < inner_endex:
del blocks[inner_start:inner_endex]
[docs]
def _place(
self,
address: Address,
data: bytearray,
shift_after: bool,
) -> None:
r"""Places data.
Low-level method to place data into the underlying data structure.
Arguments:
address (int):
Address of the insertion point.
data (:obj:`bytearray`):
Data to insert.
shift_after (bool):
Shifts the addresses of blocks after the insertion point,
adding the size of the inserted data.
"""
size = len(data)
if size:
blocks = self._blocks
block_index = self._block_index_start(address)
if block_index:
block_start, block_data = blocks[block_index - 1]
block_endex = block_start + len(block_data)
if block_endex == address:
# Extend previous block
block_data += data
# Shift blocks after
if shift_after:
for block_index in range(block_index, len(blocks)):
blocks[block_index][0] += size
else:
if block_index < len(blocks):
block_endex += size
block_start2, block_data2 = blocks[block_index]
# Merge with next block
if block_endex == block_start2:
block_data += block_data2
blocks.pop(block_index)
return
if block_index < len(blocks):
block_start, block_data = blocks[block_index]
if address < block_start:
if shift_after:
# Insert a standalone block before
blocks.insert(block_index, [address, data])
else:
if address + len(data) == block_start:
# Merge with next block
blocks[block_index][0] = address
block_data[0:0] = data
else:
# Insert a standalone block before
blocks.insert(block_index, [address, data])
else:
# Insert data into the current block
offset = address - block_start
block_data[offset:offset] = data
# Shift blocks after
if shift_after:
for block_index in range(block_index + 1, len(blocks)):
blocks[block_index][0] += size
else:
# Append a standalone block after
blocks.append([address, data[:]])
[docs]
def _prebound_endex(
self,
start_min: Optional[Address],
size: Address,
) -> None:
bound_endex = self._bound_endex
if bound_endex is not None and size > 0:
start = bound_endex - size
if start_min is not None and start < start_min:
start = start_min
self._erase(start, self.content_endex, False) # clear
[docs]
def _prebound_endex_backup(
self,
start_min: Optional[Address],
size: Address,
) -> ImmutableMemory:
bound_endex = self._bound_endex
if bound_endex is not None and size > 0:
start = bound_endex - size
if start_min is not None and start < start_min:
start = start_min
return self.extract(start=start, endex=None)
else:
return self.__class__()
[docs]
def _prebound_start(
self,
endex_max: Optional[Address],
size: Address,
) -> None:
bound_start = self._bound_start
if bound_start is not None and size > 0:
endex = bound_start + size
if endex_max is not None and endex > endex_max:
endex = endex_max
self._erase(self.content_start, endex, False) # clear
[docs]
def _prebound_start_backup(
self,
endex_max: Optional[Address],
size: Address,
) -> ImmutableMemory:
bound_start = self._bound_start
if bound_start is not None and size > 0:
endex = bound_start + size
if endex_max is not None and endex > endex_max:
endex = endex_max
return self.extract(start=None, endex=endex)
else:
return self.__class__()
[docs]
def align(
self,
modulo: int,
start: Optional[Address] = None,
endex: Optional[Address] = None,
pattern: Union[AnyBytes, Value] = 0,
) -> None:
modulo = modulo.__index__()
if modulo < 1:
raise ValueError('invalid modulo')
if modulo == 1:
return
intervals = list(self.intervals(start=start, endex=endex))
for start, endex in intervals:
start_offset = start % modulo
if start_offset:
start -= start_offset
endex_offset = endex % modulo
if endex_offset:
endex += modulo - endex_offset
if start_offset or endex_offset:
self.flood(start=start, endex=endex, pattern=pattern)
[docs]
def align_backup(
self,
modulo: int,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> List[OpenInterval]:
modulo = modulo.__index__()
if modulo < 1:
raise ValueError('invalid modulo')
start, endex = self.bound(start, endex)
if modulo != 1:
start_offset = start % modulo
if start_offset:
start -= start_offset
endex_offset = endex % modulo
if endex_offset:
endex += modulo - endex_offset
return self.flood_backup(start=start, endex=endex)
[docs]
def align_restore(
self,
gaps: List[OpenInterval],
) -> None:
self.flood_restore(gaps)
[docs]
def append(
self,
item: Union[AnyBytes, Value],
) -> None:
if not isinstance(item, Value):
if len(item) != 1:
raise ValueError('expecting single item')
item = item[0]
blocks = self._blocks
if blocks:
blocks[-1][1].append(item)
else:
blocks.append([0, bytearray((item,))])
# noinspection PyMethodMayBeStatic
[docs]
def append_backup(
self,
) -> None:
return None
[docs]
def append_restore(
self,
) -> None:
self.pop()
[docs]
def block_span(
self,
address: Address,
) -> Tuple[Optional[Address], Optional[Address], Optional[Value]]:
block_index = self._block_index_start(address)
blocks = self._blocks
if block_index < len(blocks):
block_start, block_data = blocks[block_index]
block_endex = block_start + len(block_data)
if block_start <= address < block_endex:
# Address within a block
value = block_data[address - block_start]
return block_start, block_endex, value # block span
elif block_index:
# Address within a gap
block_endex = block_start # end gap before next block
block_start, block_data = blocks[block_index - 1]
block_start += len(block_data) # start gap after previous block
return block_start, block_endex, None # gap span
else:
# Address before content
return None, block_start, None # open left
else:
# Address after content
if blocks:
block_start, block_data = blocks[-1]
block_endex = block_start + len(block_data)
return block_endex, None, None # open right
else:
return None, None, None # fully open
[docs]
def blocks(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Iterator[Block]:
blocks = self._blocks
if blocks:
if start is None and endex is None: # faster
for block_start, block_data in blocks:
yield [block_start, memoryview(block_data)]
else:
block_index_start = 0 if start is None else self._block_index_start(start)
block_index_endex = len(blocks) if endex is None else self._block_index_endex(endex)
start, endex = self.bound(start, endex)
block_iterator = _islice(blocks, block_index_start, block_index_endex)
for block_start, block_data in block_iterator:
block_endex = block_start + len(block_data)
slice_start = block_start if start < block_start else start
slice_endex = endex if endex < block_endex else block_endex
if slice_start < slice_endex:
slice_view = memoryview(block_data)
slice_view = slice_view[(slice_start - block_start):(slice_endex - block_start)]
yield [slice_start, slice_view]
[docs]
def bound(
self,
start: Optional[Address],
endex: Optional[Address],
) -> ClosedInterval:
blocks = self._blocks
bound_start = self._bound_start
bound_endex = self._bound_endex
if start is None:
if bound_start is None:
if blocks:
start = blocks[0][0]
else:
start = 0
else:
start = bound_start
else:
if bound_start is not None:
if start < bound_start:
start = bound_start
if endex is not None:
if endex < start:
endex = start
if endex is None:
if bound_endex is None:
if blocks:
block_start, block_data = blocks[-1]
endex = block_start + len(block_data)
else:
endex = start
else:
endex = bound_endex
else:
if bound_endex is not None:
if endex > bound_endex:
endex = bound_endex
if start > endex:
start = endex
return start, endex
@ImmutableMemory.bound_endex.getter
def bound_endex(
self,
) -> Optional[Address]:
return self._bound_endex
@bound_endex.setter
def bound_endex(
self,
bound_endex: Optional[Address],
) -> None:
bound_start = self._bound_start
if bound_start is not None and bound_endex is not None and bound_endex < bound_start:
self._bound_start = bound_start = bound_endex
self._bound_endex = bound_endex
if bound_endex is not None:
self.crop(start=bound_start, endex=bound_endex)
@ImmutableMemory.bound_span.getter
def bound_span(
self,
) -> OpenInterval:
return self._bound_start, self._bound_endex
@bound_span.setter
def bound_span(
self,
bound_span: OpenInterval,
) -> None:
if bound_span is None:
bound_span = (None, None)
bound_start, bound_endex = bound_span
if bound_start is not None and bound_endex is not None and bound_endex < bound_start:
bound_endex = bound_start
self._bound_start = bound_start
self._bound_endex = bound_endex
if bound_start is not None or bound_endex is not None:
self.crop(start=bound_start, endex=bound_endex)
@ImmutableMemory.bound_start.getter
def bound_start(
self,
) -> Optional[Address]:
return self._bound_start
@bound_start.setter
def bound_start(
self,
bound_start: Optional[Address],
) -> None:
bound_endex = self._bound_endex
if bound_start is not None and bound_endex is not None and bound_endex < bound_start:
self._bound_endex = bound_endex = bound_start
self._bound_start = bound_start
if bound_start is not None:
self.crop(start=bound_start, endex=bound_endex)
[docs]
def chop(
self,
width: Address,
start: Optional[Address] = None,
endex: Optional[Address] = None,
align: bool = False,
) -> Iterator[Tuple[Address, memoryview]]:
if width < 1:
raise ValueError('invalid width')
for block_start, block_view in self.blocks(start=start, endex=endex):
block_size = len(block_view)
chunk_offset = 0
if align:
chunk_offset = block_start % width
if chunk_offset:
chunk_offset = width - chunk_offset
chunk_view = block_view[:chunk_offset]
yield block_start, chunk_view
while chunk_offset < block_size:
chunk_after = chunk_offset + width
chunk_view = block_view[chunk_offset:chunk_after]
yield block_start + chunk_offset, chunk_view
chunk_offset = chunk_after
[docs]
def clear(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> None:
if start is None:
start = self.start
if endex is None:
endex = self.endex
if start < endex:
self._erase(start, endex, False) # clear
[docs]
def clear_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> ImmutableMemory:
return self.extract(start=start, endex=endex)
[docs]
def clear_restore(
self,
backup: ImmutableMemory,
) -> None:
self.write(0, backup, clear=True)
[docs]
@classmethod
def collapse_blocks(
cls,
blocks: BlockIterable,
) -> BlockList:
memory = cls()
for block_start, block_data in blocks:
memory.write(block_start, block_data)
return memory._blocks
[docs]
def content_blocks(
self,
block_index_start: Optional[BlockIndex] = None,
block_index_endex: Optional[BlockIndex] = None,
block_index_step: Optional[BlockIndex] = None,
) -> Iterator[Union[Tuple[Address, Union[memoryview, bytearray]], Block]]:
blocks = self._blocks
if block_index_start is not None and block_index_start < 0:
block_index_start = len(blocks) + block_index_start
if block_index_endex is not None and block_index_endex < 0:
block_index_endex = len(blocks) + block_index_endex
yield from _islice(blocks, block_index_start, block_index_endex, block_index_step)
@ImmutableMemory.content_endex.getter
def content_endex(
self,
) -> Address:
blocks = self._blocks
if blocks:
block_start, block_data = blocks[-1]
return block_start + len(block_data)
elif self._bound_start is None: # default to start
return 0
else:
return self._bound_start # default to start
@ImmutableMemory.content_endin.getter
def content_endin(
self,
) -> Address:
blocks = self._blocks
if blocks:
block_start, block_data = blocks[-1]
return block_start + len(block_data) - 1
elif self._bound_start is None: # default to start-1
return -1
else:
return self._bound_start - 1 # default to start-1
[docs]
def content_items(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Iterator[Tuple[Address, Value]]:
blocks = self._blocks
if blocks:
if start is None and endex is None: # faster
for block_start, block_data in blocks:
address = block_start + 0
for value in block_data:
yield address, value
address += 1
else:
block_index_start = 0 if start is None else self._block_index_start(start)
block_index_endex = len(blocks) if endex is None else self._block_index_endex(endex)
start, endex = self.bound(start, endex)
block_iterator = _islice(blocks, block_index_start, block_index_endex)
for block_start, block_data in block_iterator:
block_endex = block_start + len(block_data)
slice_start = block_start if start < block_start else start
slice_endex = endex if endex < block_endex else block_endex
if slice_start < slice_endex:
slice_view = memoryview(block_data)
address = slice_start + 0
for value in slice_view[(slice_start - block_start):(slice_endex - block_start)]:
yield address, value
address += 1
[docs]
def content_keys(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Iterator[Address]:
blocks = self._blocks
if blocks:
if start is None and endex is None: # faster
for block_start, block_data in blocks:
block_endex = block_start + len(block_data)
yield from range(block_start, block_endex)
else:
block_index_start = 0 if start is None else self._block_index_start(start)
block_index_endex = len(blocks) if endex is None else self._block_index_endex(endex)
start, endex = self.bound(start, endex)
block_iterator = _islice(blocks, block_index_start, block_index_endex)
for block_start, block_data in block_iterator:
block_endex = block_start + len(block_data)
slice_start = block_start if start < block_start else start
slice_endex = endex if endex < block_endex else block_endex
yield from range(slice_start, slice_endex)
@ImmutableMemory.content_parts.getter
def content_parts(
self,
) -> int:
return len(self._blocks)
@ImmutableMemory.content_size.getter
def content_size(
self,
) -> Address:
return sum(len(block_data) for _, block_data in self._blocks)
@ImmutableMemory.content_span.getter
def content_span(
self,
) -> ClosedInterval:
return self.content_start, self.content_endex
@ImmutableMemory.content_start.getter
def content_start(
self,
) -> Address:
blocks = self._blocks
if blocks:
return blocks[0][0]
elif self._bound_start is None:
return 0
else:
return self._bound_start
[docs]
def content_values(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Iterator[Value]:
blocks = self._blocks
if blocks:
if start is None and endex is None: # faster
for block in blocks:
yield from block[1]
else:
block_index_start = 0 if start is None else self._block_index_start(start)
block_index_endex = len(blocks) if endex is None else self._block_index_endex(endex)
start, endex = self.bound(start, endex)
block_iterator = _islice(blocks, block_index_start, block_index_endex)
for block_start, block_data in block_iterator:
block_endex = block_start + len(block_data)
slice_start = block_start if start < block_start else start
slice_endex = endex if endex < block_endex else block_endex
if slice_start < slice_endex:
slice_view = memoryview(block_data)
yield from slice_view[(slice_start - block_start):(slice_endex - block_start)]
@ImmutableMemory.contiguous.getter
def contiguous(
self,
) -> bool:
start = self.start
endex = self.endex
if start < endex:
block_index = self._block_index_at(start)
if block_index is not None:
block_start, block_data = self._blocks[block_index]
if endex <= block_start + len(block_data):
return True
return False
else:
return True
[docs]
def count(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> int:
# Faster code for unbounded slice
if start is None and endex is None:
return sum(block_data.count(item) for _, block_data in self._blocks)
# Bounded slice
count = 0
start, endex = self.bound(start, endex)
block_index_start = self._block_index_start(start)
block_index_endex = self._block_index_endex(endex)
block_iterator = _islice(self._blocks, block_index_start, block_index_endex)
for block_start, block_data in block_iterator:
slice_start = start - block_start
if slice_start < 0:
slice_start = 0
slice_endex = endex - block_start
# if slice_endex < slice_start:
# slice_endex = slice_start
count += block_data.count(item, slice_start, slice_endex)
return count
[docs]
def copy(
self,
) -> 'Memory':
return self.__deepcopy__()
[docs]
def crop(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> None:
blocks = self._blocks # may change during execution
# Bound blocks exceeding before memory start
if start is not None and blocks:
block_start = blocks[0][0]
if block_start < start:
self._erase(block_start, start, False) # clear
# Bound blocks exceeding after memory end
if endex is not None and blocks:
block_start, block_data = blocks[-1]
block_endex = block_start + len(block_data)
if endex < block_endex:
self._erase(endex, block_endex, False) # clear
[docs]
def crop_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Tuple[Optional[ImmutableMemory], Optional[ImmutableMemory]]:
backup_start = None
backup_endex = None
blocks = self._blocks # may change
if blocks:
if start is not None:
block_start = blocks[0][0]
if block_start < start:
backup_start = self.extract(start=block_start, endex=start)
if endex is not None:
block_start, block_data = blocks[-1]
block_endex = block_start + len(block_data)
if endex < block_endex:
backup_endex = self.extract(start=endex, endex=block_endex)
return backup_start, backup_endex
[docs]
def crop_restore(
self,
backup_start: Optional[ImmutableMemory],
backup_endex: Optional[ImmutableMemory],
) -> None:
if backup_start is not None:
self.write(0, backup_start, clear=True)
if backup_endex is not None:
self.write(0, backup_endex, clear=True)
[docs]
def cut(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
bound: bool = True,
) -> 'Memory':
start_ = start
endex_ = endex
if start is None:
start = self.start
if endex is None:
endex = self.endex
if endex < start:
endex = start
memory = self.__class__()
blocks = self._blocks
if start < endex and blocks:
# Copy all the blocks except those completely outside selection
block_index_start = 0 if start_ is None else self._block_index_start(start)
block_index_endex = len(blocks) if endex_ is None else self._block_index_endex(endex)
if block_index_start < block_index_endex:
memory_blocks = _islice(blocks, block_index_start, block_index_endex)
memory_blocks = [[block_start, block_data]
for block_start, block_data in memory_blocks]
# Bound cloned data before the selection start address
block_start, block_data = memory_blocks[0]
if block_start < start:
memory_blocks[0] = [start, block_data[(start - block_start):]]
# Bound cloned data after the selection end address
block_start, block_data = memory_blocks[-1]
block_endex = block_start + len(block_data)
if endex < block_endex:
if block_start < endex:
memory_blocks[-1] = [block_start, block_data[:(endex - block_start)]]
else:
memory_blocks.pop()
memory._blocks = memory_blocks
self._erase(start, endex, False) # clear
if bound:
memory._bound_start = start
memory._bound_endex = endex
return memory
[docs]
def delete(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> None:
if start is None:
start = self.start
if endex is None:
endex = self.endex
if start < endex:
self._erase(start, endex, True) # delete
[docs]
def delete_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> ImmutableMemory:
return self.extract(start=start, endex=endex)
[docs]
def delete_restore(
self,
backup: ImmutableMemory,
) -> None:
self.reserve(backup.start, len(backup))
self.write(0, backup, clear=True)
@ImmutableMemory.endex.getter
def endex(
self,
) -> Address:
bound_endex = self._bound_endex
if bound_endex is None:
# Return actual
blocks = self._blocks
if blocks:
block_start, block_data = blocks[-1]
return block_start + len(block_data)
else:
return self.start
else:
return bound_endex
@ImmutableMemory.endin.getter
def endin(
self,
) -> Address:
bound_endex = self._bound_endex
if bound_endex is None:
# Return actual
blocks = self._blocks
if blocks:
block_start, block_data = blocks[-1]
return block_start + len(block_data) - 1
else:
return self.start - 1
else:
return bound_endex - 1
[docs]
def equal_span(
self,
address: Address,
) -> Tuple[Optional[Address], Optional[Address], Optional[Value]]:
block_index = self._block_index_start(address)
blocks = self._blocks
if block_index < len(blocks):
block_start, block_data = blocks[block_index]
block_endex = block_start + len(block_data)
if block_start <= address < block_endex:
# Address within a block
offset = address - block_start
start = offset
endex = offset + 1
value = block_data[offset]
for start in range(start, -1, -1):
if block_data[start] != value:
start += 1
break
else:
start = 0
for endex in range(endex, len(block_data)):
if block_data[endex] != value:
break
else:
endex = len(block_data)
block_endex = block_start + endex
block_start = block_start + start
return block_start, block_endex, value # equal data span
elif block_index:
# Address within a gap
block_endex = block_start # end gap before next block
block_start, block_data = blocks[block_index - 1]
block_start += len(block_data) # start gap after previous block
return block_start, block_endex, None # gap span
else:
# Address before content
return None, block_start, None # open left
else:
# Address after content
if blocks:
block_start, block_data = blocks[-1]
block_endex = block_start + len(block_data)
return block_endex, None, None # open right
else:
return None, None, None # fully open
[docs]
def extend(
self,
items: Union[AnyBytes, ImmutableMemory],
offset: Address = 0,
) -> None:
if offset < 0:
raise ValueError('negative extension offset')
self.write(self.content_endex + offset, items, clear=True)
[docs]
def extend_backup(
self,
offset: Address = 0,
) -> Address:
if offset < 0:
raise ValueError('negative extension offset')
return self.content_endex + offset
[docs]
def extend_restore(
self,
content_endex: Address,
) -> None:
self.clear(content_endex)
[docs]
def fill(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
pattern: Union[AnyBytes, Value] = 0,
) -> None:
start_ = start
start, endex = self.bound(start, endex)
if start < endex:
if isinstance(pattern, Value):
pattern = bytearray((pattern,))
pattern_size = len(pattern)
else:
pattern_size = len(pattern)
if not pattern_size:
raise ValueError('non-empty pattern required')
pattern = bytearray(pattern)
if start_ is not None and start > start_:
offset = (start - start_) % pattern_size
pattern = pattern[offset:] + pattern[:offset] # rotate
# Resize the pattern to the target range
size = endex - start
if pattern_size < size:
pattern *= (size + (pattern_size - 1)) // pattern_size
del pattern[size:]
# Standard write method
self._erase(start, endex, False) # clear
self._place(start, pattern, False) # write
[docs]
def fill_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> ImmutableMemory:
return self.extract(start=start, endex=endex)
[docs]
def fill_restore(
self,
backup: ImmutableMemory,
) -> None:
self.write(0, backup, clear=True)
[docs]
def find(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Address:
try:
return self.index(item, start=start, endex=endex)
except ValueError:
return -1
[docs]
def flood(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
pattern: Union[AnyBytes, Value] = 0,
) -> None:
start, endex = self.bound(start, endex)
if start < endex:
if isinstance(pattern, Value):
pattern = (pattern,)
else:
if not pattern:
raise ValueError('non-empty pattern required')
pattern = bytearray(pattern)
pattern_size = len(pattern)
blocks = self._blocks
block_index_start = self._block_index_start(start)
# Check if touching previous block
if block_index_start:
block_start, block_data = blocks[block_index_start - 1]
block_endex = block_start + len(block_data)
if block_endex == start:
block_index_start -= 1
# Manage block near start
if block_index_start < len(blocks):
block_start, block_data = blocks[block_index_start]
block_endex = block_start + len(block_data)
if block_start <= start and endex <= block_endex:
return # no emptiness to flood
if block_start < start:
offset = (block_start - start) % pattern_size
pattern = pattern[offset:] + pattern[:offset] # rotate
start = block_start
# Manage block near end
block_index_endex = self._block_index_endex(endex)
if block_index_start < block_index_endex:
block_start, block_data = blocks[block_index_endex - 1]
block_endex = block_start + len(block_data)
if endex < block_endex:
endex = block_endex
size = endex - start
pattern *= (size + (pattern_size - 1)) // pattern_size
del pattern[size:]
blocks_inner = blocks[block_index_start:block_index_endex]
blocks[block_index_start:block_index_endex] = [[start, pattern]]
for block_start, block_data in blocks_inner:
block_endex = block_start + len(block_data)
pattern[(block_start - start):(block_endex - start)] = block_data
[docs]
def flood_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> List[OpenInterval]:
return list(self.gaps(start=start, endex=endex))
[docs]
def flood_restore(
self,
gaps: List[OpenInterval],
) -> None:
for gap_start, gap_endex in gaps:
self.clear(start=gap_start, endex=gap_endex)
[docs]
@classmethod
def from_blocks(
cls,
blocks: BlockSequence,
offset: Address = 0,
start: Optional[Address] = None,
endex: Optional[Address] = None,
copy: bool = True,
validate: bool = True,
) -> 'Memory':
offset = Address(offset)
if copy:
blocks = [[block_start + offset, bytearray(block_data)]
for block_start, block_data in blocks]
elif offset:
blocks = [[block_start + offset, block_data]
for block_start, block_data in blocks]
memory = cls(start=start, endex=endex)
memory._blocks = blocks
if start is not None or endex is not None:
memory.crop(start, endex)
if validate:
memory.validate()
return memory
[docs]
@classmethod
def from_bytes(
cls,
data: AnyBytes,
offset: Address = 0,
start: Optional[Address] = None,
endex: Optional[Address] = None,
copy: bool = True,
validate: bool = True,
) -> 'Memory':
if data:
if copy:
data = bytearray(data)
blocks = [[offset, data]]
else:
blocks = []
return cls.from_blocks(blocks, start=start, endex=endex, copy=False, validate=validate)
[docs]
@classmethod
def from_items(
cls,
items: Union[AddressValueMapping,
Iterable[Tuple[Address, Optional[Value]]],
Mapping[Address, Optional[Union[Value, AnyBytes]]],
ImmutableMemory],
offset: Address = 0,
start: Optional[Address] = None,
endex: Optional[Address] = None,
validate: bool = True,
) -> 'Memory':
blocks = []
items = dict(items)
keys = [key for key, value in items.items() if value is not None]
if keys:
keys.sort()
key_seq = keys[0]
block_start = key_seq
block_data = bytearray()
for key in keys:
if key == key_seq:
block_data.append(items[key])
key_seq = key_seq + 1
else:
blocks.append([block_start + offset, block_data])
block_start = key
block_data = bytearray()
block_data.append(items[key])
key_seq = key + 1
blocks.append([block_start + offset, block_data])
return cls.from_blocks(blocks, start=start, endex=endex, copy=False, validate=validate)
[docs]
@classmethod
def from_memory(
cls,
memory: Union[ImmutableMemory, 'Memory'],
offset: Address = 0,
start: Optional[Address] = None,
endex: Optional[Address] = None,
copy: bool = True,
validate: bool = True,
) -> 'Memory':
offset = Address(offset)
is_memory = isinstance(memory, Memory)
if copy or not is_memory:
memory_blocks = memory._blocks if is_memory else memory.blocks()
blocks = [[block_start + offset, bytearray(block_data)]
for block_start, block_data in memory_blocks]
else:
if offset:
blocks = [[block_start + offset, block_data]
for block_start, block_data in memory._blocks]
else:
blocks = memory._blocks
return cls.from_blocks(blocks, start=start, endex=endex, copy=False, validate=validate)
[docs]
@classmethod
def from_values(
cls,
values: Iterable[Optional[Value]],
offset: Address = 0,
start: Optional[Address] = None,
endex: Optional[Address] = None,
validate: bool = True,
) -> 'Memory':
blocks = []
block_start = offset
block_data = bytearray()
for value in values:
offset += 1
if start is not None and offset <= start:
block_start = offset
continue
if endex is not None and offset > endex:
break
if value is None:
if block_data:
blocks.append([block_start, block_data])
block_data = bytearray()
block_start = offset
else:
block_data.append(value)
if block_data:
blocks.append([block_start, block_data])
return cls.from_blocks(blocks, start=start, endex=endex, copy=False, validate=validate)
[docs]
@classmethod
def fromhex(
cls,
string: str,
) -> 'Memory':
data = bytearray.fromhex(string)
obj = cls()
if data:
obj._blocks.append([0, data])
return obj
[docs]
def gaps(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Iterator[OpenInterval]:
blocks = self._blocks
if blocks:
start_ = start
endex_ = endex
start, endex = self.bound(start, endex)
if start_ is None:
start = blocks[0][0] # override bound start
yield None, start
block_index_start = 0
else:
block_index_start = self._block_index_start(start)
if endex_ is None:
block_index_endex = len(blocks)
else:
block_index_endex = self._block_index_endex(endex)
block_iterator = _islice(blocks, block_index_start, block_index_endex)
for block_start, block_data in block_iterator:
if start < block_start:
yield start, block_start
start = block_start + len(block_data)
if endex_ is None:
yield start, None
elif start < endex:
yield start, endex
else:
yield None, None
[docs]
def get(
self,
address: Address,
default: Optional[Value] = None,
) -> Optional[Value]:
block_index = self._block_index_at(address)
if block_index is None:
return default
else:
block_start, block_data = self._blocks[block_index]
return block_data[address - block_start]
[docs]
def hex(
self,
*args: Any, # see docstring
) -> str:
block_count = len(self._blocks)
if not block_count:
return ''
if block_count > 1:
raise ValueError('non-contiguous data within range')
data = self._blocks[0][1]
return data.hex(*args)
[docs]
def hexdump(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
columns: int = 16,
addrfmt: str = '{:08X} ',
bytefmt: str = ' {:02X}',
headfmt: Optional[Union[str, EllipsisType]] = None,
charmap: Optional[Mapping[int, str]] = HUMAN_ASCII,
emptystr: str = ' --',
beforestr: str = ' >>',
afterstr: str = ' <<',
charsep: str = ' |',
charend: str = '|',
stream: Optional[Union[io.TextIOBase, EllipsisType]] = Ellipsis,
) -> Optional[str]:
if columns < 1:
raise ValueError('invalid columns')
if start is None:
start = self.start
if endex is None:
endex = self.endex
if endex <= start:
endex = start
elif endex < start + columns:
endex = start + columns
if stream is Ellipsis:
stream = sys.stdout
addrfmt_format = addrfmt.format
bytefmt_format = bytefmt.format
bytemap = [bytefmt_format(i) for i in range(0x100)]
tokens = []
append = tokens.append if stream is None else stream.write
self_values = self.values(start=start, endex=...)
bound_start = self._bound_start
bound_endex = self._bound_endex
values = [0x100] * columns
address = int(start)
if headfmt:
if headfmt is Ellipsis:
headfmt = bytefmt
append(' ' * len(addrfmt_format(address)))
if ((columns - 1) & columns) == 0: # power of 2
for i in range(columns):
append(headfmt.format((address + i) % columns))
else: # header value offset makes no sense
for i in range(columns):
append(headfmt.format(i))
append('\n')
while address < endex:
append(addrfmt_format(address))
for i in range(columns):
byteval = next(self_values)
if byteval is not None:
append(bytemap[byteval])
elif bound_start is not None and address < bound_start:
append(beforestr)
byteval = 0x101
elif bound_endex is not None and address >= bound_endex:
append(afterstr)
byteval = 0x102
else:
append(emptystr)
byteval = 0x100
values[i] = byteval
address += 1
if charmap is not None:
append(charsep)
for byteval in values:
append(charmap[byteval])
append(charend)
append('\n')
return ''.join(tokens) if stream is None else None
[docs]
def index(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Address:
# Faster code for unbounded slice
if start is None and endex is None:
for block_start, block_data in self._blocks:
try:
offset = block_data.index(item)
except ValueError:
continue
else:
return block_start + offset
raise ValueError('subsection not found')
# Bounded slice
start, endex = self.bound(start, endex)
block_index_start = self._block_index_start(start)
block_index_endex = self._block_index_endex(endex)
block_iterator = _islice(self._blocks, block_index_start, block_index_endex)
for block_start, block_data in block_iterator:
slice_start = 0 if start < block_start else start - block_start
slice_endex = endex - block_start
try:
offset = block_data.index(item, slice_start, slice_endex)
except ValueError:
pass
else:
return block_start + offset
else:
raise ValueError('subsection not found')
[docs]
def insert(
self,
address: Address,
data: Union[AnyBytes, Value, ImmutableMemory],
) -> None:
size = 1 if isinstance(data, Value) else len(data)
self.reserve(address, size)
self.write(address, data, clear=True)
[docs]
def insert_backup(
self,
address: Address,
data: Union[AnyBytes, Value, ImmutableMemory],
) -> Tuple[Address, ImmutableMemory]:
size = 1 if isinstance(data, Value) else len(data)
backup = self._prebound_endex_backup(address, size)
return address, backup
[docs]
def insert_restore(
self,
address: Address,
backup: ImmutableMemory,
) -> None:
self.delete(start=address, endex=(address + len(backup)))
self.write(0, backup, clear=True)
[docs]
def intervals(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Iterator[ClosedInterval]:
blocks = self._blocks
if blocks:
block_index_start = 0 if start is None else self._block_index_start(start)
block_index_endex = len(blocks) if endex is None else self._block_index_endex(endex)
start, endex = self.bound(start, endex)
block_iterator = _islice(blocks, block_index_start, block_index_endex)
for block_start, block_data in block_iterator:
block_endex = block_start + len(block_data)
slice_start = block_start if start < block_start else start
slice_endex = endex if endex < block_endex else block_endex
if slice_start < slice_endex:
yield slice_start, slice_endex
[docs]
def items(
self,
start: Optional[Address] = None,
endex: Optional[Union[Address, EllipsisType]] = None,
pattern: Optional[Union[AnyBytes, Value]] = None,
) -> Iterator[Tuple[Address, Optional[Value]]]:
if start is None:
start = self.start
if endex is None:
endex = self.endex
keys = self.keys(start=start, endex=endex)
values = self.values(start=start, endex=endex, pattern=pattern)
yield from zip(keys, values)
[docs]
def keys(
self,
start: Optional[Address] = None,
endex: Optional[Union[Address, EllipsisType]] = None,
) -> Iterator[Address]:
if start is None:
start = self.start
if endex is Ellipsis:
yield from _count(start)
else:
if endex is None:
endex = self.endex
yield from range(start, endex)
[docs]
def peek(
self,
address: Address,
) -> Optional[Value]:
block_index = self._block_index_at(address)
if block_index is None:
return None
else:
block_start, block_data = self._blocks[block_index]
return block_data[address - block_start]
[docs]
def poke(
self,
address: Address,
item: Optional[Union[AnyBytes, Value]],
) -> None:
if self._bound_start is not None and address < self._bound_start:
return
if self._bound_endex is not None and address >= self._bound_endex:
return
if item is None:
# Standard clear method
self._erase(address, address + 1, False) # clear
else:
if not isinstance(item, Value):
if len(item) != 1:
raise ValueError('expecting single item')
item = item[0]
blocks = self._blocks
block_index = self._block_index_endex(address) - 1
if 0 <= block_index < len(blocks):
block_start, block_data = blocks[block_index]
block_endex = block_start + len(block_data)
if block_start <= address < block_endex:
# Address within existing block, update directly
address -= block_start
block_data[address] = item
return
elif address == block_endex:
# Address just after the end of the block, append
block_data.append(item)
block_index += 1
if block_index < len(blocks):
block_start2, block_data2 = blocks[block_index]
if block_endex + 1 == block_start2:
# Merge with the following contiguous block
block_data += block_data2
blocks.pop(block_index)
return
else:
block_index += 1
if block_index < len(blocks):
block = blocks[block_index]
block_start, block_data = block
if address + 1 == block_start:
# Prepend to the next block
block_data.insert(0, item)
block[0] -= 1 # update address
return
# There is no faster way than the standard block writing method
self._erase(address, address + 1, False) # clear
self._place(address, bytearray((item,)), False) # write
self.crop(start=self._bound_start, endex=self._bound_endex)
[docs]
def poke_backup(
self,
address: Address,
) -> Tuple[Address, Optional[Value]]:
return address, self.peek(address)
[docs]
def poke_restore(
self,
address: Address,
item: Optional[Value],
) -> None:
self.poke(address, item)
[docs]
def pop(
self,
address: Optional[Address] = None,
default: Optional[Value] = None,
) -> Optional[Value]:
if address is None:
blocks = self._blocks
if blocks:
block_data = blocks[-1][1]
backup = block_data.pop()
if not block_data:
blocks.pop()
return backup
else:
return default
else:
backup = self.peek(address)
self._erase(address, address + 1, True) # delete
return default if backup is None else backup
[docs]
def pop_backup(
self,
address: Optional[Address] = None,
) -> Tuple[Address, Optional[Value]]:
if address is None:
address = self.endex - 1
return address, self.peek(address)
[docs]
def pop_restore(
self,
address: Address,
item: Optional[Value],
) -> None:
if item is None:
self.reserve(address, 1)
else:
if address == self.content_endex:
self.append(item)
else:
self.insert(address, item)
[docs]
def popitem(
self,
) -> Tuple[Address, Value]:
blocks = self._blocks
if blocks:
block_start, block_data = blocks[-1]
value = block_data.pop()
address = block_start + len(block_data)
if not block_data:
blocks.pop()
return address, value
raise KeyError('empty')
[docs]
def popitem_backup(
self,
) -> Tuple[Address, Value]:
blocks = self._blocks
if blocks:
block_start, block_data = blocks[-1]
value = block_data[-1]
address = block_start + len(block_data) - 1
return address, value
raise KeyError('empty')
[docs]
def popitem_restore(
self,
address: Address,
item: Value,
) -> None:
if address == self.content_endex:
self.append(item)
else:
self.insert(address, item)
[docs]
def read(
self,
address: Address,
size: Address,
) -> memoryview:
return self.view(start=address, endex=(address + size))
[docs]
def readinto(
self,
address: Address,
buffer: AnyBytes,
) -> int:
size = len(buffer)
view = self.view(start=address, endex=(address + size))
buffer[:] = view
return size
[docs]
def remove(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> None:
address = self.index(item, start=start, endex=endex)
size = 1 if isinstance(item, Value) else len(item)
self._erase(address, address + size, True) # delete
[docs]
def remove_backup(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> ImmutableMemory:
address = self.index(item, start=start, endex=endex)
size = 1 if isinstance(item, Value) else len(item)
return self.extract(start=address, endex=(address + size))
[docs]
def remove_restore(
self,
backup: ImmutableMemory,
) -> None:
self.reserve(backup.start, len(backup))
self.write(0, backup, clear=True)
[docs]
def reserve(
self,
address: Address,
size: Address,
) -> None:
blocks = self._blocks
if size > 0 and blocks:
self._prebound_endex(address, size)
block_index = self._block_index_start(address)
if block_index < len(blocks):
block = blocks[block_index]
block_start, block_data = block
if address > block_start:
# Split into two blocks, reserving emptiness
offset = address - block_start
data_after = block_data[offset:]
del block_data[offset:]
block_index += 1
blocks.insert(block_index, [address + size, data_after])
block_index += 1
for block_index in range(block_index, len(blocks)):
blocks[block_index][0] += size
[docs]
def reserve_backup(
self,
address: Address,
size: Address,
) -> Tuple[Address, ImmutableMemory]:
backup = self._prebound_endex_backup(address, size)
return address, backup
[docs]
def reserve_restore(
self,
address: Address,
backup: ImmutableMemory,
) -> None:
self.delete(address, address + len(backup))
self.write(0, backup, clear=True)
[docs]
def reverse(
self,
) -> None:
blocks = self._blocks
if blocks:
start, endex = self.span
for block in blocks:
block_start, block_data = block
block_data.reverse()
block[0] = endex - block_start - len(block_data) + start
blocks.reverse()
[docs]
def rfind(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Address:
try:
return self.rindex(item, start=start, endex=endex)
except ValueError:
return -1
[docs]
def rindex(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Address:
# Faster code for unbounded slice
if start is None and endex is None:
for block_start, block_data in reversed(self._blocks):
try:
offset = block_data.index(item)
except ValueError:
continue
else:
return block_start + offset
else:
raise ValueError('subsection not found')
# Bounded slice
start, endex = self.bound(start, endex)
block_index_start = self._block_index_start(start)
block_index_endex = self._block_index_endex(endex)
blocks = self._blocks
for block_index in reversed(range(block_index_start, block_index_endex)):
block_start, block_data = blocks[block_index]
slice_start = 0 if start < block_start else start - block_start
slice_endex = endex - block_start
try:
offset = block_data.rindex(item, slice_start, slice_endex)
except ValueError:
pass
else:
return block_start + offset
else:
raise ValueError('subsection not found')
[docs]
def rvalues(
self,
start: Optional[Union[Address, EllipsisType]] = None,
endex: Optional[Address] = None,
pattern: Optional[Union[AnyBytes, Value]] = None,
) -> Iterator[Optional[Value]]:
if pattern is None:
pattern_size = 0
else:
if isinstance(pattern, Value):
pattern = (pattern,)
pattern = bytearray(pattern)
if not pattern:
raise ValueError('non-empty pattern required')
pattern.reverse()
pattern_size = len(pattern)
start_ = start
if start is None or start is Ellipsis:
start = self.start
blocks = self._blocks
if endex is None:
endex = self.endex
block_index = len(blocks)
else:
block_index = self._block_index_endex(endex)
if 0 < block_index:
block_start, block_data = blocks[block_index - 1]
block_endex = block_start + len(block_data)
if block_endex < endex:
yield from _repeat2(pattern, pattern_size - (endex - start), endex - block_endex)
yield from reversed(block_data)
else:
yield from reversed(memoryview(block_data)[:(endex - block_start)])
endex = block_start
for block_index in range(block_index - 2, -1, -1):
block_start, block_data = blocks[block_index]
block_endex = block_start + len(block_data)
yield from _repeat2(pattern, pattern_size - (endex - start), endex - block_endex)
if start <= block_start:
yield from reversed(block_data)
endex = block_start
else:
yield from reversed(memoryview(block_data)[(start - block_start):])
endex = start
size = None if start_ is Ellipsis else endex - start
yield from _repeat2(pattern, pattern_size - (endex - start), size)
[docs]
def setdefault(
self,
address: Address,
default: Optional[Union[AnyBytes, Value]] = None,
) -> Optional[Value]:
backup = self.peek(address)
if backup is None:
if default is not None:
if not isinstance(default, Value):
if len(default) != 1:
raise ValueError('expecting single item')
default = default[0]
self.poke(address, default)
return default
else:
return backup
[docs]
def setdefault_backup(
self,
address: Address,
) -> Tuple[Address, Optional[Value]]:
backup = self.peek(address)
return address, backup
[docs]
def setdefault_restore(
self,
address: Address,
item: Optional[Value],
) -> None:
self.poke(address, item)
[docs]
def shift(
self,
offset: Address,
) -> None:
if offset and self._blocks:
if offset < 0:
self._prebound_start(None, -offset)
else:
self._prebound_endex(None, +offset)
for block in self._blocks:
block[0] += offset
[docs]
def shift_backup(
self,
offset: Address,
) -> Tuple[Address, ImmutableMemory]:
if offset < 0:
backup = self._prebound_start_backup(None, -offset)
else:
backup = self._prebound_endex_backup(None, +offset)
return offset, backup
[docs]
def shift_restore(
self,
offset: Address,
backup: ImmutableMemory,
) -> None:
self.shift(-offset)
self.write(0, backup, clear=True)
@ImmutableMemory.span.getter
def span(
self,
) -> ClosedInterval:
return self.start, self.endex
@ImmutableMemory.start.getter
def start(
self,
) -> Address:
bound_start = self._bound_start
if bound_start is None:
# Return actual
blocks = self._blocks
if blocks:
return blocks[0][0]
else:
return 0
else:
return bound_start
[docs]
def to_blocks(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> BlockList:
blocks = [[block_start, bytes(block_data)]
for block_start, block_data in self.blocks(start=start, endex=endex)]
return blocks
[docs]
def to_bytes(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> bytes:
return bytes(self.view(start=start, endex=endex))
[docs]
def update(
self,
data: Union[AddressValueMapping,
Iterable[Tuple[Address, Optional[Value]]],
Mapping[Address, Optional[Union[Value, AnyBytes]]],
ImmutableMemory],
clear: bool = False,
**kwargs: Any, # string keys cannot become addresses
) -> None:
if kwargs:
raise KeyError('cannot convert kwargs.keys() into addresses')
if isinstance(data, ImmutableMemory):
self.write(0, data, clear=clear)
else:
if isinstance(data, Mapping):
data = data.items()
poke = self.poke
for address, value in data:
poke(address, value)
[docs]
def update_backup(
self,
data: Union[AddressValueMapping,
Iterable[Tuple[Address, Optional[Value]]],
Mapping[Address, Optional[Union[Value, AnyBytes]]],
ImmutableMemory],
clear: bool = False,
**kwargs: Any, # string keys cannot become addresses
) -> Union[AddressValueMapping, List[ImmutableMemory]]:
if kwargs:
raise KeyError('cannot convert kwargs.keys() into addresses')
if isinstance(data, ImmutableMemory):
return self.write_backup(0, data, clear=clear)
else:
peek = self.peek
if isinstance(data, Mapping):
backups = {address: peek(address) for address in data.keys()}
else:
backups = {address: peek(address) for address, _ in data}
return backups
[docs]
def update_restore(
self,
backups: Union[AddressValueMapping, List[ImmutableMemory]],
) -> None:
if isinstance(backups, list):
for backup in backups:
self.write(0, backup, clear=True)
else:
self.update(backups)
[docs]
def validate(
self,
) -> None:
start, endex = self.bound(None, None)
blocks = self._blocks
if blocks:
if endex <= start:
raise ValueError('invalid bounds')
previous_endex = blocks[0][0] - 1 # before first start
for block_start, block_data in blocks:
block_endex = block_start + len(block_data)
if block_start <= previous_endex:
raise ValueError('invalid block interleaving')
if block_endex <= block_start:
raise ValueError('invalid block data size')
if block_start < start or endex < block_endex:
raise ValueError('invalid block bounds')
previous_endex = block_endex
else:
if endex < start:
raise ValueError('invalid bounds')
[docs]
def values(
self,
start: Optional[Address] = None,
endex: Optional[Union[Address, EllipsisType]] = None,
pattern: Optional[Union[AnyBytes, Value]] = None,
) -> Iterator[Optional[Value]]:
if endex is None or endex is Ellipsis:
if pattern is not None:
if isinstance(pattern, Value):
pattern = (pattern,)
pattern = bytearray(pattern)
if not pattern:
raise ValueError('non-empty pattern required')
if start is None:
start = self.start
block_index = 0
else:
block_index = self._block_index_start(start)
start_ = start
blocks = self._blocks
if block_index < len(blocks):
block_start, block_data = blocks[block_index]
if block_start < start:
yield from memoryview(block_data)[(start - block_start):]
else:
yield from _repeat2(pattern, (start - start_), (block_start - start))
yield from block_data
start = block_start + len(block_data)
for block_start, block_data in _islice(blocks, block_index + 1, len(blocks)):
yield from _repeat2(pattern, (start - start_), (block_start - start))
yield from block_data
start = block_start + len(block_data)
if endex is Ellipsis:
yield from _repeat2(pattern, (start - start_), None)
else:
if start is None:
start = self.start
if start < endex:
values = self.values(start=start, endex=Ellipsis, pattern=pattern)
yield from _islice(values, (endex - start))
[docs]
def view(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> memoryview:
if start is None:
start = self.start
if endex is None:
endex = self.endex
if start < endex:
block_index = self._block_index_at(start)
if block_index is not None:
block_start, block_data = self._blocks[block_index]
if endex <= block_start + len(block_data):
return memoryview(block_data)[(start - block_start):(endex - block_start)]
raise ValueError('non-contiguous data within range')
else:
return memoryview(b'')
[docs]
def write(
self,
address: Address,
data: Union[AnyBytes, Value, ImmutableMemory, 'Memory'],
clear: bool = False,
) -> None:
if isinstance(data, Value):
self.poke(address, data) # faster
return
data_is_immutable_memory = isinstance(data, ImmutableMemory)
if data_is_immutable_memory:
start = data.start + address
endex = data.endex + address
size = endex - start
else:
data = bytearray(data) # clone
size = len(data)
if size == 1:
self.poke(address, data[0]) # faster
return
start = address
endex = start + size
if not size:
return
bound_start = self._bound_start
if bound_start is not None and endex <= bound_start:
return
bound_endex = self._bound_endex
if bound_endex is not None and bound_endex <= start:
return
if data_is_immutable_memory:
data_is_memory = isinstance(data, Memory)
if clear:
# Clear anything between source data boundaries
self._erase(start, endex, False) # clear
else:
# Clear only overwritten ranges
data_blocks = data._blocks if data_is_memory else data.blocks()
for block_start, block_data in data_blocks:
block_start = block_start + address
block_endex = block_start + len(block_data)
self._erase(block_start, block_endex, False) # clear
data_blocks = data._blocks if data_is_memory else data.blocks()
for block_start, block_data in data_blocks:
block_start = block_start + address
block_endex = block_start + len(block_data)
if bound_start is not None and block_endex <= bound_start:
continue
if bound_endex is not None and bound_endex <= block_start:
break
block_data = bytearray(block_data) # clone
# Bound before memory
if bound_start is not None and block_start < bound_start:
offset = bound_start - block_start
block_start += offset
del block_data[:offset]
# Bound after memory
if bound_endex is not None and bound_endex < block_endex:
offset = block_endex - bound_endex
block_endex -= offset
del block_data[(block_endex - block_start):]
self._place(block_start, block_data, False) # write
else:
# Bound before memory
if bound_start is not None and start < bound_start:
offset = bound_start - start
size -= offset
start += offset
del data[:offset]
# Bound after memory
if bound_endex is not None and bound_endex < endex:
offset = endex - bound_endex
size -= offset
endex -= offset
del data[size:]
# Check if extending the actual content
blocks = self._blocks
if blocks:
block_start, block_data = blocks[-1]
block_endex = block_start + len(block_data)
if start == block_endex:
block_data += data # faster
return
# Standard write method
self._erase(start, endex, False) # clear
self._place(start, data, False) # write
[docs]
def write_backup(
self,
address: Address,
data: Union[AnyBytes, Value, ImmutableMemory],
clear: bool = False,
) -> List[ImmutableMemory]:
if isinstance(data, ImmutableMemory):
start = data.start + address
endex = data.endex + address
if endex <= start:
backups = []
elif clear:
backups = [self.extract(start=start, endex=endex)]
else:
intervals = data.intervals(start=start, endex=endex)
backups = [self.extract(start=block_start, endex=block_endex)
for block_start, block_endex in intervals]
else:
if isinstance(data, Value):
data = (data,)
start = address
endex = start + len(data)
if start < endex:
backups = [self.extract(start=start, endex=endex)]
else:
backups = []
return backups
[docs]
def write_restore(
self,
backups: Sequence[ImmutableMemory],
) -> None:
for backup in backups:
self.write(0, backup, clear=True)
# noinspection PyPep8Naming
[docs]
class bytesparse(Memory, MutableBytesparse):
__doc__ = MutableBytesparse.__doc__
[docs]
def __delitem__(
self,
key: Union[Address, slice],
) -> None:
if isinstance(key, slice):
start, endex = self._rectify_span(key.start, key.stop)
key = slice(start, endex, key.step)
else:
key = self._rectify_address(key)
super().__delitem__(key)
[docs]
def __getitem__(
self,
key: Union[Address, slice],
) -> Any:
if isinstance(key, slice):
start, endex = self._rectify_span(key.start, key.stop)
key = slice(start, endex, key.step)
else:
key = self._rectify_address(key)
return super().__getitem__(key)
[docs]
def __init__(
self,
*args: Any, # see bytearray.__init__()
start: Optional[Address] = None,
endex: Optional[Address] = None,
):
super().__init__(start=start, endex=endex)
data = bytearray(*args)
if data:
if start is None:
start = 0
if endex is not None:
if endex <= start:
return
del data[(endex - start):]
self._blocks.append([start, data])
[docs]
def __setitem__(
self,
key: Union[Address, slice],
value: Optional[Union[AnyBytes, Value]],
) -> None:
if isinstance(key, slice):
start, endex = self._rectify_span(key.start, key.stop)
key = slice(start, endex, key.step)
else:
key = self._rectify_address(key)
super().__setitem__(key, value)
[docs]
def _rectify_address(
self,
address: Address,
) -> Address:
address = address.__index__()
if address < 0:
address = self.endex + address
if address < 0:
raise IndexError('index out of range')
return address
[docs]
def _rectify_span(
self,
start: Optional[Address],
endex: Optional[Address],
) -> OpenInterval:
endex_ = None
if start is not None and start < 0:
endex_ = self.endex
start = endex_ + start
if start < 0:
start = 0
if endex is not None and endex < 0:
if endex_ is None:
endex_ = self.endex
endex = endex_ + endex
if endex < 0:
endex = 0
return start, endex
[docs]
def block_span(
self,
address: Address,
) -> Tuple[Optional[Address], Optional[Address], Optional[Value]]:
address = self._rectify_address(address)
return super().block_span(address)
[docs]
def blocks(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Iterator[Block]:
start, endex = self._rectify_span(start, endex)
yield from super().blocks(start=start, endex=endex)
[docs]
def bound(
self,
start: Optional[Address],
endex: Optional[Address],
) -> ClosedInterval:
start, endex = self._rectify_span(start, endex)
return super().bound(start=start, endex=endex)
[docs]
def clear(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> None:
start, endex = self._rectify_span(start, endex)
super().clear(start=start, endex=endex)
[docs]
def clear_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> ImmutableMemory:
start, endex = self._rectify_span(start, endex)
return super().clear_backup(start=start, endex=endex)
[docs]
def count(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> int:
start, endex = self._rectify_span(start, endex)
return super().count(item, start=start, endex=endex)
[docs]
def crop(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> None:
start, endex = self._rectify_span(start, endex)
super().crop(start=start, endex=endex)
[docs]
def crop_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Tuple[Optional[ImmutableMemory], Optional[ImmutableMemory]]:
start, endex = self._rectify_span(start, endex)
return super().crop_backup(start=start, endex=endex)
[docs]
def cut(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
bound: bool = True,
) -> ImmutableMemory:
start, endex = self._rectify_span(start, endex)
return super().cut(start=start, endex=endex)
[docs]
def delete(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> None:
start, endex = self._rectify_span(start, endex)
super().delete(start=start, endex=endex)
[docs]
def delete_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> ImmutableMemory:
start, endex = self._rectify_span(start, endex)
return super().delete_backup(start=start, endex=endex)
[docs]
def equal_span(
self,
address: Address,
) -> Tuple[Optional[Address], Optional[Address], Optional[Value]]:
address = self._rectify_address(address)
return super().equal_span(address)
[docs]
def fill(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
pattern: Union[AnyBytes, Value] = 0,
) -> None:
start, endex = self._rectify_span(start, endex)
super().fill(start=start, endex=endex, pattern=pattern)
[docs]
def fill_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> ImmutableMemory:
start, endex = self._rectify_span(start, endex)
return super().fill_backup(start=start, endex=endex)
[docs]
def find(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Address:
start, endex = self._rectify_span(start, endex)
return super().find(item, start=start, endex=endex)
[docs]
def flood(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
pattern: Union[AnyBytes, Value] = 0,
) -> None:
start, endex = self._rectify_span(start, endex)
super().flood(start=start, endex=endex, pattern=pattern)
[docs]
def flood_backup(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> List[OpenInterval]:
start, endex = self._rectify_span(start, endex)
return super().flood_backup(start=start, endex=endex)
[docs]
@classmethod
def from_blocks(
cls,
blocks: BlockSequence,
offset: Address = 0,
start: Optional[Address] = None,
endex: Optional[Address] = None,
copy: bool = True,
validate: bool = True,
) -> 'bytesparse':
if blocks:
block_start = blocks[0][0]
if block_start + offset < 0:
raise ValueError('negative offseted start')
if start is not None and start < 0:
raise ValueError('negative start')
if endex is not None and endex < 0:
raise ValueError('negative endex')
memory1 = super().from_blocks(blocks, offset=offset, start=start, endex=endex, copy=copy, validate=validate)
memory2 = cls()
memory2._blocks = memory1._blocks
memory2._bound_start = memory1._bound_start
memory2._bound_endex = memory1._bound_endex
return memory2
[docs]
@classmethod
def from_bytes(
cls,
data: AnyBytes,
offset: Address = 0,
start: Optional[Address] = None,
endex: Optional[Address] = None,
copy: bool = True,
validate: bool = True,
) -> 'bytesparse':
if offset < 0:
raise ValueError('negative offset')
if start is not None and start < 0:
raise ValueError('negative start')
if endex is not None and endex < 0:
raise ValueError('negative endex')
memory1 = super().from_bytes(data, offset=offset, start=start, endex=endex, copy=copy, validate=validate)
memory2 = cls()
memory2._blocks = memory1._blocks
memory2._bound_start = memory1._bound_start
memory2._bound_endex = memory1._bound_endex
return memory2
[docs]
@classmethod
def from_memory(
cls,
memory: ImmutableMemory,
offset: Address = 0,
start: Optional[Address] = None,
endex: Optional[Address] = None,
copy: bool = True,
validate: bool = True,
) -> 'bytesparse':
if isinstance(memory, Memory):
blocks = memory._blocks
if blocks:
block_start = blocks[0][0]
if block_start + offset < 0:
raise ValueError('negative offseted start')
else:
if memory:
if memory.start + offset < 0:
raise ValueError('negative offseted start')
if start is not None and start < 0:
raise ValueError('negative start')
if endex is not None and endex < 0:
raise ValueError('negative endex')
memory1 = super().from_memory(memory, offset=offset, start=start, endex=endex, copy=copy, validate=validate)
memory2 = cls()
memory2._blocks = memory1._blocks
memory2._bound_start = memory1._bound_start
memory2._bound_endex = memory1._bound_endex
return memory2
[docs]
def gaps(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Iterator[OpenInterval]:
start, endex = self._rectify_span(start, endex)
yield from super().gaps(start=start, endex=endex)
[docs]
def get(
self,
address: Address,
default: Optional[Value] = None,
) -> Optional[Value]:
address = self._rectify_address(address)
return super().get(address, default=default)
[docs]
def hexdump(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
columns: int = 16,
addrfmt: str = '{:08X} ',
bytefmt: str = ' {:02X}',
headfmt: Optional[Union[str, EllipsisType]] = None,
charmap: Optional[Mapping[int, str]] = HUMAN_ASCII,
emptystr: str = ' --',
beforestr: str = ' >>',
afterstr: str = ' <<',
charsep: str = ' |',
charend: str = '|',
stream: Optional[Union[io.TextIOBase, EllipsisType]] = Ellipsis,
) -> Optional[str]:
start, endex = self._rectify_span(start, endex)
return super().hexdump(
start=start,
endex=endex,
columns=columns,
addrfmt=addrfmt,
bytefmt=bytefmt,
headfmt=headfmt,
charmap=charmap,
emptystr=emptystr,
beforestr=beforestr,
afterstr=afterstr,
charsep=charsep,
charend=charend,
stream=stream,
)
[docs]
def index(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Address:
start, endex = self._rectify_span(start, endex)
return super().index(item, start=start, endex=endex)
[docs]
def insert(
self,
address: Address,
data: Union[AnyBytes, Value, ImmutableMemory],
) -> None:
address = self._rectify_address(address)
super().insert(address, data)
[docs]
def insert_backup(
self,
address: Address,
data: Union[AnyBytes, Value, ImmutableMemory],
) -> Tuple[Address, ImmutableMemory]:
address = self._rectify_address(address)
return super().insert_backup(address, data)
[docs]
def intervals(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Iterator[ClosedInterval]:
start, endex = self._rectify_span(start, endex)
yield from super().intervals(start=start, endex=endex)
[docs]
def items(
self,
start: Optional[Address] = None,
endex: Optional[Union[Address, EllipsisType]] = None,
pattern: Optional[Union[AnyBytes, Value]] = None,
) -> Iterator[Tuple[Address, Optional[Value]]]:
endex_ = endex # backup
if endex is Ellipsis:
endex = None
start, endex = self._rectify_span(start, endex)
if endex_ is Ellipsis:
endex = endex_ # restore
yield from super().items(start=start, endex=endex, pattern=pattern)
[docs]
def keys(
self,
start: Optional[Address] = None,
endex: Optional[Union[Address, EllipsisType]] = None,
) -> Iterator[Address]:
endex_ = endex # backup
if endex is Ellipsis:
endex = None
start, endex = self._rectify_span(start, endex)
if endex_ is Ellipsis:
endex = endex_ # restore
yield from super().keys(start=start, endex=endex)
[docs]
def peek(
self,
address: Address,
) -> Optional[Value]:
address = self._rectify_address(address)
return super().peek(address)
[docs]
def poke(
self,
address: Address,
item: Optional[Union[AnyBytes, Value]],
) -> None:
address = self._rectify_address(address)
super().poke(address, item)
[docs]
def poke_backup(
self,
address: Address,
) -> Tuple[Address, Optional[Value]]:
address = self._rectify_address(address)
return super().poke_backup(address)
[docs]
def pop(
self,
address: Optional[Address] = None,
default: Optional[Value] = None,
) -> Optional[Value]:
if address is not None:
address = self._rectify_address(address)
return super().pop(address=address, default=default)
[docs]
def pop_backup(
self,
address: Optional[Address] = None,
) -> Tuple[Address, Optional[Value]]:
if address is not None:
address = self._rectify_address(address)
return super().pop_backup(address=address)
[docs]
def read(
self,
address: Address,
size: Address,
) -> memoryview:
address = self._rectify_address(address)
return super().read(address, size)
[docs]
def readinto(
self,
address: Address,
buffer: AnyBytes,
) -> int:
address = self._rectify_address(address)
return super().readinto(address, buffer)
[docs]
def remove(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> None:
start, endex = self._rectify_span(start, endex)
super().remove(item, start=start, endex=endex)
[docs]
def remove_backup(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> ImmutableMemory:
start, endex = self._rectify_span(start, endex)
return super().remove_backup(item, start=start, endex=endex)
[docs]
def reserve(
self,
address: Address,
size: Address,
) -> None:
address = self._rectify_address(address)
super().reserve(address, size)
[docs]
def reserve_backup(
self,
address: Address,
size: Address,
) -> Tuple[Address, ImmutableMemory]:
address = self._rectify_address(address)
return super().reserve_backup(address, size)
[docs]
def rfind(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Address:
start, endex = self._rectify_span(start, endex)
return super().rfind(item, start=start, endex=endex)
[docs]
def rindex(
self,
item: Union[AnyBytes, Value],
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> Address:
start, endex = self._rectify_span(start, endex)
return super().rindex(item, start=start, endex=endex)
[docs]
def rvalues(
self,
start: Optional[Union[Address, EllipsisType]] = None,
endex: Optional[Address] = None,
pattern: Optional[Union[AnyBytes, Value]] = None,
) -> Iterator[Optional[Value]]:
start_ = start # backup
if start is Ellipsis:
start = None
start, endex = self._rectify_span(start, endex)
if start_ is Ellipsis:
start = start_ # restore
yield from super().rvalues(start=start, endex=endex, pattern=pattern)
[docs]
def setdefault(
self,
address: Address,
default: Optional[Value] = None,
) -> Optional[Value]:
address = self._rectify_address(address)
return super().setdefault(address, default=default)
[docs]
def setdefault_backup(
self,
address: Address,
) -> Tuple[Address, Optional[Value]]:
address = self._rectify_address(address)
return super().setdefault_backup(address)
[docs]
def shift(
self,
offset: Address,
) -> None:
if self._bound_start is None and offset < 0:
blocks = self._blocks
if blocks:
block_start = blocks[0][0]
if block_start + offset < 0:
raise ValueError('negative offseted start')
super().shift(offset)
[docs]
def shift_backup(
self,
offset: Address,
) -> Tuple[Address, ImmutableMemory]:
if self._bound_start is None and offset < 0:
blocks = self._blocks
if blocks:
block_start = blocks[0][0]
if block_start + offset < 0:
raise ValueError('negative offseted start')
return super().shift_backup(offset)
@ImmutableMemory.bound_endex.getter
def bound_endex(
self,
) -> Optional[Address]:
# Copy-pasted from Memory, because I cannot figure out how to override properties
return self._bound_endex
@bound_endex.setter
def bound_endex(
self,
bound_endex: Optional[Address],
) -> None:
if bound_endex is not None and bound_endex < 0:
raise ValueError('negative endex')
# Copy-pasted from Memory, because I cannot figure out how to override properties
bound_start = self._bound_start
if bound_start is not None and bound_endex is not None and bound_endex < bound_start:
self._bound_start = bound_start = bound_endex
self._bound_endex = bound_endex
if bound_endex is not None:
self.crop(start=bound_start, endex=bound_endex)
@ImmutableMemory.bound_span.getter
def bound_span(
self,
) -> OpenInterval:
# Copy-pasted from Memory, because I cannot figure out how to override properties
return self._bound_start, self._bound_endex
@bound_span.setter
def bound_span(
self,
bound_span: OpenInterval,
) -> None:
if bound_span is None:
bound_span = (None, None)
bound_start, bound_endex = bound_span
if bound_start is not None and bound_start < 0:
raise ValueError('negative start')
if bound_endex is not None and bound_endex < 0:
raise ValueError('negative endex')
# Copy-pasted from Memory, because I cannot figure out how to override properties
bound_start, bound_endex = bound_span
if bound_start is not None and bound_endex is not None and bound_endex < bound_start:
bound_endex = bound_start
self._bound_start = bound_start
self._bound_endex = bound_endex
if bound_start is not None or bound_endex is not None:
self.crop(start=bound_start, endex=bound_endex)
@ImmutableMemory.bound_start.getter
def bound_start(
self,
) -> Optional[Address]:
# Copy-pasted from Memory, because I cannot figure out how to override properties
return self._bound_start
@bound_start.setter
def bound_start(
self,
bound_start: Optional[Address],
) -> None:
if bound_start is not None and bound_start < 0:
raise ValueError('negative start')
# Copy-pasted from Memory, because I cannot figure out how to override properties
bound_endex = self._bound_endex
if bound_start is not None and bound_endex is not None and bound_endex < bound_start:
self._bound_endex = bound_endex = bound_start
self._bound_start = bound_start
if bound_start is not None:
self.crop(start=bound_start, endex=bound_endex)
[docs]
def validate(
self,
) -> None:
for block in self._blocks:
if block[0] < 0:
raise ValueError('negative block start')
super().validate()
[docs]
def values(
self,
start: Optional[Address] = None,
endex: Optional[Union[Address, EllipsisType]] = None,
pattern: Optional[Union[AnyBytes, Value]] = None,
) -> Iterator[Optional[Value]]:
endex_ = endex # backup
if endex is Ellipsis:
endex = None
start, endex = self._rectify_span(start, endex)
if endex_ is Ellipsis:
endex = endex_ # restore
yield from super().values(start=start, endex=endex, pattern=pattern)
[docs]
def view(
self,
start: Optional[Address] = None,
endex: Optional[Address] = None,
) -> memoryview:
start, endex = self._rectify_span(start, endex)
return super().view(start=start, endex=endex)
[docs]
def write(
self,
address: Address,
data: Union[AnyBytes, Value, ImmutableMemory],
clear: bool = False,
) -> None:
address = self._rectify_address(address)
super().write(address, data, clear=clear)
[docs]
def write_backup(
self,
address: Address,
data: Union[AnyBytes, Value, ImmutableMemory],
clear: bool = False,
) -> List[ImmutableMemory]:
address = self._rectify_address(address)
return super().write_backup(address, data, clear=clear)