Source code for ppc_robot_lib.utils.iter

import functools
import itertools
import operator
from collections import defaultdict
from typing import TypeVar, overload
from collections.abc import Iterable, Sequence, Iterator

# Generic element type used in the signatures of the helpers below.
T = TypeVar('T')


@overload
def chunks(iterable: list[T], chunk_size: int) -> Iterable[list[T]]: ...


@overload
def chunks(iterable: Sequence[T], chunk_size: int) -> Iterable[Sequence[T]]: ...


def chunks(iterable: Sequence[T], chunk_size: int) -> Iterable[Sequence[T]]:
    """
    Split the given sequence into consecutive chunks.

    Chunks are produced lazily by slicing ``iterable``, so each chunk has the type returned by the
    sequence's own slicing (a ``list`` for lists, a ``str`` for strings, ...). The last chunk is shorter
    than ``chunk_size`` when the length of the sequence is not a multiple of it.

    :param iterable: Sequence to split - must support ``len()`` and slicing.
    :param chunk_size: Maximum number of elements in a single chunk. It is used as the step of ``range``,
        so a value of zero raises ``ValueError`` once iteration starts.
    :return: Iterable over chunks of at most ``chunk_size`` elements.
    """
    for start in range(0, len(iterable), chunk_size):
        yield iterable[start : start + chunk_size]
def merged_generator(*generators: Iterable[T]) -> Iterable[T]:
    """
    Chain several generators into a single one.

    Works like :py:class:`itertools.chain`, but the result is itself a generator that delegates to each
    source in turn.

    :param generators: Generators to merge, consumed in the order given.
    :return: Iterable over the items of all given generators, one generator after another.
    """
    for source in generators:
        # Delegate completely to the current source before moving on to the next one.
        yield from source
def unique(iterable: Iterable[T]) -> Iterable[T]:
    """
    Yield each distinct element once, in order of first appearance.

    Every element seen so far is remembered in a set, so the elements must be hashable.

    :param iterable: Iterable used to filter values.
    :return: Generator of unique values.
    """
    already_yielded = set()
    for candidate in iterable:
        if candidate in already_yielded:
            continue
        already_yielded.add(candidate)
        yield candidate
def filter_not_none(iterable: Iterable[T]) -> Iterable[T]:
    """
    Drop every ``None`` from the given iterable.

    Only ``None`` itself is removed (identity check); other falsy values such as ``0`` or ``''`` are kept.

    :param iterable: Iterable used to filter values.
    :return: Lazy iterator over the values that are not ``None``.
    """
    return filter(lambda value: value is not None, iterable)
def try_slice_iterable(iterable: Iterable[T], max_size: int) -> Iterable[T]:
    """
    Lazily yield items from ``iterable``, raising `IterableTooLong` if it exceeds ``max_size``.

    Similar to `itertools.islice`, but instead of silently truncating the input, an `IterableTooLong`
    exception is raised as soon as item number ``max_size + 1`` is encountered. The first ``max_size``
    items have already been yielded at that point.

    :param iterable: Iterable to read items from.
    :param max_size: Maximum number of items the iterable is allowed to contain.
    :return: Generator over the items of ``iterable``.
    :raises IterableTooLong: When ``iterable`` contains more than ``max_size`` items.
    """
    for count, item in enumerate(iterable, start=1):
        # ``count`` is 1-based: an iterable of exactly ``max_size`` items is still allowed, and
        # ``max_size == 0`` rejects any non-empty input.
        if count > max_size:
            raise IterableTooLong()
        yield item


class IterableTooLong(Exception):
    """Raised by :func:`try_slice_iterable` when the iterable has more items than allowed."""
T_co = TypeVar('T_co', covariant=True)


class ChunkedIterator(Iterator[T_co]):
    """
    Iterator that lazily splits passed iterator into smaller chunks with size specified
    in ``chunk_size`` arg and providing some statistics.

    Every yielded chunk is an independent list, so chunks can be stored safely by the consumer.

    :param iterable: Iterable to split into chunks.
    :param chunk_size: Number of items in a full chunk; the last chunk may be shorter.
    :param include_empty: if true then iterator always yields at least one chunk, i.e. a single empty
        chunk is yielded when ``iterable`` contains no items.
    """

    _CHUNKS_COUNT = 'chunks'

    def __init__(self, iterable: Iterable[T_co], chunk_size: int, include_empty=True):
        self._chunk_size = chunk_size
        # Labels have to be passed as a collection: a bare string would be unpacked into
        # single-character labels.
        self._iterable = counted(iterable, [self._CHUNKS_COUNT])
        self._chunks = self._iter_chunks()
        self._include_empty = include_empty

    @property
    def items_count(self) -> int:
        """Number of items consumed from the wrapped iterable so far."""
        return self._iterable.count

    @property
    def chunks_count(self) -> int:
        """Number of non-empty chunks produced so far."""
        return self._iterable[self._CHUNKS_COUNT]

    def __iter__(self) -> Iterator[list[T_co]]:
        return self

    def __next__(self) -> list[T_co]:
        return next(self._chunks)

    def _iter_chunks(self) -> Iterator[list[T_co]]:
        """Generate the chunks, updating the chunk counter for every non-empty chunk."""
        chunk: list[T_co] = []
        for item in self._iterable:
            chunk.append(item)
            # The item counter already includes the current item, so the chunk is full
            # whenever the counter is a multiple of the chunk size.
            if self.items_count % self._chunk_size == 0:
                self._iterable.increment(self._CHUNKS_COUNT)
                yield chunk
                # Start a new list instead of clearing the yielded one, which the consumer may keep.
                chunk = []

        if chunk:
            # Trailing, partially filled chunk.
            self._iterable.increment(self._CHUNKS_COUNT)
            yield chunk
        elif self._include_empty and self.chunks_count == 0:
            # Nothing was yielded at all: honour the "at least one chunk" guarantee.
            yield chunk


class _Count(Iterable[T]):
    """
    Iterable wrapper that counts number of entries (iterations) while yields entries from iterable.

    Optionally you can register custom labels to count while iterating, use :meth:`increment`.
    Counters are readable by label through index access; the total number of yielded entries
    is available as :attr:`count`.

    :param iterable: Iterable to wrap.
    :param labels: Additional counter labels, all initialised to zero.
    """

    _TOTAL = 'count'

    def __init__(self, iterable: Iterable[T], labels: Iterable[str] = None):
        self.iterable = iterable
        # Pre-register the known labels so that they show up in the statistics even when they stay at zero.
        self._counter = defaultdict(int, dict.fromkeys([self._TOTAL, *(labels or [])], 0))

    @property
    def count(self):
        """Total number of entries yielded so far."""
        return self[self._TOTAL]

    def __getitem__(self, label):
        return self._counter[label]

    def increment(self, *labels: str) -> None:
        """Increase the counter of each given label by one."""
        for label in labels:
            self._counter[label] += 1

    def __iter__(self) -> Iterator[T]:
        for t in self.iterable:
            # Count the entry before handing it out, so the consumer sees it included in the total.
            self.increment(self._TOTAL)
            yield t

    def __format__(self, format_spec) -> str:
        """
        Format the counters.

        A spec starting with ``stats`` renders ``label<sep>value`` pairs joined by ``', '``; the separator
        is whatever follows the first ``:`` in the spec and defaults to a single space. Any other spec
        falls back to :meth:`__str__`.
        """
        format_spec = str(format_spec)
        if format_spec.startswith('stats'):
            label_value_sep = (format_spec.split(':', 1)[1:] or [' '])[0]
            return ', '.join([f'{label}{label_value_sep}{count}' for label, count in self._counter.items()])
        else:
            return str(self)

    def __str__(self) -> str:
        return str(dict(self._counter))


# Public, function-style aliases of the classes above.
counted = _Count
chunked = ChunkedIterator