import functools
import itertools
import operator
from collections import defaultdict
from typing import TypeVar, overload
from collections.abc import Iterable, Sequence, Iterator
# Generic element type used in the annotations of the helpers and ``_Count`` below.
T = TypeVar('T')
# Typing-only overloads for ``chunks``: type checkers require at least two
# ``@overload`` variants ahead of the real implementation.
@overload
def chunks(iterable: list[T], chunk_size: int) -> Iterable[list[T]]:
    # Chunking a ``list`` produces ``list`` chunks.
    ...


@overload
def chunks(iterable: Sequence[T], chunk_size: int) -> Iterable[Sequence[T]]:
    # Any other sequence produces ``Sequence`` chunks.
    ...
[docs]
def chunks(iterable: Sequence[T], chunk_size: int) -> Iterable[Sequence[T]]:
    """
    Split the given sequence into consecutive chunks.

    Chunks are produced lazily by slicing ``iterable``; every chunk holds
    ``chunk_size`` items except possibly the last one, which holds the remainder.

    :param iterable: Sequence to split - must support ``len()`` and slicing.
    :param chunk_size: Maximum number of items per chunk; must be positive.
    :return: Iterable over chunks of at most ``chunk_size`` items.
    :raises ValueError: If ``chunk_size`` is less than 1. This is a generator
        function, so the error surfaces when iteration starts.
    """
    if chunk_size < 1:
        # A zero step makes ``range`` fail with an unrelated message, and a negative
        # step makes it empty, which would silently drop every item.
        raise ValueError(f'chunk_size must be a positive integer, got {chunk_size!r}')
    for start in range(0, len(iterable), chunk_size):
        yield iterable[start : start + chunk_size]
[docs]
def merged_generator(*generators: Iterable[T]) -> Iterable[T]:
    """
    Chain several iterables into one lazy stream.

    Works like :py:class:`itertools.chain`, but is itself a generator function.

    :param generators: Iterables to concatenate, in the order given.
    :return: Iterable over the items of each input, one input after another.
    """
    yield from itertools.chain.from_iterable(generators)
[docs]
def unique(iterable: Iterable[T]) -> Iterable[T]:
    """
    Yield each distinct element once, in order of first appearance.

    Every element seen so far is kept in a set, so elements must be hashable.

    :param iterable: Iterable to de-duplicate.
    :return: Generator of unique values.
    """
    already_seen = set()
    for candidate in iterable:
        if candidate in already_seen:
            continue
        already_seen.add(candidate)
        yield candidate
[docs]
def filter_not_none(iterable: Iterable[T]) -> Iterable[T]:
    """
    Drop every ``None`` from the iterable.

    The check is an identity test, so other falsy values (``0``, ``''``,
    ``False``) are kept.

    :param iterable: Iterable used to filter values.
    :return: Lazy ``filter`` object over the values that are not ``None``.
    """

    def is_present(value) -> bool:
        return value is not None

    return filter(is_present, iterable)
[docs]
def try_slice_iterable(iterable: Iterable[T], max_size: int) -> Iterable[T]:
    """
    Lazily yield items from ``iterable``, failing if it holds more than ``max_size`` items.

    Similar to ``itertools.islice(iterable, max_size)``, except that an overlong
    input raises ``IterableTooLong`` instead of being silently truncated.

    :param iterable: Iterable to pass through.
    :param max_size: Maximum number of items ``iterable`` is allowed to produce.
    :return: Generator over the items of ``iterable``.
    :raises IterableTooLong: When item number ``max_size + 1`` is pulled from
        ``iterable``; the first ``max_size`` items have already been yielded by then.
    """
    for count, item in enumerate(iterable, start=1):
        # ``count`` is 1-based: an input of exactly ``max_size`` items is still
        # within the limit, only the item after that exceeds it.
        if count > max_size:
            # NOTE(review): IterableTooLong is not defined or imported in the
            # visible part of this module - confirm it is in scope.
            raise IterableTooLong()
        yield item
# Covariant element type used in the annotations of ``ChunkedIterator``.
T_co = TypeVar('T_co', covariant=True)
[docs]
class ChunkedIterator(Iterator[T_co]):
    """
    Iterator that lazily splits the passed iterable into chunks (lists) of at most
    ``chunk_size`` items and provides some statistics while doing so.

    Each yielded chunk is a fresh list, so it is safe to keep chunks around
    after advancing the iterator.

    :param iterable: Source of items; consumed lazily, one chunk at a time.
    :param chunk_size: Maximum number of items per chunk; must be positive.
    :param include_empty: if true then iterator always yields at least one chunk,
        i.e. a single empty chunk when ``iterable`` produces no items at all.
    :raises ValueError: If ``chunk_size`` is less than 1.
    """

    _CHUNKS_COUNT = 'chunks'

    def __init__(self, iterable: Iterable[T_co], chunk_size: int, include_empty: bool = True):
        if chunk_size < 1:
            raise ValueError(f'chunk_size must be a positive integer, got {chunk_size!r}')
        self._chunk_size = chunk_size
        self._include_empty = include_empty
        # Register the chunk label as a one-element list, matching the
        # ``labels: Iterable[str]`` parameter of the counting wrapper.
        self._iterable = counted(iterable, [self._CHUNKS_COUNT])
        self._chunks = self._iter_chunks()

    @property
    def items_count(self) -> int:
        """Number of items pulled from the source iterable so far."""
        return self._iterable.count

    @property
    def chunks_count(self) -> int:
        """Number of non-empty chunks produced so far."""
        return self._iterable[self._CHUNKS_COUNT]

    def __iter__(self) -> Iterator[list[T_co]]:
        return self

    def __next__(self) -> list[T_co]:
        return next(self._chunks)

    def _iter_chunks(self) -> Iterator[list[T_co]]:
        """Generate the chunks; drives the counters as a side effect."""
        chunk: list[T_co] = []
        for item in self._iterable:
            chunk.append(item)
            if len(chunk) >= self._chunk_size:
                self._iterable.increment(self._CHUNKS_COUNT)
                yield chunk
                # Start a new list instead of clearing the yielded one, so chunks
                # already handed to the consumer are never mutated.
                chunk = []
        if chunk:
            # Trailing, partially filled chunk.
            self._iterable.increment(self._CHUNKS_COUNT)
            yield chunk
        elif self._include_empty and self.chunks_count == 0:
            # Nothing was produced at all: honour the "at least one chunk" promise.
            yield chunk
class _Count(Iterable[T]):
"""
Iterable wrapper that counts number of entries (iterations)
while yields entries from iterable.
Optionally you can register custom labels to count while iterating,
use :meth:`increment`.
"""
_TOTAL = 'count'
def __init__(self, iterable: Iterable[T], labels: Iterable[str] = None):
self.iterable = iterable
self._counter = defaultdict(int, dict.fromkeys([self._TOTAL, *(labels or [])], 0))
@property
def count(self):
return self[self._TOTAL]
def __getitem__(self, label):
return self._counter[label]
def increment(self, *labels: str) -> None:
for label in labels:
self._counter[label] += 1
def __iter__(self) -> Iterator[T]:
for t in self.iterable:
self.increment(self._TOTAL)
yield t
def __format__(self, format_spec) -> str:
format_spec = str(format_spec)
if format_spec.startswith('stats'):
label_value_sep = (format_spec.split(':', 1)[1:] or [' '])[0]
return ', '.join([f'{label}{label_value_sep}{count}' for label, count in self._counter.items()])
else:
return str(self)
def __str__(self) -> str:
return str(dict(self._counter))
# Lowercase, function-style aliases for the wrapper classes defined above.
counted = _Count
chunked = ChunkedIterator