Source code for b2sdk.sync.sync

######################################################################
#
# File: b2sdk/sync/sync.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

from __future__ import division

import logging
import six

from ..bounded_queue_executor import BoundedQueueExecutor
from ..exception import CommandError
from ..utils import trace_call
from .policy_manager import POLICY_MANAGER
from .scan_policies import DEFAULT_SCAN_MANAGER
from .report import SyncReport

try:
    import concurrent.futures as futures
except ImportError:
    import futures

logger = logging.getLogger(__name__)


def next_or_none(iterator):
    """
    Returns the next item from the iterator, or None if there are no more.

    Note that an iterator which legitimately yields None is indistinguishable
    from an exhausted one.

    :param iterator: an iterator (an object supporting ``next()``; a plain
                     iterable such as a list must be wrapped with ``iter()``)
    :return: the next item, or None when the iterator is exhausted
    """
    # The builtin's default argument replaces the manual StopIteration handling.
    return next(iterator, None)
def zip_folders(folder_a, folder_b, reporter, policies_manager=DEFAULT_SCAN_MANAGER):
    """
    Walk two folders in lockstep and yield their files paired up by name.

    This is a merge over the two ``all_files()`` listings. For every name that
    appears in either folder a ``(file_a, file_b)`` tuple is yielded. The side
    lacking that name is None, and never both sides. The merge assumes that each
    listing is produced in ascending name order.

    ``policies_manager`` is only passed when listing ``folder_a``; ``folder_b``
    is listed with the reporter alone.

    :param folder_a: first folder object.
    :type folder_a: b2sdk.sync.folder.AbstractFolder
    :param folder_b: second folder object.
    :type folder_b: b2sdk.sync.folder.AbstractFolder
    :param reporter: reporter object
    :param policies_manager: policies manager object
    :return: yields two element tuples
    """
    files_a = folder_a.all_files(reporter, policies_manager)
    files_b = folder_b.all_files(reporter)
    file_a = next_or_none(files_a)
    file_b = next_or_none(files_b)

    while file_a is not None or file_b is not None:
        if file_b is None or (file_a is not None and file_a.name < file_b.name):
            # The name exists only in folder_a (or folder_b is exhausted).
            yield (file_a, None)
            file_a = next_or_none(files_a)
        elif file_a is None or file_b.name < file_a.name:
            # The name exists only in folder_b (or folder_a is exhausted).
            yield (None, file_b)
            file_b = next_or_none(files_b)
        else:
            # Same name on both sides: emit the pair and advance both listings.
            assert file_a.name == file_b.name
            yield (file_a, file_b)
            file_a = next_or_none(files_a)
            file_b = next_or_none(files_b)
def make_file_sync_actions(
    sync_type, source_file, dest_file, source_folder, dest_folder, args, now_millis
):
    """
    Yields the sequence of actions needed to sync the two files.

    The policy is looked up from ``POLICY_MANAGER`` and its actions are yielded
    one by one. Because this is a generator, the lookup happens only when
    iteration starts.

    :param sync_type: synchronization type, a ``'<source>-to-<dest>'`` string
                      such as ``'local-to-b2'``
    :type sync_type: str
    :param source_file: source file object, or None if the file exists only
                        in the destination
    :type source_file: b2sdk.sync.file.File
    :param dest_file: destination file object, or None if the file exists only
                      in the source
    :type dest_file: b2sdk.sync.file.File
    :param source_folder: a source folder object
    :type source_folder: b2sdk.sync.folder.AbstractFolder
    :param dest_folder: a destination folder object
    :type dest_folder: b2sdk.sync.folder.AbstractFolder
    :param args: an object which holds command line arguments
    :param now_millis: current time in milliseconds
    :type now_millis: int
    """
    policy = POLICY_MANAGER.get_policy(
        sync_type, source_file, source_folder, dest_file, dest_folder, now_millis, args
    )
    # Explicit loop rather than ``yield from``: the file still supports Python 2 (six).
    for action in policy.get_all_actions():
        yield action
def make_folder_sync_actions(
    source_folder, dest_folder, args, now_millis, reporter, policies_manager=DEFAULT_SCAN_MANAGER
):
    """
    Yields a sequence of actions that will sync the destination folder to the source folder.

    Because this is a generator, the argument validation below (and its
    exceptions) only runs once iteration starts.

    :param source_folder: source folder object
    :type source_folder: b2sdk.sync.folder.AbstractFolder
    :param dest_folder: destination folder object
    :type dest_folder: b2sdk.sync.folder.AbstractFolder
    :param args: an object which holds command line arguments
    :param now_millis: current time in milliseconds
    :type now_millis: int
    :param reporter: reporter object
    :param policies_manager: policies manager object
    :raises CommandError: if mutually incompatible options are set in ``args``
    :raises NotImplementedError: unless syncing local-to-b2 or b2-to-local
    """
    source_type = source_folder.folder_type()
    dest_type = dest_folder.folder_type()

    if args.skipNewer and args.replaceNewer:
        raise CommandError('--skipNewer and --replaceNewer are incompatible')
    if args.delete and (args.keepDays is not None):
        raise CommandError('--delete and --keepDays are incompatible')
    if (args.keepDays is not None) and (dest_type == 'local'):
        raise CommandError('--keepDays cannot be used for local files')

    sync_type = '%s-to-%s' % (source_type, dest_type)
    if (source_type, dest_type) not in [('b2', 'local'), ('local', 'b2')]:
        raise NotImplementedError("Sync support only local-to-b2 and b2-to-local")

    # Only files on the local side are counted as compared. Presumably this keeps
    # compare progress on the same scale as the local-file total from count_files.
    source_is_local = source_type == 'local'

    for source_file, dest_file in zip_folders(
        source_folder, dest_folder, reporter, policies_manager
    ):
        if source_file is None:
            logger.debug('determined that %s is not present on source', dest_file)
        elif dest_file is None:
            logger.debug('determined that %s is not present on destination', source_file)

        local_file = source_file if source_is_local else dest_file
        if local_file is not None:
            reporter.update_compare(1)

        for action in make_file_sync_actions(
            sync_type, source_file, dest_file, source_folder, dest_folder, args, now_millis
        ):
            yield action
def count_files(local_folder, reporter):
    """
    Walk a local folder and report how many files it contains.

    Each file found adds 1 to the reporter's local count. ``end_local()`` is
    called once the listing is exhausted.

    :param local_folder: a folder object.
    :type local_folder: b2sdk.sync.folder.AbstractFolder
    :param reporter: reporter object
    """
    # all_files() is deliberately given no reporter. Broken symlinks are
    # reported later, during the pass that compares source and dest files.
    listing = local_folder.all_files(None)
    for _unused_file in listing:
        reporter.update_local(1)
    reporter.end_local()
@trace_call(logger)
def sync_folders(
    source_folder,
    dest_folder,
    args,
    now_millis,
    stdout,
    no_progress,
    max_workers,
    policies_manager=DEFAULT_SCAN_MANAGER,
    dry_run=False,
    allow_empty_source=False
):
    """
    Syncs two folders.  Always ensures that every file in the source is
    also in the destination.  Depending on the options in ``args`` (such as
    ``delete`` and ``keepDays``), the chosen sync policies may also remove
    files or old file versions from the destination.

    :param source_folder: source folder object
    :type source_folder: b2sdk.sync.folder.AbstractFolder
    :param dest_folder: destination folder object
    :type dest_folder: b2sdk.sync.folder.AbstractFolder
    :param args: an object which holds command line arguments
    :param now_millis: current time in milliseconds
    :type now_millis: int
    :param stdout: standard output file object
    :param no_progress: if True, do not show progress
    :type no_progress: bool
    :param max_workers: max number of workers
    :type max_workers: int
    :param policies_manager: policies manager object
    :param dry_run: if True, each action is run with ``dry_run=True`` and a local
                    destination folder is not created
    :type dry_run: bool
    :param allow_empty_source: if True, do not check whether source folder is empty
    :type allow_empty_source: bool
    :raises ValueError: if neither folder is local, or neither folder is a b2 folder
    :raises CommandError: if any scheduled action failed ("sync is incomplete"),
                          or if ``args`` holds incompatible options
    """
    source_type = source_folder.folder_type()
    dest_type = dest_folder.folder_type()

    # For downloads, make sure that the target directory is there.
    if dest_type == 'local' and not dry_run:
        dest_folder.ensure_present()

    if source_type == 'local' and not allow_empty_source:
        source_folder.ensure_non_empty()

    # Make a reporter to report progress.
    with SyncReport(stdout, no_progress) as reporter:
        # Resolve both sides before creating any worker threads. Then a bad
        # folder pairing fails without leaving a counting task running
        # against the reporter.
        local_folder = None
        if source_type == 'local':
            local_folder = source_folder
        if dest_type == 'local':
            local_folder = dest_folder
        if local_folder is None:
            raise ValueError('neither folder is a local folder')

        bucket = None
        if source_type == 'b2':
            bucket = source_folder.bucket
        if dest_type == 'b2':
            bucket = dest_folder.bucket
        if bucket is None:
            raise ValueError('neither folder is a b2 folder')

        # Make an executor to count files and run all of the actions. This is
        # not the same as the executor in the API object, which is used for
        # uploads. The tasks in this executor wait for uploads. Putting them
        # in the same thread pool could lead to deadlock.
        #
        # We use an executor with a bounded queue to avoid using up lots of memory
        # when syncing lots of files.
        unbounded_executor = futures.ThreadPoolExecutor(max_workers=max_workers)
        queue_limit = max_workers + 1000
        sync_executor = BoundedQueueExecutor(unbounded_executor, queue_limit=queue_limit)

        # First, start the thread that counts the local files. That's the operation
        # that should be fastest, and it provides scale for the progress reporting.
        sync_executor.submit(count_files, local_folder, reporter)

        # Schedule each of the actions.
        # NOTE(review): make_folder_sync_actions is a generator, so its argument
        # validation only runs when this loop starts, after count_files is submitted.
        total_files = 0
        total_bytes = 0
        for action in make_folder_sync_actions(
            source_folder, dest_folder, args, now_millis, reporter, policies_manager
        ):
            # Use the module logger (not the root logger) like the rest of this module.
            logger.debug('scheduling action %s on bucket %s', action, bucket)
            sync_executor.submit(action.run, bucket, reporter, dry_run)
            total_files += 1
            total_bytes += action.get_bytes()
        reporter.end_compare(total_files, total_bytes)

        # Wait for everything to finish
        sync_executor.shutdown()
        if sync_executor.get_num_exceptions() != 0:
            raise CommandError('sync is incomplete')