######################################################################
#
# File: b2sdk/utils/__init__.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
import base64
import hashlib
import os
import platform
import re
import shutil
import tempfile
import time
import concurrent.futures as futures
from decimal import Decimal
from urllib.parse import quote, unquote_plus
from logfury.v1 import DefaultTraceAbstractMeta, DefaultTraceMeta, limit_trace_arguments, disable_trace, trace_call
def interruptible_get_result(future):
    """
    Wait for the result of a future in a way that can be interrupted
    by a KeyboardInterrupt.

    This is not necessary in Python 3, but is needed for Python 2.

    :param future: a future to get result of
    :type future: Future
    """
    # Poll in one-second slices; each TimeoutError returns control to the
    # interpreter, so a KeyboardInterrupt is honored between polls.
    finished = False
    while not finished:
        try:
            value = future.result(timeout=1.0)
            finished = True
        except futures.TimeoutError:
            pass
    return value
def b2_url_encode(s):
    """
    URL-encode a unicode string to be sent to B2 in an HTTP header.

    :param s: a unicode string to encode
    :type s: str
    :return: URL-encoded string
    :rtype: str
    """
    # Encode to UTF-8 first so multi-byte characters are percent-escaped
    # byte by byte.
    utf8_bytes = s.encode('utf-8')
    return quote(utf8_bytes)
def b2_url_decode(s):
    """
    Decode a Unicode string returned from B2 in an HTTP header.

    :param s: a unicode string to decode
    :type s: str
    :return: a Python unicode string.
    :rtype: str
    """
    decoded = unquote_plus(s)
    return decoded
def choose_part_ranges(content_length, minimum_part_size):
    """
    Return a list of (offset, length) for the parts of a large file.

    :param content_length: content length value
    :type content_length: int
    :param minimum_part_size: a minimum file part size
    :type minimum_part_size: int
    :rtype: list
    """
    # A file at least twice the minimum part size can always be split into
    # parts that each meet the minimum.
    assert minimum_part_size * 2 <= content_length

    # How many parts can we make?  Capped at 10000.
    part_count = min(content_length // minimum_part_size, 10000)
    assert 2 <= part_count

    # Every part but the last has the same size; the last part absorbs the
    # remainder and may be bigger.
    part_size = content_length // part_count
    last_part_size = content_length - (part_size * (part_count - 1))
    assert minimum_part_size <= last_part_size

    # Build all parts as equal-sized, then patch the last one to cover the
    # remainder of the file.
    parts = [(index * part_size, part_size) for index in range(part_count)]
    last_offset = (part_count - 1) * part_size
    parts[-1] = (last_offset, content_length - last_offset)
    return parts
def hex_sha1_of_stream(input_stream, content_length):
    """
    Return the 40-character hex SHA1 checksum of the first content_length
    bytes in the input stream.

    :param input_stream: stream object, which exposes read() method
    :param content_length: expected length of the stream
    :type content_length: int
    :rtype: str
    """
    block_size = 1024 * 1024
    sha1 = hashlib.sha1()
    bytes_left = content_length
    while bytes_left:
        chunk_size = min(bytes_left, block_size)
        chunk = input_stream.read(chunk_size)
        if len(chunk) != chunk_size:
            # The stream ended before content_length bytes were available.
            raise ValueError(
                'content_length(%s) is more than the size of the file' % content_length
            )
        sha1.update(chunk)
        bytes_left -= chunk_size
    return sha1.hexdigest()
def hex_sha1_of_unlimited_stream(input_stream, limit=None):
    """
    Return the 40-character hex SHA1 checksum of a stream, reading until
    EOF or until ``limit`` bytes have been consumed, along with the number
    of bytes actually hashed.

    :param input_stream: stream object, which exposes read() method
    :param limit: maximum number of bytes to read, or None for no limit
    :type limit: int or None
    :return: a (hex_sha1_checksum, content_length) tuple
    :rtype: tuple
    """
    block_size = 1024 * 1024
    content_length = 0
    digest = hashlib.sha1()
    while True:
        if limit is not None:
            to_read = min(limit - content_length, block_size)
        else:
            to_read = block_size
        data = input_stream.read(to_read)
        data_len = len(data)
        if data_len > 0:
            digest.update(data)
            content_length += data_len
        # Stop on a short read (EOF) or once the limit is exhausted.  The
        # explicit `to_read == 0` check prevents an infinite loop when the
        # stream provides exactly `limit` bytes (or more): in that case
        # read(0) returns b'' and the short-read test (0 < 0) never fires.
        if data_len < to_read or to_read == 0:
            return digest.hexdigest(), content_length
def hex_sha1_of_file(path_):
    """
    Hash the whole contents of a file with SHA1.

    :param path_: a path to the file (str or pathlib.Path)
    :return: a (hex_sha1_checksum, content_length) tuple
    :rtype: tuple
    """
    # str() is only needed in case someone passes a pathlib.Path;
    # TODO: remove it after dropping python 3.5 support.
    with open(str(path_), 'rb') as stream:
        return hex_sha1_of_unlimited_stream(stream)
def hex_sha1_of_bytes(data: bytes) -> str:
    """
    Return the 40-character hex SHA1 checksum of the data.
    """
    digest = hashlib.sha1()
    digest.update(data)
    return digest.hexdigest()
def hex_md5_of_bytes(data: bytes) -> str:
    """
    Return the 32-character hex MD5 checksum of the data.
    """
    checksum = hashlib.md5(data)
    return checksum.hexdigest()
def md5_of_bytes(data: bytes) -> bytes:
    """
    Return the 16-byte MD5 checksum of the data.
    """
    md5 = hashlib.md5()
    md5.update(data)
    return md5.digest()
def b64_of_bytes(data: bytes) -> str:
    """
    Return the base64 encoded representation of the data.
    """
    # b64encode output is pure ASCII, so decoding as ASCII is safe.
    return base64.b64encode(data).decode('ascii')
def validate_b2_file_name(name):
    """
    Raise a ValueError if the name is not a valid B2 file name.

    :param name: a string to check
    :type name: str
    """
    if not isinstance(name, str):
        raise ValueError('file name must be a string, not bytes')
    # Length limits apply to the UTF-8 encoding, not the character count.
    utf8_bytes = name.encode('utf-8')
    if not utf8_bytes:
        raise ValueError('file name too short (0 utf-8 bytes)')
    if len(utf8_bytes) > 1000:
        raise ValueError('file name too long (more than 1000 utf-8 bytes)')
    if name.startswith('/'):
        raise ValueError("file names must not start with '/'")
    if name.endswith('/'):
        raise ValueError("file names must not end with '/'")
    if '\\' in name:
        raise ValueError("file names must not contain '\\'")
    if '//' in name:
        raise ValueError("file names must not contain '//'")
    if '\x7f' in name:  # DEL control character
        raise ValueError("file names must not contain DEL")
    for segment in utf8_bytes.split(b'/'):
        if len(segment) > 250:
            raise ValueError("file names segments (between '/') can be at most 250 utf-8 bytes")
def is_file_readable(local_path, reporter=None):
    """
    Check if the local file has read permissions.

    :param local_path: a file path
    :type local_path: str
    :param reporter: reporter object to put errors on
    :rtype: bool
    """
    if not os.path.exists(local_path):
        # Missing file: record an access error if a reporter was given.
        if reporter is not None:
            reporter.local_access_error(local_path)
        return False
    if not os.access(local_path, os.R_OK):
        # Present but unreadable: record a permission error.
        if reporter is not None:
            reporter.local_permission_error(local_path)
        return False
    return True
def get_file_mtime(local_path):
    """
    Get modification time of a file in milliseconds.

    :param local_path: a file path
    :type local_path: str
    :rtype: int
    """
    seconds = os.path.getmtime(local_path)
    # Truncate, not round, to integer milliseconds.
    return int(seconds * 1000)
def set_file_mtime(local_path, mod_time_millis):
    """
    Set modification time of a file in milliseconds.

    :param local_path: a file path
    :type local_path: str
    :param mod_time_millis: time to be set
    :type mod_time_millis: int
    """
    seconds = mod_time_millis / 1000.0
    # Round through Decimal so the value read back from the file later
    # matches what was written.  POSIX stores mtime as a float while B2
    # uses integer milliseconds.  E.g. 1093258377393 converts to
    # 1093258377.393, which is really represented by
    # 1093258377.3929998874664306640625; saving that and reading it again
    # would yield 1093258377392.  Appending '5' after the three formatted
    # decimals nudges the float so it rounds the right way.
    # See #617 for details.
    seconds = float(Decimal('%.3f5' % seconds))
    os.utime(local_path, (seconds, seconds))
def fix_windows_path_limit(path):
    """
    Prefix paths when running on Windows to overcome 260 character path length limit.

    See https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath

    :param path: a path to prefix
    :type path: str
    :return: a prefixed path
    :rtype: str
    """
    # Only Windows needs the extended-length prefix.
    if platform.system() != 'Windows':
        return path
    if path.startswith('\\\\'):
        # UNC network path: \\server\share -> \\?\UNC\server\share
        return '\\\\?\\UNC\\' + path[2:]
    if os.path.isabs(path):
        # Local absolute path: C:\dir -> \\?\C:\dir
        return '\\\\?\\' + path
    # Relative paths cannot take the prefix; leave them alone.
    return path
class TempDir(object):
    """
    Context manager that creates and destroys a temporary directory.
    """

    def __enter__(self):
        """
        Return the unicode path to the temp dir.
        """
        # mkdtemp() already returns str on Python 3.  The old
        # .replace('\\', '\\\\') call was a Python 2 leftover that doubled
        # every backslash, corrupting the path on Windows, so it is gone.
        self.dirpath = tempfile.mkdtemp()
        return self.dirpath

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Remove the directory and everything in it."""
        shutil.rmtree(self.dirpath)
        return None  # do not hide exception
def _pick_scale_and_suffix(x):
# suffixes for different scales
suffixes = ' kMGTP'
# We want to use the biggest suffix that makes sense.
ref_digits = str(int(x))
index = (len(ref_digits) - 1) // 3
suffix = suffixes[index]
if suffix == ' ':
suffix = ''
scale = 1000**index
return (scale, suffix)
# Matches each upper-case letter that starts a new word: either one that
# follows a lower-case letter or digit, or one (not at the start) that is
# followed by a lower-case letter (handles acronym runs like 'HTMLParser').
_CAMELCASE_TO_UNDERSCORE_RE = re.compile('((?<=[a-z0-9])[A-Z]|(?!^)[A-Z](?=[a-z]))')


def camelcase_to_underscore(input_):
    """
    Convert a camel-cased string to a string with underscores.

    :param input_: an input string
    :type input_: str
    :return: string with underscores
    :rtype: str
    """
    with_underscores = _CAMELCASE_TO_UNDERSCORE_RE.sub(r'_\1', input_)
    return with_underscores.lower()
class ConcurrentUsedAuthTokenGuard(object):
    """
    Context manager preventing two tokens being used simultaneously.

    Throws UploadTokenUsedConcurrently when unable to acquire a lock.

    Sample usage:

    with ConcurrentUsedAuthTokenGuard(lock_for_token, token):
        # code that uses the token exclusively
    """

    def __init__(self, lock, token):
        self.lock = lock
        self.token = token

    def __enter__(self):
        # Non-blocking acquire: failure means another thread already
        # holds this token's lock.
        acquired = self.lock.acquire(False)
        if not acquired:
            from b2sdk.exception import UploadTokenUsedConcurrently
            raise UploadTokenUsedConcurrently(self.token)

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            self.lock.release()
        except RuntimeError:
            # Releasing an unacquired lock raises RuntimeError; swallow it
            # so __exit__ is safe even when __enter__ failed to acquire.
            pass
def current_time_millis():
    """
    File times are in integer milliseconds, to avoid roundoff errors.
    """
    millis = time.time() * 1000
    return int(round(millis))
# Re-export sanity check: fail fast at import time if the logfury
# decorators this module re-exports were not imported successfully above.
# NOTE(review): these run only when assertions are enabled (not under -O).
assert disable_trace
assert limit_trace_arguments
assert trace_call