######################################################################
#
# File: b2sdk/_internal/api.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations
import logging
from collections.abc import Sequence
from contextlib import suppress
from typing import Generator
from .account_info.abstract import AbstractAccountInfo
from .account_info.exception import MissingAccountData
from .api_config import DEFAULT_HTTP_API_CONFIG, B2HttpApiConfig
from .application_key import ApplicationKey, BaseApplicationKey, FullApplicationKey
from .bucket import Bucket, BucketFactory
from .cache import AbstractCache
from .encryption.setting import EncryptionSetting
from .exception import (
BucketIdNotFound,
NonExistentBucket,
RestrictedBucket,
RestrictedBucketMissing,
)
from .file_lock import FileRetentionSetting, LegalHold
from .file_version import (
DownloadVersion,
DownloadVersionFactory,
FileIdAndName,
FileVersion,
FileVersionFactory,
)
from .large_file.services import LargeFileServices
from .progress import AbstractProgressListener
from .raw_api import API_VERSION, LifecycleRule
from .replication.setting import ReplicationConfiguration
from .session import B2Session
from .transfer import (
CopyManager,
DownloadManager,
Emerger,
UploadManager,
)
from .transfer.inbound.downloaded_file import DownloadedFile
from .utils import B2TraceMeta, b2_url_encode, limit_trace_arguments
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def url_for_api(info, api_name):
    """
    Build the full URL of an API endpoint.

    ``b2_download_file_by_id`` is resolved against the download URL reported
    by ``info``; every other endpoint is resolved against its API URL.

    :param info: account info
    :param str api_name: name of the API endpoint
    :rtype: str
    """
    served_from_download_url = api_name in ('b2_download_file_by_id',)
    base = info.get_download_url() if served_from_download_url else info.get_api_url()
    return f'{base}/b2api/{API_VERSION}/{api_name}'
class Services:
    """Gathers objects that provide high level logic over raw api usage."""

    # Collaborator classes are kept as class attributes so that subclasses can swap them out.
    UPLOAD_MANAGER_CLASS = staticmethod(UploadManager)
    COPY_MANAGER_CLASS = staticmethod(CopyManager)
    DOWNLOAD_MANAGER_CLASS = staticmethod(DownloadManager)
    LARGE_FILE_SERVICES_CLASS = staticmethod(LargeFileServices)

    def __init__(
        self,
        api,
        max_upload_workers: int | None = None,
        max_copy_workers: int | None = None,
        max_download_workers: int | None = None,
        save_to_buffer_size: int | None = None,
        check_download_hash: bool = True,
        max_download_streams_per_file: int | None = None,
    ):
        """
        Build the service objects on top of the session owned by ``api``.

        :param b2sdk.v2.B2Api api: the API object whose session is shared by all services
        :param max_upload_workers: a number of upload threads
        :param max_copy_workers: a number of copy threads
        :param max_download_workers: maximum number of download threads
        :param save_to_buffer_size: buffer size to use when writing files using DownloadedFile.save_to
        :param check_download_hash: whether to check hash of downloaded files. Can be disabled for files with internal checksums, for example, or to forcefully retrieve objects with corrupted payload or hash value
        :param max_download_streams_per_file: how many streams to use for parallel downloader; must be at least 1 when given
        """
        self.api = api
        self.session = api.session

        self.large_file = self.LARGE_FILE_SERVICES_CLASS(self)
        self.upload_manager = self.UPLOAD_MANAGER_CLASS(
            services=self,
            max_workers=max_upload_workers,
        )
        self.copy_manager = self.COPY_MANAGER_CLASS(
            services=self,
            max_workers=max_copy_workers,
        )

        streams_per_file = max_download_streams_per_file
        assert streams_per_file is None or streams_per_file >= 1
        download_manager_options = {
            'services': self,
            'max_workers': max_download_workers,
            'write_buffer_size': save_to_buffer_size,
            'check_hash': check_download_hash,
            'max_download_streams_per_file': streams_per_file,
        }
        self.download_manager = self.DOWNLOAD_MANAGER_CLASS(**download_manager_options)

        self.emerger = Emerger(self)
class B2Api(metaclass=B2TraceMeta):
    """
    Provide file-level access to B2 services.

    While :class:`b2sdk.v2.B2RawHTTPApi` provides direct access to the B2 web APIs, this
    class handles several things that simplify the task of uploading
    and downloading files:

    - re-acquires authorization tokens when they expire
    - retrying uploads when an upload URL is busy
    - breaking large files into parts
    - emulating a directory structure (B2 buckets are flat)

    Adds an object-oriented layer on top of the raw API, so that
    buckets and files returned are Python objects with accessor
    methods.

    The class also keeps a cache of information needed to access the
    service, such as auth tokens and upload URLs.
    """

    # Collaborator classes are kept as class attributes so that subclasses can swap them out.
    BUCKET_FACTORY_CLASS = staticmethod(BucketFactory)
    BUCKET_CLASS = staticmethod(Bucket)
    SESSION_CLASS = staticmethod(B2Session)
    FILE_VERSION_FACTORY_CLASS = staticmethod(FileVersionFactory)
    DOWNLOAD_VERSION_FACTORY_CLASS = staticmethod(DownloadVersionFactory)
    SERVICES_CLASS = staticmethod(Services)
    # Page size requested from the server by :meth:`list_keys`.
    DEFAULT_LIST_KEY_COUNT = 1000

    def __init__(
        self,
        account_info: AbstractAccountInfo | None = None,
        cache: AbstractCache | None = None,
        max_upload_workers: int | None = None,
        max_copy_workers: int | None = None,
        api_config: B2HttpApiConfig = DEFAULT_HTTP_API_CONFIG,
        max_download_workers: int | None = None,
        save_to_buffer_size: int | None = None,
        check_download_hash: bool = True,
        max_download_streams_per_file: int | None = None,
    ):
        """
        Initialize the API using the given account info.

        :param account_info: To learn more about Account Info objects, see here
                      :class:`~b2sdk.v2.SqliteAccountInfo`
        :param cache: It is used by B2Api to cache the mapping between bucket name and bucket ids.
                      default is :class:`~b2sdk._internal.cache.DummyCache`
        :param max_upload_workers: a number of upload threads
        :param max_copy_workers: a number of copy threads
        :param api_config: HTTP API configuration, passed to the session
        :param max_download_workers: maximum number of download threads
        :param save_to_buffer_size: buffer size to use when writing files using DownloadedFile.save_to
        :param check_download_hash: whether to check hash of downloaded files. Can be disabled for files with internal checksums, for example, or to forcefully retrieve objects with corrupted payload or hash value
        :param max_download_streams_per_file: number of streams for parallel download manager
        """
        self.session = self.SESSION_CLASS(
            account_info=account_info, cache=cache, api_config=api_config
        )
        self.api_config = api_config
        self.file_version_factory = self.FILE_VERSION_FACTORY_CLASS(self)
        self.download_version_factory = self.DOWNLOAD_VERSION_FACTORY_CLASS(self)
        self.services = self.SERVICES_CLASS(
            api=self,
            max_upload_workers=max_upload_workers,
            max_copy_workers=max_copy_workers,
            max_download_workers=max_download_workers,
            save_to_buffer_size=save_to_buffer_size,
            check_download_hash=check_download_hash,
            max_download_streams_per_file=max_download_streams_per_file,
        )

    @property
    def account_info(self):
        """Account info object held by the session."""
        return self.session.account_info

    @property
    def cache(self):
        """Cache object held by the session."""
        return self.session.cache

    @property
    def raw_api(self):
        """
        .. warning::
            :class:`~b2sdk._internal.raw_api.B2RawHTTPApi` attribute is deprecated.
            :class:`~b2sdk._internal.session.B2Session` expose all
            :class:`~b2sdk._internal.raw_api.B2RawHTTPApi` methods now.
        """
        return self.session.raw_api

    def authorize_automatically(self):
        """
        Perform automatic account authorization, retrieving all account data
        from account info object passed during initialization.
        """
        return self.session.authorize_automatically()

    @limit_trace_arguments(only=('self', 'realm'))
    def authorize_account(self, application_key_id, application_key, realm='production'):
        """
        Perform account authorization.

        :param str application_key_id: :term:`application key ID`
        :param str application_key: user's :term:`application key`
        :param str realm: a realm to authorize account in (usually just "production")
        """
        self.session.authorize_account(realm, application_key_id, application_key)
        self._populate_bucket_cache_from_key()

    def get_account_id(self):
        """
        Return the account ID.

        :rtype: str
        """
        return self.account_info.get_account_id()

    # buckets

    def create_bucket(
        self,
        name,
        bucket_type,
        bucket_info=None,
        cors_rules=None,
        lifecycle_rules: list[LifecycleRule] | None = None,
        default_server_side_encryption: EncryptionSetting | None = None,
        is_file_lock_enabled: bool | None = None,
        replication: ReplicationConfiguration | None = None,
    ) -> Bucket:
        """
        Create a bucket.

        :param str name: bucket name
        :param str bucket_type: a bucket type, could be one of the following values: ``"allPublic"``, ``"allPrivate"``
        :param dict bucket_info: additional bucket info to store with the bucket
        :param dict cors_rules: bucket CORS rules to store with the bucket
        :param lifecycle_rules: bucket lifecycle rules to store with the bucket
        :param b2sdk.v2.EncryptionSetting default_server_side_encryption: default server side encryption settings (``None`` if unknown)
        :param bool is_file_lock_enabled: boolean value specifies whether bucket is File Lock-enabled
        :param b2sdk.v2.ReplicationConfiguration replication: bucket replication rules or ``None``
        :return: a Bucket object
        :rtype: b2sdk.v2.Bucket
        """
        account_id = self.account_info.get_account_id()

        response = self.session.create_bucket(
            account_id,
            name,
            bucket_type,
            bucket_info=bucket_info,
            cors_rules=cors_rules,
            lifecycle_rules=lifecycle_rules,
            default_server_side_encryption=default_server_side_encryption,
            is_file_lock_enabled=is_file_lock_enabled,
            replication=replication,
        )
        bucket = self.BUCKET_FACTORY_CLASS.from_api_bucket_dict(self, response)
        # Sanity-check the server response; each message shows requested vs. returned value.
        assert name == bucket.name, (
            f'API created a bucket with different name than requested: {name} != {bucket.name}'
        )
        assert bucket_type == bucket.type_, (
            f'API created a bucket with different type than requested: {bucket_type} != {bucket.type_}'
        )
        self.cache.save_bucket(bucket)
        return bucket

    def download_file_by_id(
        self,
        file_id: str,
        progress_listener: AbstractProgressListener | None = None,
        range_: tuple[int, int] | None = None,
        encryption: EncryptionSetting | None = None,
    ) -> DownloadedFile:
        """
        Download a file with the given ID.

        :param str file_id: a file ID
        :param progress_listener: a progress listener object to use, or ``None`` to not track progress
        :param range_: a pair of integers, the first one is a start
                       position, and the second one is the end position in the file
        :param encryption: encryption settings (``None`` if unknown)
        """
        url = self.session.get_download_url_by_id(file_id)
        return self.services.download_manager.download_file_from_url(
            url,
            progress_listener,
            range_,
            encryption,
        )

    def update_file_retention(
        self,
        file_id: str,
        file_name: str,
        file_retention: FileRetentionSetting,
        bypass_governance: bool = False,
    ) -> FileRetentionSetting:
        """
        Update the retention setting of a file version.

        :param file_id: a file ID
        :param file_name: a file name
        :param file_retention: the retention setting to apply
        :param bypass_governance: forwarded unchanged to the session call
        :return: the retention setting built from the server response
        """
        return FileRetentionSetting.from_server_response(
            self.session.update_file_retention(
                file_id,
                file_name,
                file_retention,
                bypass_governance,
            )
        )

    def update_file_legal_hold(
        self,
        file_id: str,
        file_name: str,
        legal_hold: LegalHold,
    ) -> LegalHold:
        """
        Update the legal hold setting of a file version.

        :param file_id: a file ID
        :param file_name: a file name
        :param legal_hold: the legal hold setting to apply
        :return: the legal hold setting built from the server response
        """
        return LegalHold.from_server_response(
            self.session.update_file_legal_hold(
                file_id,
                file_name,
                legal_hold,
            )
        )

    def get_bucket_by_id(self, bucket_id: str) -> Bucket:
        """
        Return the Bucket matching the given bucket_id.

        :raises b2sdk.v2.exception.BucketIdNotFound: if the bucket does not exist in the account
        """
        # Give a useful warning if the current application key does not
        # allow access to bucket.
        self.check_bucket_id_restrictions(bucket_id)

        # First, try the cache.
        bucket_name = self.cache.get_bucket_name_or_none_from_bucket_id(bucket_id)
        if bucket_name is not None:
            return self.BUCKET_CLASS(self, bucket_id, name=bucket_name)

        # Second, ask the service
        for bucket in self.list_buckets(bucket_id=bucket_id):
            assert bucket.id_ == bucket_id
            return bucket

        # There is no such bucket.
        raise BucketIdNotFound(bucket_id)

    def get_bucket_by_name(self, bucket_name: str) -> Bucket:
        """
        Return the Bucket matching the given bucket_name.

        :param str bucket_name: the name of the bucket to return
        :return: a Bucket object
        :rtype: b2sdk.v2.Bucket
        :raises b2sdk.v2.exception.NonExistentBucket: if the bucket does not exist in the account
        """
        # Give a useful warning if the current application key does not
        # allow access to the named bucket.
        self.check_bucket_name_restrictions(bucket_name)

        # First, try the cache.
        id_ = self.cache.get_bucket_id_or_none_from_bucket_name(bucket_name)
        if id_ is not None:
            return self.BUCKET_CLASS(self, id_, name=bucket_name)

        # Second, ask the service
        for bucket in self.list_buckets(bucket_name=bucket_name):
            # The comparison is deliberately case-insensitive.
            assert bucket.name.lower() == bucket_name.lower()
            return bucket

        # There is no such bucket.
        raise NonExistentBucket(bucket_name)

    def delete_bucket(self, bucket):
        """
        Delete a chosen bucket.

        :param b2sdk.v2.Bucket bucket: a :term:`bucket` to delete
        :rtype: None
        """
        account_id = self.account_info.get_account_id()
        self.session.delete_bucket(account_id, bucket.id_)

    def list_buckets(
        self, bucket_name=None, bucket_id=None, *, use_cache: bool = False
    ) -> Sequence[Bucket]:
        """
        Call ``b2_list_buckets`` and return a list of buckets.

        When no bucket name nor ID is specified, returns *all* of the buckets
        in the account.  When a bucket name or ID is given, returns just that
        bucket.  When authorized with an :term:`application key` restricted to
        one :term:`bucket`, you must specify the bucket name or bucket id, or
        the request will be unauthorized.

        :param str bucket_name: the name of the one bucket to return
        :param str bucket_id: the ID of the one bucket to return
        :param bool use_cache: if ``True`` use cached bucket list if available and not empty
        :rtype: list[b2sdk.v2.Bucket]
        :raises ValueError: if both ``bucket_name`` and ``bucket_id`` are given
        """
        if bucket_name is not None and bucket_id is not None:
            raise ValueError('Provide either bucket_name or bucket_id, not both')

        # Give a useful warning if the current application key does not
        # allow access to the requested bucket.
        if bucket_id:
            self.check_bucket_id_restrictions(bucket_id)
        else:
            self.check_bucket_name_restrictions(bucket_name)

        if use_cache:
            cached_list = self.cache.list_bucket_names_ids()
            buckets = [
                self.BUCKET_CLASS(self, cache_b_id, name=cached_b_name)
                for cached_b_name, cache_b_id in cached_list
                if (
                    (bucket_name is None or bucket_name == cached_b_name)
                    and (bucket_id is None or bucket_id == cache_b_id)
                )
            ]
            # An empty result falls through to the service call below.
            if buckets:
                logger.debug("Using cached bucket list as it is not empty")
                return buckets

        account_id = self.account_info.get_account_id()

        response = self.session.list_buckets(
            account_id, bucket_name=bucket_name, bucket_id=bucket_id
        )
        buckets = self.BUCKET_FACTORY_CLASS.from_api_response(self, response)

        if bucket_name or bucket_id:
            # If a bucket_name or bucket_id is specified we don't clear the cache because the other buckets could still
            # be valid. So we save the one bucket returned from the list_buckets call.
            for bucket in buckets:
                self.cache.save_bucket(bucket)
        else:
            # Otherwise we want to clear the cache and save the buckets returned from list_buckets
            # since we just got a new list of all the buckets for this account.
            self.cache.set_bucket_name_cache(buckets)
        return buckets

    def list_parts(self, file_id, start_part_number=None, batch_size=None):
        """
        Generator that yields a :py:class:`b2sdk.v2.Part` for each of the parts that have been uploaded.

        :param str file_id: the ID of the large file that is not finished
        :param int start_part_number: the first part number to return; defaults to the first part
        :param int batch_size: the number of parts to fetch at a time from the server
        :rtype: generator
        """
        return self.services.large_file.list_parts(
            file_id, start_part_number=start_part_number, batch_size=batch_size
        )

    # delete/cancel

    def cancel_large_file(self, file_id: str) -> FileIdAndName:
        """
        Cancel a large file upload.
        """
        return self.services.large_file.cancel_large_file(file_id)

    def delete_file_version(
        self, file_id: str, file_name: str, bypass_governance: bool = False
    ) -> FileIdAndName:
        """
        Permanently and irrevocably delete one version of a file. bypass_governance must be set to true if deleting a
        file version protected by Object Lock governance mode retention settings (unless its retention period expired)
        """
        # filename argument is not first, because one day it may become optional
        response = self.session.delete_file_version(file_id, file_name, bypass_governance)
        return FileIdAndName.from_cancel_or_delete_response(response)

    # download

    def get_download_url_for_fileid(self, file_id):
        """
        Return a URL to download the given file by ID.

        :param str file_id: a file ID
        """
        url = url_for_api(self.account_info, 'b2_download_file_by_id')
        return f'{url}?fileId={file_id}'

    def get_download_url_for_file_name(self, bucket_name, file_name):
        """
        Return a URL to download the given file by name.

        :param str bucket_name: a bucket name
        :param str file_name: a file name
        """
        self.check_bucket_name_restrictions(bucket_name)
        # Only the file name is URL-encoded; the bucket name is inserted as given.
        return f'{self.account_info.get_download_url()}/file/{bucket_name}/{b2_url_encode(file_name)}'

    # keys

    def create_key(
        self,
        capabilities: list[str],
        key_name: str,
        valid_duration_seconds: int | None = None,
        bucket_id: str | None = None,
        name_prefix: str | None = None,
    ) -> FullApplicationKey:
        """
        Create a new :term:`application key`.

        :param capabilities: a list of capabilities
        :param key_name: a name of a key
        :param valid_duration_seconds: key auto-expire time after it is created, in seconds, or ``None`` to not expire
        :param bucket_id: a bucket ID to restrict the key to, or ``None`` to not restrict
        :param name_prefix: a remote filename prefix to restrict the key to or ``None`` to not restrict
        """
        account_id = self.account_info.get_account_id()

        response = self.session.create_key(
            account_id,
            capabilities=capabilities,
            key_name=key_name,
            valid_duration_seconds=valid_duration_seconds,
            bucket_id=bucket_id,
            name_prefix=name_prefix,
        )

        # Sanity-check that the server created the key that was asked for.
        assert set(response['capabilities']) == set(capabilities)
        assert response['keyName'] == key_name

        return FullApplicationKey.from_create_response(response)

    def delete_key(self, application_key: BaseApplicationKey):
        """
        Delete :term:`application key`.

        :param application_key: an :term:`application key`
        """
        return self.delete_key_by_id(application_key.id_)

    def delete_key_by_id(self, application_key_id: str) -> ApplicationKey:
        """
        Delete :term:`application key`.

        :param application_key_id: an :term:`application key ID`
        """
        response = self.session.delete_key(application_key_id=application_key_id)
        return ApplicationKey.from_api_response(response)

    def list_keys(
        self, start_application_key_id: str | None = None
    ) -> Generator[ApplicationKey, None, None]:
        """
        List application keys. Lazily perform requests to B2 cloud and return all keys.

        :param start_application_key_id: an :term:`application key ID` to start from or ``None`` to start from the beginning
        """
        account_id = self.account_info.get_account_id()

        while True:
            response = self.session.list_keys(
                account_id,
                max_key_count=self.DEFAULT_LIST_KEY_COUNT,
                start_application_key_id=start_application_key_id,
            )
            for entry in response['keys']:
                yield ApplicationKey.from_api_response(entry)

            # A ``None`` continuation ID means the last page has been consumed.
            next_application_key_id = response['nextApplicationKeyId']
            if next_application_key_id is None:
                return
            start_application_key_id = next_application_key_id

    def get_key(self, key_id: str) -> ApplicationKey | None:
        """
        Gets information about a single key: its capabilities, prefix, name etc

        Returns `None` if the key does not exist.

        Raises an exception if profile is not permitted to list keys.
        """
        # StopIteration here means that list_keys() yielded nothing at all.
        with suppress(StopIteration):
            key = next(self.list_keys(start_application_key_id=key_id))

            # list_keys() may return some other key if `key_id` does not exist;
            # thus manually check that we retrieved the right key
            if key.id_ == key_id:
                return key
        return None

    # other

    def get_file_info(self, file_id: str) -> FileVersion:
        """
        Gets info about file version.

        :param str file_id: the id of the file whose info will be retrieved.
        """
        return self.file_version_factory.from_api_response(
            self.session.get_file_info_by_id(file_id)
        )

    def get_file_info_by_name(self, bucket_name: str, file_name: str) -> DownloadVersion:
        """
        Gets info about a file version. Similar to `get_file_info` but
        takes the bucket name and file name instead of file id.

        :param str bucket_name: The name of the bucket where the file resides.
        :param str file_name: The name of the file whose info will be retrieved.
        """
        bucket = self.get_bucket_by_name(bucket_name)
        return bucket.get_file_info_by_name(file_name)

    def check_bucket_name_restrictions(self, bucket_name: str):
        """
        Check to see if the allowed field from authorize-account has a bucket restriction.

        If it does, checks if the bucket_name for a given api call matches that.
        If not, it raises a :py:exc:`b2sdk.v2.exception.RestrictedBucket` error.

        :raises b2sdk.v2.exception.RestrictedBucket: if the account is not allowed to use this bucket
        """
        self._check_bucket_restrictions('bucketName', bucket_name)

    def check_bucket_id_restrictions(self, bucket_id: str):
        """
        Check to see if the allowed field from authorize-account has a bucket restriction.

        If it does, checks if the bucket_id for a given api call matches that.
        If not, it raises a :py:exc:`b2sdk.v2.exception.RestrictedBucket` error.

        :raises b2sdk.v2.exception.RestrictedBucket: if the account is not allowed to use this bucket
        """
        self._check_bucket_restrictions('bucketId', bucket_id)

    def _check_bucket_restrictions(self, key, value):
        """
        Raise ``RestrictedBucket`` when the ``allowed`` entry stored under ``key``
        is set and differs from ``value``.

        :param key: name of the entry to read from the ``allowed`` mapping (``'bucketName'`` or ``'bucketId'``)
        :param value: the bucket name or ID requested by the caller
        """
        allowed = self.account_info.get_allowed()
        allowed_bucket_identifier = allowed[key]

        # ``None`` means that the key is not restricted to a single bucket.
        if allowed_bucket_identifier is not None and allowed_bucket_identifier != value:
            raise RestrictedBucket(allowed_bucket_identifier)

    def _populate_bucket_cache_from_key(self):
        """
        Pre-populate the bucket cache when the key is restricted to a single bucket.

        :raises RestrictedBucketMissing: if a bucket ID is set but its bucket name is not
        """
        # If the key is restricted to the bucket, pre-populate the cache with it
        try:
            allowed = self.account_info.get_allowed()
        except MissingAccountData:
            return

        allowed_bucket_id = allowed.get('bucketId')
        if allowed_bucket_id is None:
            return

        allowed_bucket_name = allowed.get('bucketName')

        # If we have bucketId set we still need to check bucketName. If the bucketName is None,
        # it means that the bucketId belongs to a bucket that was already removed.
        if allowed_bucket_name is None:
            raise RestrictedBucketMissing()

        self.cache.save_bucket(self.BUCKET_CLASS(self, allowed_bucket_id, name=allowed_bucket_name))