# Source code for b2sdk._internal.transfer.outbound.upload_source

######################################################################
#
# File: b2sdk/_internal/transfer/outbound/upload_source.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import hashlib
import io
import logging
import os
from abc import abstractmethod
from enum import Enum, auto, unique
from typing import Callable

from b2sdk._internal.exception import InvalidUploadSource
from b2sdk._internal.file_version import BaseFileVersion
from b2sdk._internal.http_constants import DEFAULT_MIN_PART_SIZE
from b2sdk._internal.stream.range import RangeOfInputStream, wrap_with_range
from b2sdk._internal.transfer.outbound.copy_source import CopySource
from b2sdk._internal.transfer.outbound.outbound_source import OutboundTransferSource
from b2sdk._internal.utils import (
    IncrementalHexDigester,
    Sha1HexDigest,
    hex_sha1_of_stream,
    hex_sha1_of_unlimited_stream,
)

logger = logging.getLogger(__name__)


@unique
class UploadMode(Enum):
    """
    Strategy to apply when a file is uploaded.
    """

    #: send the complete file every time
    FULL = auto()

    #: prefer an incremental upload whenever one is possible
    INCREMENTAL = auto()
class AbstractUploadSource(OutboundTransferSource):
    """
    Base class for every source of data that gets uploaded to B2.
    """

    @abstractmethod
    def get_content_sha1(self) -> Sha1HexDigest | None:
        """
        Return the SHA1 checksum of the source data as a 40-character hex string.
        """

    @abstractmethod
    def open(self) -> io.IOBase:
        """
        Return a binary file-like object that yields the source data when read.
        """

    def is_upload(self) -> bool:
        # Every upload source describes an upload, never a server-side copy.
        return True

    def is_copy(self) -> bool:
        return False

    def is_sha1_known(self) -> bool:
        """
        Tell whether the SHA1 of the source is available right now.

        A ``False`` answer does not mean the digest cannot be obtained:
        ``get_content_sha1`` may still be able to produce a valid digest.
        """
        return False
class UploadSourceBytes(AbstractUploadSource):
    """
    Upload source backed by an in-memory ``bytes``/``bytearray`` buffer.
    """

    def __init__(
        self,
        data_bytes: bytes | bytearray,
        content_sha1: Sha1HexDigest | None = None,
    ):
        """
        Create an upload source over the given bytes.

        :param data_bytes: Data that is to be uploaded.
        :param content_sha1: SHA1 hexdigest of ``data_bytes``, or ``None`` to have it
            computed (and cached) the first time it is requested.
        """
        self.data_bytes = data_bytes
        self.content_sha1 = content_sha1

    def __repr__(self) -> str:
        # Long payloads are truncated to their first 20 bytes for readability.
        if len(self.data_bytes) > 20:
            shown = str(self.data_bytes[:20]) + '...'
        else:
            shown = self.data_bytes
        return '<{classname} data={data} id={id}>'.format(
            classname=self.__class__.__name__,
            data=shown,
            id=id(self),
        )

    def get_content_length(self) -> int:
        return len(self.data_bytes)

    def get_content_sha1(self) -> Sha1HexDigest | None:
        """Return the SHA1 hexdigest of the data, computing and caching it if needed."""
        if self.content_sha1 is None:
            hasher = hashlib.sha1(self.data_bytes)
            self.content_sha1 = hasher.hexdigest()
        return self.content_sha1

    def open(self):
        """Return a new in-memory binary stream positioned at the start of the data."""
        return io.BytesIO(self.data_bytes)

    def is_sha1_known(self) -> bool:
        return self.content_sha1 is not None
class UploadSourceLocalFileBase(AbstractUploadSource):
    """
    Common implementation for upload sources that read a file from the local disk.
    """

    def __init__(
        self,
        local_path: os.PathLike | str,
        content_sha1: Sha1HexDigest | None = None,
    ):
        """
        Create an upload source for the file at ``local_path``.

        :param local_path: Any path-like object that points to a file to be uploaded.
        :param content_sha1: SHA1 hexdigest of the data, or ``None`` to compute it lazily.
        :raises InvalidUploadSource: if ``local_path`` is not a file.
        """
        self.local_path = local_path
        self.content_length = 0
        self.content_sha1 = content_sha1
        self.check_path_and_get_size()

    def check_path_and_get_size(self) -> None:
        """
        Verify that ``local_path`` is a file and record its size in ``content_length``.

        :raises InvalidUploadSource: if ``os.path.isfile`` rejects ``local_path``.
        """
        if os.path.isfile(self.local_path):
            self.content_length = os.path.getsize(self.local_path)
            return
        raise InvalidUploadSource(self.local_path)

    def __repr__(self) -> str:
        name = self.__class__.__name__
        return (
            f'<{name} local_path={self.local_path} content_length={self.content_length} '
            f'content_sha1={self.content_sha1} id={id(self)}>'
        )

    def get_content_length(self) -> int:
        return self.content_length

    def get_content_sha1(self) -> Sha1HexDigest | None:
        """Return the SHA1 hexdigest of the data, hashing the file on first use."""
        if self.content_sha1 is None:
            self.content_sha1 = self._hex_sha1_of_file()
        return self.content_sha1

    def open(self):
        """Open the local file for binary reading."""
        return open(self.local_path, 'rb')

    def _hex_sha1_of_file(self) -> Sha1HexDigest:
        # Goes through ``self.open()`` so subclasses that narrow the readable
        # region (e.g. to a byte range) are hashed over that region only.
        with self.open() as stream:
            return hex_sha1_of_stream(stream, self.content_length)

    def is_sha1_known(self) -> bool:
        return self.content_sha1 is not None
class UploadSourceLocalFileRange(UploadSourceLocalFileBase):
    """
    Upload source covering a contiguous byte range of a local file.
    """

    def __init__(
        self,
        local_path: os.PathLike | str,
        content_sha1: Sha1HexDigest | None = None,
        offset: int = 0,
        length: int | None = None,
    ):
        """
        Initialize upload source using provided path.

        :param local_path: Any path-like object that points to a file to be uploaded.
        :param content_sha1: SHA1 hexdigest of the data, or ``None``.
        :param offset: Position in the file where upload should start from.
        :param length: Amount of data to be uploaded. If ``None``, length of the
            remainder of the file is taken.
        :raises ValueError: if ``offset`` lies outside ``[0, file size]``, if ``length``
            is negative, or if the range extends past the end of the file.
        """
        super().__init__(local_path, content_sha1)
        # The base initializer stored the whole file size in ``content_length``;
        # remember it before narrowing ``content_length`` down to the range.
        self.file_size = self.content_length
        self.offset = offset
        # Without this check an offset past EOF (or a negative one) would silently
        # yield a negative or meaningless ``content_length``.
        if not 0 <= offset <= self.file_size:
            raise ValueError('Range offset outside of file')
        if length is None:
            self.content_length = self.file_size - self.offset
        else:
            if length < 0:
                raise ValueError('Range length cannot be negative')
            if length + self.offset > self.file_size:
                raise ValueError('Range length overflow file size')
            self.content_length = length

    def __repr__(self) -> str:
        return (
            f'<{self.__class__.__name__} local_path={self.local_path} offset={self.offset} '
            f'content_length={self.content_length} content_sha1={self.content_sha1} id={id(self)}>'
        )

    def open(self):
        """Open the file and restrict reads to the ``[offset, offset + content_length)`` range."""
        fp = super().open()
        return wrap_with_range(fp, self.file_size, self.offset, self.content_length)
class UploadSourceLocalFile(UploadSourceLocalFileBase):
    """
    Upload source for a whole local file that can plan incremental uploads.
    """

    def get_incremental_sources(
        self,
        file_version: BaseFileVersion,
        min_part_size: int | None = None,
    ) -> list[OutboundTransferSource]:
        """
        Split the upload into a server-side copy plus an upload, when possible.

        If the remote file's content equals the beginning of this local file
        (compared by SHA1), the result is a ``CopySource`` for that shared prefix
        followed by an ``UploadSourceLocalFileRange`` for the remaining local bytes.
        Otherwise the upload cannot be split and ``[self]`` is returned.

        :param file_version: existing remote file version; a falsy value means there
            is no matching file on the server.
        :param min_part_size: smallest remote size worth copying; a falsy value falls
            back to ``DEFAULT_MIN_PART_SIZE``.
        :return: list of outbound transfer sources making up the upload.
        """
        path = self.local_path

        if not file_version:
            logger.debug(
                "Fallback to full upload for %s -- no matching file on server", path
            )
            return [self]

        min_part_size = min_part_size or DEFAULT_MIN_PART_SIZE
        remote_size = file_version.size

        if remote_size < min_part_size:
            # existing file size below minimal large file part size
            logger.debug(
                "Fallback to full upload for %s -- remote file is smaller than %i bytes",
                path,
                min_part_size,
            )
            return [self]

        if self.get_content_length() < remote_size:
            logger.debug(
                "Fallback to full upload for %s -- local file is smaller than remote", path
            )
            return [self]

        remote_sha1 = file_version.get_content_sha1()
        if not remote_sha1:
            logger.debug(
                "Fallback to full upload for %s -- remote file content SHA1 unknown", path
            )
            return [self]

        # Hash only the first ``remote_size`` local bytes. On a mismatch the same
        # digester keeps going to the end of the file, so the full-file SHA1 needed
        # for the full upload comes out of this single read pass.
        with self.open() as stream:
            digester = IncrementalHexDigester(stream)
            prefix_sha1 = digester.update_from_stream(remote_size)
            if prefix_sha1 != remote_sha1:
                logger.debug(
                    "Fallback to full upload for %s -- content in common range differs",
                    path,
                )
                self.content_sha1 = digester.update_from_stream()
                return [self]

        logger.debug("Incremental upload of %s is possible.", path)

        remote_sse = file_version.server_side_encryption
        # When the remote encryption setting reports itself as unknown, pass ``None``
        # to the copy instead of forwarding it.
        copy_encryption = None if remote_sse and remote_sse.is_unknown() else remote_sse

        prefix_copy = CopySource(
            file_version.id_,
            offset=0,
            length=remote_size,
            encryption=copy_encryption,
            source_file_info=file_version.file_info,
            source_content_type=file_version.content_type,
        )
        tail_upload = UploadSourceLocalFileRange(path, offset=remote_size)
        return [prefix_copy, tail_upload]
class UploadSourceStream(AbstractUploadSource):
    """
    Upload source whose data is produced by a caller-supplied stream factory.
    """

    def __init__(
        self,
        stream_opener: Callable[[], io.IOBase],
        stream_length: int | None = None,
        stream_sha1: Sha1HexDigest | None = None,
    ):
        """
        Create an upload source around ``stream_opener``.

        :param stream_opener: A function that opens a stream for uploading.
        :param stream_length: Length of the stream. If ``None``, it is computed by reading
            the stream the first time it's required.
        :param stream_sha1: SHA1 of the stream. If ``None``, it is computed by reading
            the stream the first time it's required.
        """
        self.stream_opener = stream_opener
        self._content_length = stream_length
        self._content_sha1 = stream_sha1

    def __repr__(self) -> str:
        return (
            f'<{self.__class__.__name__} stream_opener={self.stream_opener!r} '
            f'content_length={self._content_length} '
            f'content_sha1={self._content_sha1} id={id(self)}>'
        )

    def get_content_length(self) -> int:
        """Return the stream length, reading the whole stream once if it is unknown."""
        if self._content_length is None:
            self._set_content_length_and_sha1()
        return self._content_length

    def get_content_sha1(self) -> Sha1HexDigest | None:
        """Return the stream SHA1, reading the whole stream once if it is unknown."""
        if self._content_sha1 is None:
            self._set_content_length_and_sha1()
        return self._content_sha1

    def open(self):
        """Return a new stream obtained from ``stream_opener``."""
        return self.stream_opener()

    def _set_content_length_and_sha1(self) -> None:
        # Both values come out of one full read, so they are always cached together.
        # NOTE(review): the stream returned by ``open()`` is not closed here; confirm
        # whether openers may hand back a shared stream before adding cleanup.
        digest, length = hex_sha1_of_unlimited_stream(self.open())
        self._content_length = length
        self._content_sha1 = digest

    def is_sha1_known(self) -> bool:
        return self._content_sha1 is not None
class UploadSourceStreamRange(UploadSourceStream):
    """
    Upload source covering the part of an opened stream that starts at ``offset``.
    """

    def __init__(
        self,
        stream_opener: Callable[[], io.IOBase],
        offset: int = 0,
        stream_length: int | None = None,
        stream_sha1: Sha1HexDigest | None = None,
    ):
        """
        Initialize upload source using arbitrary function.

        :param stream_opener: A function that opens a stream for uploading.
        :param offset: Offset from which stream should be uploaded.
        :param stream_length: Length of the stream. If ``None``, data will be calculated
            from the stream the first time it's required.
        :param stream_sha1: SHA1 of the stream. If ``None``, data will be calculated
            from the stream the first time it's required.
        """
        super().__init__(
            stream_opener,
            stream_length=stream_length,
            stream_sha1=stream_sha1,
        )
        self._offset = offset

    def __repr__(self) -> str:
        return (
            '<{classname} stream_opener={stream_opener} offset={offset} '
            'content_length={content_length} content_sha1={content_sha1} id={id}>'
        ).format(
            classname=self.__class__.__name__,
            stream_opener=repr(self.stream_opener),
            offset=self._offset,
            content_length=self._content_length,
            content_sha1=self._content_sha1,
            id=id(self),
        )

    def open(self):
        """Open the stream and restrict reads to ``_content_length`` bytes from ``_offset``."""
        return RangeOfInputStream(super().open(), self._offset, self._content_length)

    def _set_content_length_and_sha1(self) -> None:
        """
        Compute and cache the length and SHA1 of the range.

        ``open()`` passes ``_content_length`` to ``RangeOfInputStream`` as its read
        limit, so it cannot be used while that length is still ``None``; the
        inherited implementation would fail in exactly the case the constructor
        documents as supported. When the length is unknown, the raw stream is
        positioned at ``_offset`` and hashed up to its end instead.
        """
        if self._content_length is None:
            stream = super().open()
            stream.seek(self._offset)
        else:
            stream = self.open()
        sha1, content_length = hex_sha1_of_unlimited_stream(stream)
        self._content_length = content_length
        self._content_sha1 = sha1