Source code for b2sdk.stream.chained

######################################################################
#
# File: b2sdk/stream/chained.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

import io

from abc import ABCMeta, abstractmethod

from b2sdk.stream.base import ReadOnlyStreamMixin


class ChainedStream(ReadOnlyStreamMixin, io.IOBase):
    """
    Chains multiple streams into a single stream, similar to what
    :py:class:`itertools.chain` does for iterators.

    Cleans up buffers of underlying streams when closed.
    Can be seeked to the beginning (when retrying an upload, for example).

    Closes underlying streams as soon as they reach EOF, but clears their buffers
    when the chained stream is closed, for underlying streams that follow the
    :py:class:`b2sdk.v2.StreamOpener` cleanup interface, for example
    :py:class:`b2sdk.v2.CachedBytesStreamOpener`.
    """
    def __init__(self, stream_openers):
        """
        :param list stream_openers: list of callables that return opened streams
        """
        stream_openers = list(stream_openers)
        if not stream_openers:
            raise ValueError('stream_openers cannot be empty')
        self.stream_openers = stream_openers
        self._stream_openers_iterator = iter(self.stream_openers)
        self._current_stream = None
        self._pos = 0
        super(ChainedStream, self).__init__()
    @property
    def stream(self):
        """
        Return currently processed stream.
        """
        if self._current_stream is None:
            self._next_stream()
        return self._current_stream

    def _reset_chain(self):
        if self._current_stream is not None:
            self._current_stream.close()
            self._current_stream = None
        self._stream_openers_iterator = iter(self.stream_openers)
        self._pos = 0

    def _next_stream(self):
        next_stream_opener = next(self._stream_openers_iterator, None)
        if next_stream_opener is not None:
            if self._current_stream is not None:
                self._current_stream.close()
            self._current_stream = next_stream_opener()
    def seekable(self):
        return True

    def tell(self):
        return self._pos

    def seek(self, pos, whence=0):
        """
        Resets stream to the beginning.

        :param int pos: only allowed value is ``0``
        :param int whence: only allowed value is ``0``
        """
        if pos != 0 or whence != 0:
            raise io.UnsupportedOperation('Chained stream can only be seeked to beginning')
        self._reset_chain()
        return self.tell()

    def readable(self):
        return True
    def read(self, size=None):
        """
        Read at most `size` bytes from the underlying streams, or all available data
        if `size` is None or negative.

        Streams are opened only when their data is needed, and may be left open and
        partially read for further reading by subsequent calls to this method.

        :param int,None size: number of bytes to read. If omitted, ``None``, or negative,
                              data is read and returned until EOF of the final stream is reached
        :return: data read from the stream
        """
        byte_arrays = []
        if size is None or size < 0:
            while 1:
                current_stream = self.stream
                buff = current_stream.read()
                byte_arrays.append(buff)
                if not buff:
                    self._next_stream()
                    if self.stream is current_stream:
                        break
        else:
            remaining = size
            while 1:
                current_stream = self.stream
                buff = current_stream.read(remaining)
                byte_arrays.append(buff)
                remaining -= len(buff)
                if remaining == 0:
                    # no need to open any other streams - we're satisfied
                    break
                if not buff:
                    self._next_stream()
                    if self.stream is current_stream:
                        break

        if len(byte_arrays) == 1:
            data = byte_arrays[0]
        else:
            data = bytes().join(byte_arrays)
        self._pos += len(data)
        return data
    def close(self):
        if self._current_stream is not None:
            self._current_stream.close()
        for stream_opener in self.stream_openers:
            if hasattr(stream_opener, 'cleanup'):
                stream_opener.cleanup()
        super(ChainedStream, self).close()
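
    # Example: a minimal sketch of driving ChainedStream directly, assuming plain
    # callables (lambdas returning fresh io.BytesIO objects) as stream openers; any
    # callable works here, it does not need to implement the StreamOpener interface below.
    #
    #     chained = ChainedStream([
    #         lambda: io.BytesIO(b'hello '),
    #         lambda: io.BytesIO(b'world'),
    #     ])
    #     assert chained.read() == b'hello world'  # reads across both streams until final EOF
    #     chained.seek(0)                          # resets the chain; openers are called again
    #     assert chained.read(5) == b'hello'       # bounded read stops once `size` bytes arrive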
class StreamOpener(metaclass=ABCMeta):
    """
    Abstract class to define a stream opener with cleanup.
    """

    @abstractmethod
    def __call__(self):
        """
        Create or open the stream to read and return.

        Can be called multiple times, but the streamed data may be cached and reused.
        """

    def cleanup(self):
        """
        Clean up the stream opener after the chained stream closes.

        Can be used for cleaning cached data that is stored in memory, to allow
        resetting the chained stream without fetching the data more than once,
        e.g. data downloaded from an external source.
        """
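
The abstract interface above is what ``ChainedStream.close()`` relies on: any opener that
exposes a ``cleanup()`` method gets it called when the chained stream is closed. Below is a
minimal, hypothetical sketch of such an opener (the name ``CachedBytesOpener`` is illustrative
only; the library's own cached opener is :py:class:`b2sdk.v2.CachedBytesStreamOpener`):

    import io

    from b2sdk.stream.chained import ChainedStream, StreamOpener


    class CachedBytesOpener(StreamOpener):
        """Keeps bytes in memory so retries can reopen the same data."""

        def __init__(self, data):
            self._data = data

        def __call__(self):
            # Each call returns a fresh stream over the cached bytes,
            # so seek(0) on the chained stream can replay the data.
            return io.BytesIO(self._data)

        def cleanup(self):
            # Invoked by ChainedStream.close(); drop the cached buffer.
            self._data = None


    stream = ChainedStream([CachedBytesOpener(b'part one, '), CachedBytesOpener(b'part two')])
    data = stream.read()  # b'part one, part two'
    stream.close()        # closes the last underlying stream, then calls cleanup() on every opener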