# Source code for b2sdk.sync.action

######################################################################
#
# File: b2sdk/sync/action.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import contextlib
import logging
import os
from abc import ABCMeta, abstractmethod
from contextlib import suppress

from ..bucket import Bucket
from ..file_version import FileVersion
from ..http_constants import SRC_LAST_MODIFIED_MILLIS
from ..scan.path import B2Path
from ..sync.report import ProgressReport, SyncReport
from ..transfer.outbound.outbound_source import OutboundTransferSource
from ..transfer.outbound.upload_source import UploadSourceLocalFile
from .encryption_provider import AbstractSyncEncryptionSettingsProvider
from .report import SyncFileReporter

# Module-level logger; AbstractAction.run uses it to log exceptions raised by sync actions.
logger = logging.getLogger(__name__)


class AbstractAction(metaclass=ABCMeta):
    """
    Base class for one unit of sync work, such as uploading, downloading,
    hiding, copying or deleting a single file.

    A sync run produces a sequence of actions which is then executed by a
    pool of worker threads. Actions may depend on one another; for example a
    CreateBucketAction has to finish before an UploadFileAction runs.
    """

    def run(self, bucket: Bucket, reporter: ProgressReport, dry_run: bool = False):
        """
        Execute the action, then report it.

        In a dry run the real work (``do_action``) is skipped, but the action is
        still reported through ``do_report``.

        :param bucket: a Bucket object
        :type bucket: b2sdk.bucket.Bucket
        :param reporter: a place to report errors
        :param dry_run: if True, only report what would be done
        :type dry_run: bool
        """
        try:
            should_execute = not dry_run
            if should_execute:
                self.do_action(bucket, reporter)
            self.do_report(bucket, reporter)
        except Exception as exc:
            logger.exception('an exception occurred in a sync action')
            # Message layout: "<action>: <repr of error> <str of error>"
            reporter.error(f'{self}: {exc!r} {exc}')
            raise  # propagate so the caller can tell which action failed

    @abstractmethod
    def get_bytes(self) -> int:
        """
        Return how many bytes this action transfers.
        """

    @abstractmethod
    def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None:
        """
        Do the actual work; return only once it has completed.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """

    @abstractmethod
    def do_report(self, bucket: Bucket, reporter: ProgressReport) -> None:
        """
        Report that this action was performed.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
class B2UploadAction(AbstractAction):
    """
    Upload a local file to a B2 bucket.
    """

    def __init__(
        self,
        local_full_path: str,
        relative_name: str,
        b2_file_name: str,
        mod_time_millis: int,
        size: int,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider,
    ):
        """
        :param local_full_path: path of the local file to upload
        :param relative_name: file name relative to the synced folder
        :param b2_file_name: name of the remote file to create
        :param mod_time_millis: file modification time in milliseconds
        :param size: file size in bytes
        :param encryption_settings_provider: encryption setting provider
        """
        self.local_full_path = local_full_path
        self.relative_name = relative_name
        self.b2_file_name = b2_file_name
        self.mod_time_millis = mod_time_millis
        self.size = size
        self.encryption_settings_provider = encryption_settings_provider
        self.large_file_sha1 = None
        # TODO: Remove once we drop Python 3.7 support
        self.cached_upload_source = None

    def get_bytes(self) -> int:
        """
        Return the number of bytes to transfer, i.e. the file size.
        """
        return self.size

    @property
    def _upload_source(self) -> UploadSourceLocalFile:
        """
        Upload source covering the whole local file.

        Built lazily and memoized on the instance so that its sha1 is never
        recalculated.
        """
        # TODO: Use @functools.cached_property once we drop Python 3.7 support
        source = self.cached_upload_source
        if source is None:
            source = UploadSourceLocalFile(self.local_full_path)
            self.cached_upload_source = source
        return source

    def get_all_sources(self) -> list[OutboundTransferSource]:
        """
        Return the list of sources needed to complete this upload.
        """
        return [self._upload_source]

    def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None:
        """
        Upload the file and return once the upload has completed.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        progress_listener = SyncFileReporter(reporter) if reporter else None

        file_info = {SRC_LAST_MODIFIED_MILLIS: str(self.mod_time_millis)}
        encryption = self.encryption_settings_provider.get_setting_for_upload(
            bucket=bucket,
            b2_file_name=self.b2_file_name,
            file_info=file_info,
            length=self.size,
        )

        sources = self.get_all_sources()
        # Several sources mean an incremental upload, which needs the sha1 of
        # the whole file computed up front.
        is_incremental = len(sources) > 1
        large_file_sha1 = self._upload_source.get_content_sha1() if is_incremental else None

        # Only enter the progress listener's context when there is one.
        with progress_listener or contextlib.nullcontext():
            bucket.concatenate(
                sources,
                self.b2_file_name,
                progress_listener=progress_listener,
                file_info=file_info,
                encryption=encryption,
                large_file_sha1=large_file_sha1,
            )

    def do_report(self, bucket: Bucket, reporter: ProgressReport) -> None:
        """
        Report the completed upload.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        reporter.print_completion('upload ' + self.relative_name)

    def __str__(self) -> str:
        return 'b2_upload({}, {}, {})'.format(
            self.local_full_path, self.b2_file_name, self.mod_time_millis
        )
class B2IncrementalUploadAction(B2UploadAction):
    """
    Upload action that may send only the changed parts of a file, using the
    version currently stored on the server as a base.
    """

    def __init__(
        self,
        local_full_path: str,
        relative_name: str,
        b2_file_name: str,
        mod_time_millis: int,
        size: int,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider,
        file_version: FileVersion | None = None,
        absolute_minimum_part_size: int | None = None,
    ):
        """
        :param local_full_path: a local file path
        :param relative_name: a relative file name
        :param b2_file_name: a name of a new remote file
        :param mod_time_millis: file modification time in milliseconds
        :param size: a file size
        :param encryption_settings_provider: encryption setting provider
        :param file_version: version of file currently on the server
        :param absolute_minimum_part_size: minimum file part size for large files
        """
        super().__init__(
            local_full_path=local_full_path,
            relative_name=relative_name,
            b2_file_name=b2_file_name,
            mod_time_millis=mod_time_millis,
            size=size,
            encryption_settings_provider=encryption_settings_provider,
        )
        self.file_version = file_version
        self.absolute_minimum_part_size = absolute_minimum_part_size

    def get_all_sources(self) -> list[OutboundTransferSource]:
        """
        Return the sources for an incremental upload, as computed by the whole-file
        upload source against the server-side version.
        """
        whole_file = self._upload_source
        return whole_file.get_incremental_sources(
            self.file_version,
            self.absolute_minimum_part_size,
        )
class B2HideAction(AbstractAction):
    """
    Hide a file in a B2 bucket.
    """

    def __init__(self, relative_name: str, b2_file_name: str):
        """
        :param relative_name: file name relative to the synced folder
        :param b2_file_name: name of the remote file to hide
        """
        self.relative_name = relative_name
        self.b2_file_name = b2_file_name

    def get_bytes(self) -> int:
        """
        Return the number of bytes to transfer; hiding transfers no file data.

        :return: always zero
        :rtype: int
        """
        return 0

    def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None:
        """
        Hide the file and return once that has completed.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        bucket.hide_file(self.b2_file_name)

    def do_report(self, bucket: Bucket, reporter: SyncReport):
        """
        Report the completed hide.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        # NOTE(review): presumably (file count, byte count) -- confirm against SyncReport.
        reporter.update_transfer(1, 0)
        reporter.print_completion('hide ' + self.relative_name)

    def __str__(self) -> str:
        return 'b2_hide({})'.format(self.b2_file_name)
class B2DownloadAction(AbstractAction):
    """
    Download a B2 file version to a local path.
    """

    def __init__(
        self,
        source_path: B2Path,
        b2_file_name: str,
        local_full_path: str,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider,
    ):
        """
        :param source_path: the file to be downloaded
        :param b2_file_name: b2_file_name
        :param local_full_path: a local file path
        :param encryption_settings_provider: encryption setting provider
        """
        self.source_path = source_path
        self.b2_file_name = b2_file_name
        self.local_full_path = local_full_path
        self.encryption_settings_provider = encryption_settings_provider

    def get_bytes(self) -> int:
        """
        Return file size.
        """
        return self.source_path.size

    def _ensure_directory_existence(self) -> None:
        """
        Make sure the parent directory of ``local_full_path`` exists.

        :raises Exception: if the directory is missing and could not be created
            (e.g. a path component is a regular file, or permissions are lacking);
            the underlying ``OSError`` is attached as the cause.
        """
        parent_dir = os.path.dirname(self.local_full_path)
        if not parent_dir:
            # A bare file name refers to the current working directory;
            # there is nothing to create.
            return
        if os.path.isdir(parent_dir):
            return
        try:
            # exist_ok tolerates another worker creating the directory concurrently.
            os.makedirs(parent_dir, exist_ok=True)
        except OSError as exc:
            # Someone else may still have created it in the meantime.
            if not os.path.isdir(parent_dir):
                raise Exception(f'could not create directory {parent_dir}') from exc

    def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None:
        """
        Perform the downloading action, returning only after the action is completed.

        The data is first written to a temporary file next to the destination
        and then moved into place, so a failed download does not leave a
        truncated file at ``local_full_path``.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        self._ensure_directory_existence()

        if reporter:
            progress_listener = SyncFileReporter(reporter)
        else:
            progress_listener = None

        # Download the file to a .tmp file
        download_path = self.local_full_path + '.b2.sync.tmp'

        encryption = self.encryption_settings_provider.get_setting_for_download(
            bucket=bucket,
            file_version=self.source_path.selected_version,
        )

        try:
            with contextlib.ExitStack() as exit_stack:
                if progress_listener:
                    exit_stack.enter_context(progress_listener)
                downloaded_file = bucket.download_file_by_id(
                    self.source_path.selected_version.id_,
                    progress_listener=progress_listener,
                    encryption=encryption,
                )
                downloaded_file.save_to(download_path)

            # Move the file into place. os.replace overwrites an existing
            # destination on both POSIX and Windows, and leaves the old file
            # untouched if the move fails (unlike unlink-then-rename).
            os.replace(download_path, self.local_full_path)
        except BaseException:
            # Do not leave a partial temporary file behind; best effort only.
            with suppress(OSError):
                os.unlink(download_path)
            raise

    def do_report(self, bucket: Bucket, reporter: ProgressReport) -> None:
        """
        Report the downloading action performed.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        reporter.print_completion('dnload ' + self.source_path.relative_path)

    def __str__(self) -> str:
        return (
            'b2_download(%s, %s, %s, %d)' % (
                self.b2_file_name,
                self.source_path.selected_version.id_,
                self.local_full_path,
                self.source_path.mod_time,
            )
        )
class B2CopyAction(AbstractAction):
    """
    Copy a file server-side from one B2 location to another.
    """

    def __init__(
        self,
        b2_file_name: str,
        source_path: B2Path,
        dest_b2_file_name,
        source_bucket: Bucket,
        destination_bucket: Bucket,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider,
    ):
        """
        :param b2_file_name: name of the source remote file
        :param source_path: the file to be copied
        :param dest_b2_file_name: name of the destination remote file
        :param source_bucket: bucket to copy from
        :param destination_bucket: bucket to copy to
        :param encryption_settings_provider: encryption setting provider
        """
        self.b2_file_name = b2_file_name
        self.source_path = source_path
        self.dest_b2_file_name = dest_b2_file_name
        self.encryption_settings_provider = encryption_settings_provider
        self.source_bucket = source_bucket
        self.destination_bucket = destination_bucket

    def get_bytes(self) -> int:
        """
        Return the number of bytes to copy, i.e. the source file size.
        """
        return self.source_path.size

    def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None:
        """
        Copy the file and return once the copy has completed.

        :param bucket: a Bucket object; the copy is issued through it
        :param reporter: a place to report errors
        """
        progress_listener = SyncFileReporter(reporter) if reporter else None

        source_version = self.source_path.selected_version
        provider = self.encryption_settings_provider
        source_encryption = provider.get_source_setting_for_copy(
            bucket=self.source_bucket,
            source_file_version=source_version,
        )
        destination_encryption = provider.get_destination_setting_for_copy(
            bucket=self.destination_bucket,
            source_file_version=source_version,
            dest_b2_file_name=self.dest_b2_file_name,
        )

        # Only enter the progress listener's context when there is one.
        with progress_listener or contextlib.nullcontext():
            bucket.copy(
                source_version.id_,
                self.dest_b2_file_name,
                length=self.source_path.size,
                progress_listener=progress_listener,
                destination_encryption=destination_encryption,
                source_encryption=source_encryption,
                source_file_info=source_version.file_info,
                source_content_type=source_version.content_type,
            )

    def do_report(self, bucket: Bucket, reporter: ProgressReport) -> None:
        """
        Report the completed copy.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        reporter.print_completion('copy ' + self.source_path.relative_path)

    def __str__(self) -> str:
        fields = (
            self.b2_file_name,
            self.source_path.selected_version.id_,
            self.dest_b2_file_name,
            self.source_path.mod_time,
        )
        return 'b2_copy(%s, %s, %s, %d)' % fields
class B2DeleteAction(AbstractAction):
    """
    Delete one specific file version from B2.
    """

    def __init__(self, relative_name: str, b2_file_name: str, file_id: str, note: str):
        """
        :param relative_name: file name relative to the synced folder
        :param b2_file_name: name of the remote file
        :param file_id: ID of the file version to delete
        :param note: free-form note appended to the completion report
        """
        self.relative_name = relative_name
        self.b2_file_name = b2_file_name
        self.file_id = file_id
        self.note = note

    def get_bytes(self) -> int:
        """
        Return the number of bytes to transfer; deletion transfers no file data.

        :return: always zero
        """
        return 0

    def do_action(self, bucket: Bucket, reporter: ProgressReport):
        """
        Delete the file version and return once that has completed.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        api = bucket.api
        api.delete_file_version(self.file_id, self.b2_file_name)

    def do_report(self, bucket: Bucket, reporter: SyncReport):
        """
        Report the completed deletion.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        # NOTE(review): presumably (file count, byte count) -- confirm against SyncReport.
        reporter.update_transfer(1, 0)
        reporter.print_completion('delete ' + self.relative_name + ' ' + self.note)

    def __str__(self) -> str:
        return 'b2_delete({}, {}, {})'.format(self.b2_file_name, self.file_id, self.note)
class LocalDeleteAction(AbstractAction):
    """
    Delete a file from the local file system.
    """

    def __init__(self, relative_name: str, full_path: str):
        """
        :param relative_name: file name relative to the synced folder
        :param full_path: full local path of the file to delete
        """
        self.relative_name = relative_name
        self.full_path = full_path

    def get_bytes(self) -> int:
        """
        Return the number of bytes to transfer; deletion transfers no file data.

        :return: always zero
        """
        return 0

    def do_action(self, bucket: Bucket, reporter: ProgressReport):
        """
        Remove the local file and return once that has completed.

        The ``bucket`` argument is unused here.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        os.unlink(self.full_path)

    def do_report(self, bucket: Bucket, reporter: SyncReport):
        """
        Report the completed local deletion.

        :param bucket: a Bucket object
        :param reporter: a place to report errors
        """
        # NOTE(review): presumably (file count, byte count) -- confirm against SyncReport.
        reporter.update_transfer(1, 0)
        reporter.print_completion('delete ' + self.relative_name)

    def __str__(self) -> str:
        return 'local_delete({})'.format(self.full_path)