######################################################################
#
# File: b2sdk/sync/action.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
import logging
import os
from abc import ABCMeta, abstractmethod
from ..bucket import Bucket
from ..http_constants import SRC_LAST_MODIFIED_MILLIS
from ..scan.path import B2Path
from ..transfer.outbound.upload_source import UploadSourceLocalFile
from .encryption_provider import AbstractSyncEncryptionSettingsProvider
from .report import SyncFileReporter
logger = logging.getLogger(__name__)
[docs]class AbstractAction(metaclass=ABCMeta):
"""
An action to take, such as uploading, downloading, or deleting
a file. Multi-threaded tasks create a sequence of Actions which
are then run by a pool of threads.
An action can depend on other actions completing. An example of
this is making sure a CreateBucketAction happens before an
UploadFileAction.
"""
[docs] def run(self, bucket, reporter, dry_run=False):
"""
Main action routine.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
:param dry_run: if True, perform a dry run
:type dry_run: bool
"""
try:
if not dry_run:
self.do_action(bucket, reporter)
self.do_report(bucket, reporter)
except Exception as e:
logger.exception('an exception occurred in a sync action')
reporter.error(str(self) + ": " + repr(e) + ' ' + str(e))
raise # Re-throw so we can identify failed actions
[docs] @abstractmethod
def get_bytes(self):
"""
Return the number of bytes to transfer for this action.
:rtype: int
"""
[docs] @abstractmethod
def do_action(self, bucket, reporter):
"""
Perform the action, returning only after the action is completed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
[docs] @abstractmethod
def do_report(self, bucket, reporter):
"""
Report the action performed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
[docs]class B2UploadAction(AbstractAction):
"""
File uploading action.
"""
[docs] def __init__(
self,
local_full_path,
relative_name,
b2_file_name,
mod_time_millis,
size,
encryption_settings_provider: AbstractSyncEncryptionSettingsProvider,
):
"""
:param str local_full_path: a local file path
:param str relative_name: a relative file name
:param str b2_file_name: a name of a new remote file
:param int mod_time_millis: file modification time in milliseconds
:param int size: a file size
:param b2sdk.v2.AbstractSyncEncryptionSettingsProvider encryption_settings_provider: encryption setting provider
"""
self.local_full_path = local_full_path
self.relative_name = relative_name
self.b2_file_name = b2_file_name
self.mod_time_millis = mod_time_millis
self.size = size
self.encryption_settings_provider = encryption_settings_provider
[docs] def get_bytes(self):
"""
Return file size.
:rtype: int
"""
return self.size
[docs] def do_action(self, bucket, reporter):
"""
Perform the uploading action, returning only after the action is completed.
:param b2sdk.v2.Bucket bucket: a Bucket object
:param reporter: a place to report errors
"""
if reporter:
progress_listener = SyncFileReporter(reporter)
else:
progress_listener = None
file_info = {SRC_LAST_MODIFIED_MILLIS: str(self.mod_time_millis)}
encryption = self.encryption_settings_provider.get_setting_for_upload(
bucket=bucket,
b2_file_name=self.b2_file_name,
file_info=file_info,
length=self.size,
)
bucket.upload(
UploadSourceLocalFile(self.local_full_path),
self.b2_file_name,
file_info=file_info,
progress_listener=progress_listener,
encryption=encryption,
)
[docs] def do_report(self, bucket, reporter):
"""
Report the uploading action performed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
reporter.print_completion('upload ' + self.relative_name)
def __str__(self):
return 'b2_upload(%s, %s, %s)' % (
self.local_full_path, self.b2_file_name, self.mod_time_millis
)
[docs]class B2HideAction(AbstractAction):
[docs] def __init__(self, relative_name, b2_file_name):
"""
:param relative_name: a relative file name
:type relative_name: str
:param b2_file_name: a name of a remote file
:type b2_file_name: str
"""
self.relative_name = relative_name
self.b2_file_name = b2_file_name
[docs] def get_bytes(self):
"""
Return file size.
:return: always zero
:rtype: int
"""
return 0
[docs] def do_action(self, bucket, reporter):
"""
Perform the hiding action, returning only after the action is completed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
bucket.hide_file(self.b2_file_name)
[docs] def do_report(self, bucket, reporter):
"""
Report the hiding action performed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
reporter.update_transfer(1, 0)
reporter.print_completion('hide ' + self.relative_name)
def __str__(self):
return 'b2_hide(%s)' % (self.b2_file_name,)
[docs]class B2DownloadAction(AbstractAction):
[docs] def __init__(
self,
source_path: B2Path,
b2_file_name: str,
local_full_path: str,
encryption_settings_provider: AbstractSyncEncryptionSettingsProvider,
):
"""
:param b2sdk.v2.B2Path source_path: the file to be downloaded
:param str b2_file_name: b2_file_name
:param str local_full_path: a local file path
:param b2sdk.v2.AbstractSyncEncryptionSettingsProvider encryption_settings_provider: encryption setting provider
"""
self.source_path = source_path
self.b2_file_name = b2_file_name
self.local_full_path = local_full_path
self.encryption_settings_provider = encryption_settings_provider
[docs] def get_bytes(self):
"""
Return file size.
:rtype: int
"""
return self.source_path.size
def _ensure_directory_existence(self):
parent_dir = os.path.dirname(self.local_full_path)
if not os.path.isdir(parent_dir):
try:
os.makedirs(parent_dir)
except OSError:
pass
if not os.path.isdir(parent_dir):
raise Exception('could not create directory %s' % (parent_dir,))
[docs] def do_action(self, bucket, reporter):
"""
Perform the downloading action, returning only after the action is completed.
:param b2sdk.v2.Bucket bucket: a Bucket object
:param reporter: a place to report errors
"""
self._ensure_directory_existence()
if reporter:
progress_listener = SyncFileReporter(reporter)
else:
progress_listener = None
# Download the file to a .tmp file
download_path = self.local_full_path + '.b2.sync.tmp'
encryption = self.encryption_settings_provider.get_setting_for_download(
bucket=bucket,
file_version=self.source_path.selected_version,
)
downloaded_file = bucket.download_file_by_id(
self.source_path.selected_version.id_,
progress_listener=progress_listener,
encryption=encryption,
)
downloaded_file.save_to(download_path)
# Move the file into place
try:
os.unlink(self.local_full_path)
except OSError:
pass
os.rename(download_path, self.local_full_path)
[docs] def do_report(self, bucket, reporter):
"""
Report the downloading action performed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
reporter.print_completion('dnload ' + self.source_path.relative_path)
def __str__(self):
return (
'b2_download(%s, %s, %s, %d)' % (
self.b2_file_name, self.source_path.selected_version.id_, self.local_full_path,
self.source_path.mod_time
)
)
[docs]class B2CopyAction(AbstractAction):
"""
File copying action.
"""
[docs] def __init__(
self,
b2_file_name: str,
source_path: B2Path,
dest_b2_file_name,
source_bucket: Bucket,
destination_bucket: Bucket,
encryption_settings_provider: AbstractSyncEncryptionSettingsProvider,
):
"""
:param str b2_file_name: a b2_file_name
:param b2sdk.v2.B2Path source_path: the file to be copied
:param str dest_b2_file_name: a name of a destination remote file
:param Bucket source_bucket: bucket to copy from
:param Bucket destination_bucket: bucket to copy to
:param b2sdk.v2.AbstractSyncEncryptionSettingsProvider encryption_settings_provider: encryption setting provider
"""
self.b2_file_name = b2_file_name
self.source_path = source_path
self.dest_b2_file_name = dest_b2_file_name
self.encryption_settings_provider = encryption_settings_provider
self.source_bucket = source_bucket
self.destination_bucket = destination_bucket
[docs] def get_bytes(self):
"""
Return file size.
:rtype: int
"""
return self.source_path.size
[docs] def do_action(self, bucket, reporter):
"""
Perform the copying action, returning only after the action is completed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
if reporter:
progress_listener = SyncFileReporter(reporter)
else:
progress_listener = None
source_encryption = self.encryption_settings_provider.get_source_setting_for_copy(
bucket=self.source_bucket,
source_file_version=self.source_path.selected_version,
)
destination_encryption = self.encryption_settings_provider.get_destination_setting_for_copy(
bucket=self.destination_bucket,
source_file_version=self.source_path.selected_version,
dest_b2_file_name=self.dest_b2_file_name,
)
bucket.copy(
self.source_path.selected_version.id_,
self.dest_b2_file_name,
length=self.source_path.size,
progress_listener=progress_listener,
destination_encryption=destination_encryption,
source_encryption=source_encryption,
source_file_info=self.source_path.selected_version.file_info,
source_content_type=self.source_path.selected_version.content_type,
)
[docs] def do_report(self, bucket, reporter):
"""
Report the copying action performed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
reporter.print_completion('copy ' + self.source_path.relative_path)
def __str__(self):
return (
'b2_copy(%s, %s, %s, %d)' % (
self.b2_file_name, self.source_path.selected_version.id_, self.dest_b2_file_name,
self.source_path.mod_time
)
)
[docs]class B2DeleteAction(AbstractAction):
[docs] def __init__(self, relative_name, b2_file_name, file_id, note):
"""
:param str relative_name: a relative file name
:param str b2_file_name: a name of a remote file
:param str file_id: a file ID
:param str note: a deletion note
"""
self.relative_name = relative_name
self.b2_file_name = b2_file_name
self.file_id = file_id
self.note = note
[docs] def get_bytes(self):
"""
Return file size.
:return: always zero
:rtype: int
"""
return 0
[docs] def do_action(self, bucket, reporter):
"""
Perform the deleting action, returning only after the action is completed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
bucket.api.delete_file_version(self.file_id, self.b2_file_name)
[docs] def do_report(self, bucket, reporter):
"""
Report the deleting action performed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
reporter.update_transfer(1, 0)
reporter.print_completion('delete ' + self.relative_name + ' ' + self.note)
def __str__(self):
return 'b2_delete(%s, %s, %s)' % (self.b2_file_name, self.file_id, self.note)
[docs]class LocalDeleteAction(AbstractAction):
[docs] def __init__(self, relative_name, full_path):
"""
:param relative_name: a relative file name
:type relative_name: str
:param full_path: a full local path
:type: str
"""
self.relative_name = relative_name
self.full_path = full_path
[docs] def get_bytes(self):
"""
Return file size.
:return: always zero
:rtype: int
"""
return 0
[docs] def do_action(self, bucket, reporter):
"""
Perform the deleting of a local file action,
returning only after the action is completed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
os.unlink(self.full_path)
[docs] def do_report(self, bucket, reporter):
"""
Report the deleting of a local file action performed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
reporter.update_transfer(1, 0)
reporter.print_completion('delete ' + self.relative_name)
def __str__(self):
return 'local_delete(%s)' % (self.full_path)