Source code for b2sdk.stream.wrapper

######################################################################
#
# File: b2sdk/stream/wrapper.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

import io


class StreamWrapper(io.IOBase):
    """
    Thin pass-through around a file-like object.

    Every method defined here forwards to the same-named method
    of the wrapped object, which is kept in ``self.stream``.
    """

    def __init__(self, stream):
        """
        :param stream: the file-like object to read from or write to
        """
        self.stream = stream
        super(StreamWrapper, self).__init__()

    def seekable(self):
        """
        Report whether the wrapped stream says it is seekable.
        """
        return self.stream.seekable()

    def seek(self, pos, whence=0):
        """
        Move the wrapped stream to a given position.

        :param int pos: position in the stream
        :param int whence: passed through unchanged to the wrapped stream's ``seek``
        :return: whatever the wrapped stream's ``seek`` returns (new absolute position)
        :rtype: int
        """
        return self.stream.seek(pos, whence)

    def tell(self):
        """
        Return the position reported by the wrapped stream.

        :rtype: int
        """
        return self.stream.tell()

    def truncate(self, size=None):
        """
        Forward a truncate request to the wrapped stream.

        :param size: passed through unchanged (including ``None``)
        :return: whatever the wrapped stream's ``truncate`` returns
        """
        return self.stream.truncate(size)

    def flush(self):
        """
        Flush the wrapped stream.
        """
        self.stream.flush()

    def readable(self):
        """
        Report whether the wrapped stream says it is readable.
        """
        return self.stream.readable()

    def read(self, size=None):
        """
        Read data from the wrapped stream.

        :param int size: number of bytes to read; when ``None`` the wrapped
                         stream's ``read`` is called with no argument at all
        :return: data read from the stream
        """
        # ``None`` is never forwarded: the wrapped object's own default is used instead.
        if size is None:
            return self.stream.read()
        return self.stream.read(size)

    def writable(self):
        """
        Report whether the wrapped stream says it is writable.
        """
        return self.stream.writable()

    def write(self, data):
        """
        Write data to the wrapped stream.

        :param data: the data to write to the stream
        :return: whatever the wrapped stream's ``write`` returns
        """
        return self.stream.write(data)
class StreamWithLengthWrapper(StreamWrapper):
    """
    File-like wrapper that additionally answers ``len()`` through ``__len__``.
    """

    def __init__(self, stream, length=None):
        """
        :param stream: the stream to read from or write to
        :param int length: length of the stream; stored as given and returned by ``__len__``
        """
        super(StreamWithLengthWrapper, self).__init__(stream)
        self.length = length

    def __len__(self):
        """
        Return the ``length`` value stored on this wrapper.
        """
        return self.length