Source code for b2sdk.progress

######################################################################
#
# File: b2sdk/progress.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

from abc import ABCMeta, abstractmethod
import six
import sys
import time
import hashlib

# tqdm doesn't work on 2.6 with at least some encodings
# on sys.stderr.  See: https://github.com/Backblaze/B2_Command_Line_Tool/issues/272
if sys.version_info < (2, 7):
    tqdm = None  # will fall back to simple progress reporting
else:
    try:
        from tqdm import tqdm  # displays a nice progress bar
    except ImportError:
        tqdm = None  # noqa


[docs]@six.add_metaclass(ABCMeta) class AbstractProgressListener(object): """ Interface expected by B2Api upload and download methods to report on progress. This interface just accepts the number of bytes transferred so far. Subclasses will need to know the total size if they want to report a percent done. """ def __init__(self): self._closed = False
[docs] @abstractmethod def set_total_bytes(self, total_byte_count): """ Always called before __enter__ to set the expected total number of bytes. May be called more than once if an upload is retried. :param int total_byte_count: expected total number of bytes """
[docs] @abstractmethod def bytes_completed(self, byte_count): """ Report the given number of bytes that have been transferred so far. This is not a delta, it is the total number of bytes transferred so far. :param int byte_count: number of bytes have been transferred """
[docs] @abstractmethod def close(self): """ Must be called when you're done with the listener. In well-structured code, should be called only once. """ #import traceback, sys; traceback.print_stack(file=sys.stdout) assert self._closed is False, 'progress listener was closed twice! uncomment the line above to debug this' self._closed = True
def __enter__(self): """ A standard context manager method. """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ A standard context manager method. """ self.close()
[docs]class TqdmProgressListener(AbstractProgressListener): """ Progress listener based on tqdm library. """ def __init__(self, description, *args, **kwargs): self.description = description self.tqdm = None # set in set_total_bytes() self.prev_value = 0 super(TqdmProgressListener, self).__init__(*args, **kwargs) def set_total_bytes(self, total_byte_count): if self.tqdm is None: self.tqdm = tqdm( desc=self.description, total=total_byte_count, unit='B', unit_scale=True, leave=True, miniters=1, ) def bytes_completed(self, byte_count): # tqdm doesn't support running the progress bar backwards, # so on an upload retry, it just won't move until it gets # past the point where it failed. if self.prev_value < byte_count: self.tqdm.update(byte_count - self.prev_value) self.prev_value = byte_count def close(self): if self.tqdm is not None: self.tqdm.close() super(TqdmProgressListener, self).close()
[docs]class SimpleProgressListener(AbstractProgressListener): """ Just a simple progress listener which prints info on a console. """ def __init__(self, description, *args, **kwargs): self.desc = description self.complete = 0 self.last_time = time.time() self.any_printed = False super(SimpleProgressListener, self).__init__(*args, **kwargs) def set_total_bytes(self, total_byte_count): self.total = total_byte_count def bytes_completed(self, byte_count): now = time.time() elapsed = now - self.last_time if 3 <= elapsed and self.total != 0: if not self.any_printed: print(self.desc) print(' %d%%' % int(100.0 * byte_count / self.total)) self.last_time = now self.any_printed = True def close(self): if self.any_printed: print(' DONE.') super(SimpleProgressListener, self).close()
[docs]class DoNothingProgressListener(AbstractProgressListener): """ This listener gives no output whatsoever. """ def set_total_bytes(self, total_byte_count): pass def bytes_completed(self, byte_count): pass def close(self): super(DoNothingProgressListener, self).close()
[docs]class ProgressListenerForTest(AbstractProgressListener): """ Capture all of the calls so they can be checked. """ def __init__(self, *args, **kwargs): self.calls = [] super(ProgressListenerForTest, self).__init__(*args, **kwargs) def set_total_bytes(self, total_byte_count): self.calls.append('set_total_bytes(%d)' % (total_byte_count,)) def bytes_completed(self, byte_count): self.calls.append('bytes_completed(%d)' % (byte_count,)) def close(self): self.calls.append('close()') super(ProgressListenerForTest, self).close() def get_calls(self): return self.calls
def make_progress_listener(description, quiet): """ Return a progress listener object depending on some conditions. :param str description: listener description :param bool quiet: if ``True``, do not output anything :return: a listener object """ if quiet: return DoNothingProgressListener() elif tqdm is not None: return TqdmProgressListener(description) else: return SimpleProgressListener(description) class RangeOfInputStream(object): """ Wrap a file-like object (read only) and read the selected range of the file. """ def __init__(self, stream, offset, length): """ :param stream: a seekable stream :param int offset: offset in the stream :param int length: max number of bytes to read """ self.stream = stream self.offset = offset self.remaining = length def __enter__(self): self.stream.__enter__() return self def __exit__(self, exc_type, exc_val, exc_tb): return self.stream.__exit__(exc_type, exc_val, exc_tb) def seek(self, pos): """ Seek to a given position in the stream. :param int pos: position in the stream """ self.stream.seek(self.offset + pos) def read(self, size=None): """ Read data from the stream. :param int size: number of bytes to read :return: data read from the stream """ if size is None: to_read = self.remaining else: to_read = min(size, self.remaining) data = self.stream.read(to_read) self.remaining -= len(data) return data class AbstractStreamWithProgress(object): """ Wrap a file-like object and updates a ProgressListener as data is read / written. In the abstract class, read and write methods do not update the progress - child classes shall do it. """ def __init__(self, stream, progress_listener, offset=0): """ :param stream: the stream to read from or write to :param b2sdk.v1.AbstractProgressListener progress_listener: the listener that we tell about progress :param int offset: the starting byte offset in the file """ assert progress_listener is not None self.stream = stream self.progress_listener = progress_listener self.bytes_completed = 0 self.offset = offset def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): return self.stream.__exit__(exc_type, exc_val, exc_tb) def seek(self, pos): """ Seek to a given position in the stream. :param int pos: position in the stream """ return self.stream.seek(pos) def tell(self): """ Return current stream position. :rtype: int """ return self.stream.tell() def flush(self): """ Flush the stream. """ self.stream.flush() def read(self, size=None): """ Read data from the stream. :param int size: number of bytes to read :return: data read from the stream """ if size is None: data = self.stream.read() else: data = self.stream.read(size) return data def write(self, data): """ Write data to the stream. :param data: a data to write to the stream """ self.stream.write(data) def _update(self, delta): self.bytes_completed += delta self.progress_listener.bytes_completed(self.bytes_completed + self.offset) class ReadingStreamWithProgress(AbstractStreamWithProgress): """ Wrap a file-like object, updates progress while reading. """ def read(self, size=None): """ Read data from the stream. :param int size: number of bytes to read :return: data read from the stream """ data = super(ReadingStreamWithProgress, self).read(size) self._update(len(data)) return data class WritingStreamWithProgress(AbstractStreamWithProgress): """ Wrap a file-like object; updates progress while writing. """ def write(self, data): """ Write data to the stream. :param bytes data: data to write to the stream """ self._update(len(data)) super(WritingStreamWithProgress, self).write(data) class StreamWithHash(object): """ Wrap a file-like object, calculates SHA1 while reading and appends hash at the end. """ def __init__(self, stream): """ :param stream: the stream to read from """ self.stream = stream self.digest = hashlib.sha1() self.hash = None self.hash_read = 0 def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): return self.stream.__exit__(exc_type, exc_val, exc_tb) def seek(self, pos): """ Seek to a given position in the stream. :param int pos: position in the stream """ assert pos == 0 self.stream.seek(0) self.digest = hashlib.sha1() self.hash = None self.hash_read = 0 def read(self, size=None): """ Read data from the stream. :param int size: number of bytes to read :return: read data :rtype: bytes|None """ data = b'' if self.hash is None: # Read some bytes from stream if size is None: data = self.stream.read() else: data = self.stream.read(size) # Update hash self.digest.update(data) # Check for end of stream if size is None or len(data) < size: self.hash = self.digest.hexdigest() if size is not None: size -= len(data) if self.hash is not None: # The end of stream was reached, return hash now size = size or len(self.hash) data += str.encode(self.hash[self.hash_read:self.hash_read + size]) self.hash_read += size return data def hash_size(self): """ Calculate the size of a hash string. :rtype: int """ return self.digest.digest_size * 2