######################################################################
#
# File: b2sdk/progress.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from abc import ABCMeta, abstractmethod
import six
import sys
import time
import hashlib
# tqdm doesn't work on 2.6 with at least some encodings
# on sys.stderr. See: https://github.com/Backblaze/B2_Command_Line_Tool/issues/272
if sys.version_info < (2, 7):
tqdm = None # will fall back to simple progress reporting
else:
try:
from tqdm import tqdm # displays a nice progress bar
except ImportError:
tqdm = None # noqa
[docs]@six.add_metaclass(ABCMeta)
class AbstractProgressListener(object):
"""
Interface expected by B2Api upload and download methods to report
on progress.
This interface just accepts the number of bytes transferred so far.
Subclasses will need to know the total size if they want to report
a percent done.
"""
[docs] def __init__(self):
self._closed = False
[docs] @abstractmethod
def set_total_bytes(self, total_byte_count):
"""
Always called before __enter__ to set the expected total number of bytes.
May be called more than once if an upload is retried.
:param total_byte_count: expected total number of bytes
:type total_byte_count: int
"""
[docs] @abstractmethod
def bytes_completed(self, byte_count):
"""
Reports that the given number of bytes have been transferred
so far. This is not a delta, it is the total number of bytes
transferred so far.
:param byte_count: number of bytes have been transferred
:type byte_count: int
"""
[docs] @abstractmethod
def close(self):
"""
Must be called when you're done with the listener.
In well-structured code, should be called only once.
"""
#import traceback, sys; traceback.print_stack(file=sys.stdout)
assert self._closed is False, 'progress listener was closed twice! uncomment the line above to debug this'
self._closed = True
def __enter__(self):
"""
standard context manager method
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
standard context manager method
"""
self.close()
[docs]class TqdmProgressListener(AbstractProgressListener):
"""
Progress listener based on tqdm library
"""
[docs] def __init__(self, description, *args, **kwargs):
self.description = description
self.tqdm = None # set in set_total_bytes()
self.prev_value = 0
super(TqdmProgressListener, self).__init__(*args, **kwargs)
[docs] def set_total_bytes(self, total_byte_count):
"""
Set the expected total number of bytes.
:param total_byte_count: expected total number of bytes
:type total_byte_count: int
"""
if self.tqdm is None:
self.tqdm = tqdm(
desc=self.description,
total=total_byte_count,
unit='B',
unit_scale=True,
leave=True,
miniters=1
)
[docs] def bytes_completed(self, byte_count):
"""
Reports that the given number of bytes have been transferred
so far. This is not a delta, it is the total number of bytes
transferred so far.
:param byte_count: number of bytes have been transferred
:type byte_count: int
"""
# tqdm doesn't support running the progress bar backwards,
# so on an upload retry, it just won't move until it gets
# past the point where it failed.
if self.prev_value < byte_count:
self.tqdm.update(byte_count - self.prev_value)
self.prev_value = byte_count
[docs] def close(self):
"""
Perform clean up operations
"""
if self.tqdm is not None:
self.tqdm.close()
super(TqdmProgressListener, self).close()
[docs]class SimpleProgressListener(AbstractProgressListener):
"""
Just a simple progress listener which prints info on a console
"""
[docs] def __init__(self, description, *args, **kwargs):
self.desc = description
self.complete = 0
self.last_time = time.time()
self.any_printed = False
super(SimpleProgressListener, self).__init__(*args, **kwargs)
[docs] def set_total_bytes(self, total_byte_count):
"""
Set the expected total number of bytes.
:param total_byte_count: expected total number of bytes
:type total_byte_count: int
"""
self.total = total_byte_count
[docs] def bytes_completed(self, byte_count):
"""
Reports that the given number of bytes have been transferred
so far. This is not a delta, it is the total number of bytes
transferred so far.
:param byte_count: number of bytes have been transferred
:type byte_count: int
"""
now = time.time()
elapsed = now - self.last_time
if 3 <= elapsed and self.total != 0:
if not self.any_printed:
print(self.desc)
print(' %d%%' % int(100.0 * byte_count / self.total))
self.last_time = now
self.any_printed = True
[docs] def close(self):
"""
Perform clean up operations
"""
if self.any_printed:
print(' DONE.')
super(SimpleProgressListener, self).close()
[docs]class DoNothingProgressListener(AbstractProgressListener):
"""
This listener performs no any output
"""
[docs] def set_total_bytes(self, total_byte_count):
"""
Set the expected total number of bytes.
:param total_byte_count: expected total number of bytes
:type total_byte_count: int
"""
pass
[docs] def bytes_completed(self, byte_count):
"""
Reports that the given number of bytes have been transferred
so far. This is not a delta, it is the total number of bytes
transferred so far.
:param byte_count: number of bytes have been transferred
:type byte_count: int
"""
pass
[docs] def close(self):
"""
Perform clean up operations
"""
super(DoNothingProgressListener, self).close()
[docs]class ProgressListenerForTest(AbstractProgressListener):
"""
Captures all of the calls so they can be checked.
"""
[docs] def __init__(self, *args, **kwargs):
self.calls = []
super(ProgressListenerForTest, self).__init__(*args, **kwargs)
[docs] def set_total_bytes(self, total_byte_count):
self.calls.append('set_total_bytes(%d)' % (total_byte_count,))
[docs] def bytes_completed(self, byte_count):
self.calls.append('bytes_completed(%d)' % (byte_count,))
[docs] def close(self):
self.calls.append('close()')
super(ProgressListenerForTest, self).close()
[docs] def get_calls(self):
return self.calls
[docs]def make_progress_listener(description, quiet):
"""
Returns a progress listener object depending on some conditions
:param description: listener description
:type description: str
:param quiet: if True, do not output anything
:type quiet: bool
:return: a listener object
"""
if quiet:
return DoNothingProgressListener()
elif tqdm is not None:
return TqdmProgressListener(description)
else:
return SimpleProgressListener(description)
[docs]class AbstractStreamWithProgress(object):
"""
Wraps a file-like object and updates a ProgressListener
as data is read / written.
In the abstract class, read and write methods do not update
the progress - child classes shall do it
"""
[docs] def __init__(self, stream, progress_listener, offset=0):
"""
:param stream: the stream to read from or write to
:param progress_listener: the listener that we tell about progress
:type progress_listener: b2sdk.progress.AbstractProgressListener
:param offset: the starting byte offset in the file
:type offset: int
"""
assert progress_listener is not None
self.stream = stream
self.progress_listener = progress_listener
self.bytes_completed = 0
self.offset = offset
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return self.stream.__exit__(exc_type, exc_val, exc_tb)
[docs] def seek(self, pos):
"""
Seek to a given position in the stream
:param pos: position in the stream
:type pos: int
"""
return self.stream.seek(pos)
[docs] def tell(self):
"""
Return current stream position
:rtype: int
"""
return self.stream.tell()
[docs] def flush(self):
"""
Flush the stream
"""
self.stream.flush()
[docs] def read(self, size=None):
"""
Read data from the stream
:param size: number of bytes to read
:type size: int
:return: data read from the stream
"""
if size is None:
data = self.stream.read()
else:
data = self.stream.read(size)
return data
[docs] def write(self, data):
"""
Write data to the stream
:param data: a data to write to the stream
"""
self.stream.write(data)
def _update(self, delta):
self.bytes_completed += delta
self.progress_listener.bytes_completed(self.bytes_completed + self.offset)
[docs]class ReadingStreamWithProgress(AbstractStreamWithProgress):
"""
Wraps a file-like object, updates progress while reading
"""
[docs] def read(self, size=None):
"""
Read data from the stream
:param size: number of bytes to read
:type size: int
:return: data read from the stream
"""
data = super(ReadingStreamWithProgress, self).read(size)
self._update(len(data))
return data
[docs]class WritingStreamWithProgress(AbstractStreamWithProgress):
"""
Wraps a file-like object, updates progress while writing
"""
[docs] def write(self, data):
"""
Write data to the stream
:param data: a data to write to the stream
"""
self._update(len(data))
super(WritingStreamWithProgress, self).write(data)
[docs]class StreamWithHash(object):
"""
Wraps a file-like object, calculates SHA1 while reading
and appends hash at the end
"""
[docs] def __init__(self, stream):
"""
:param stream: the stream to read from
:return: None
"""
self.stream = stream
self.digest = hashlib.sha1()
self.hash = None
self.hash_read = 0
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return self.stream.__exit__(exc_type, exc_val, exc_tb)
[docs] def seek(self, pos):
"""
Seek to a given position in the stream
:param pos: position in the stream
:type pos: int
"""
assert pos == 0
self.stream.seek(0)
self.digest = hashlib.sha1()
self.hash = None
self.hash_read = 0
[docs] def read(self, size=None):
"""
Read data from the stream
:param size: number of bytes to read
:type size: int
:return: data read from the stream
"""
data = b''
if self.hash is None:
# Read some bytes from stream
if size is None:
data = self.stream.read()
else:
data = self.stream.read(size)
# Update hash
self.digest.update(data)
# Check for end of stream
if size is None or len(data) < size:
self.hash = self.digest.hexdigest()
if size is not None:
size -= len(data)
if self.hash is not None:
# The end of stream was reached, return hash now
size = size or len(self.hash)
data += str.encode(self.hash[self.hash_read:self.hash_read + size])
self.hash_read += size
return data
[docs] def hash_size(self):
"""
Calculate size of a hash string
:rtype: int
"""
return self.digest.digest_size * 2