######################################################################
#
# File: b2sdk/v1/api.py
#
# Copyright 2021 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from typing import Any, Dict, Optional, overload, Tuple, List
from .download_dest import AbstractDownloadDestination
from b2sdk import _v2 as v2
from b2sdk.api import Services
from .account_info import AbstractAccountInfo
from .bucket import Bucket, BucketFactory, download_file_and_return_info_dict
from .cache import AbstractCache
from .file_version import FileVersionInfo, FileVersionInfoFactory, file_version_info_from_id_and_name
from .session import B2Session
# override to use legacy no-request method of creating a bucket from bucket_id and retain `check_bucket_restrictions`
# public API method
# and to use v1.Bucket
# and to retain cancel_large_file return type
# and to retain old style download_file_by_id signature (allowing for the new one as well) and exception
# and to retain old style get_file_info return type
# and to accept old-style raw_api argument
# and to retain old style create_key, delete_key and list_keys interfaces and behaviour
[docs]class B2Api(v2.B2Api):
SESSION_CLASS = staticmethod(B2Session)
BUCKET_FACTORY_CLASS = staticmethod(BucketFactory)
BUCKET_CLASS = staticmethod(Bucket)
FILE_VERSION_FACTORY_CLASS = staticmethod(FileVersionInfoFactory)
[docs] def __init__(
self,
account_info: Optional[AbstractAccountInfo] = None,
cache: Optional[AbstractCache] = None,
raw_api: v2.B2RawHTTPApi = None,
max_upload_workers: int = 10,
max_copy_workers: int = 10,
api_config: Optional[v2.B2HttpApiConfig] = None,
):
"""
Initialize the API using the given account info.
:param account_info: To learn more about Account Info objects, see here
:class:`~b2sdk.v1.SqliteAccountInfo`
:param cache: It is used by B2Api to cache the mapping between bucket name and bucket ids.
default is :class:`~b2sdk.cache.DummyCache`
:param max_upload_workers: a number of upload threads
:param max_copy_workers: a number of copy threads
:param raw_api:
:param api_config:
"""
self.session = self.SESSION_CLASS(
account_info=account_info,
cache=cache,
raw_api=raw_api,
api_config=api_config,
)
self.file_version_factory = self.FILE_VERSION_FACTORY_CLASS(self)
self.download_version_factory = self.DOWNLOAD_VERSION_FACTORY_CLASS(self)
self.services = Services(
self,
max_upload_workers=max_upload_workers,
max_copy_workers=max_copy_workers,
)
[docs] def get_file_info(self, file_id: str) -> Dict[str, Any]:
"""
Gets info about file version.
:param str file_id: the id of the file.
"""
return self.session.get_file_info_by_id(file_id)
[docs] def get_bucket_by_id(self, bucket_id):
"""
Return a bucket object with a given ID. Unlike ``get_bucket_by_name``, this method does not need to make any API calls.
:param str bucket_id: a bucket ID
:return: a Bucket object
:rtype: b2sdk.v1.Bucket
"""
return self.BUCKET_CLASS(self, bucket_id)
[docs] def check_bucket_restrictions(self, bucket_name):
"""
Check to see if the allowed field from authorize-account has a bucket restriction.
If it does, checks if the bucket_name for a given api call matches that.
If not, it raises a :py:exc:`b2sdk.v1.exception.RestrictedBucket` error.
:param str bucket_name: a bucket name
:raises b2sdk.v1.exception.RestrictedBucket: if the account is not allowed to use this bucket
"""
self.check_bucket_name_restrictions(bucket_name)
[docs] def cancel_large_file(self, file_id: str) -> FileVersionInfo:
file_id_and_name = super().cancel_large_file(file_id)
return file_version_info_from_id_and_name(file_id_and_name, self)
@overload
def download_file_by_id(
self,
file_id: str,
download_dest: AbstractDownloadDestination,
progress_listener: Optional[v2.AbstractProgressListener] = None,
range_: Optional[Tuple[int, int]] = None,
encryption: Optional[v2.EncryptionSetting] = None,
) -> dict:
...
@overload
def download_file_by_id(
self,
file_id: str,
progress_listener: Optional[v2.AbstractProgressListener] = None,
range_: Optional[Tuple[int, int]] = None,
encryption: Optional[v2.EncryptionSetting] = None,
) -> v2.DownloadedFile:
...
[docs] def download_file_by_id(
self,
file_id: str,
download_dest: Optional[AbstractDownloadDestination] = None,
progress_listener: Optional[v2.AbstractProgressListener] = None,
range_: Optional[Tuple[int, int]] = None,
encryption: Optional[v2.EncryptionSetting] = None,
):
"""
Download a file with the given ID.
:param file_id: a file ID
:param download_dest: an instance of the one of the following classes: \
:class:`~b2sdk.v1.DownloadDestLocalFile`,\
:class:`~b2sdk.v1.DownloadDestBytes`,\
:class:`~b2sdk.v1.PreSeekedDownloadDest`,\
or any sub class of :class:`~b2sdk.v1.AbstractDownloadDestination`
:param progress_listener: an instance of the one of the following classes: \
:class:`~b2sdk.v1.PartProgressReporter`,\
:class:`~b2sdk.v1.TqdmProgressListener`,\
:class:`~b2sdk.v1.SimpleProgressListener`,\
:class:`~b2sdk.v1.DoNothingProgressListener`,\
:class:`~b2sdk.v1.ProgressListenerForTest`,\
:class:`~b2sdk.v1.SyncFileReporter`,\
or any sub class of :class:`~b2sdk.v1.AbstractProgressListener`
:param range_: a list of two integers, the first one is a start\
position, and the second one is the end position in the file
:param encryption: encryption settings (``None`` if unknown)
"""
downloaded_file = super().download_file_by_id(
file_id=file_id,
progress_listener=progress_listener,
range_=range_,
encryption=encryption,
)
if download_dest is not None:
try:
return download_file_and_return_info_dict(downloaded_file, download_dest, range_)
except ValueError as ex:
if ex.args == ('no strategy suitable for download was found!',):
raise AssertionError('no strategy suitable for download was found!')
raise
else:
return downloaded_file
[docs] def list_keys(self, start_application_key_id=None) -> dict:
"""
List application keys. Perform a single request and return at most ``self.DEFAULT_LIST_KEY_COUNT`` keys, as
well as the value to supply to the next call as ``start_application_key_id``, if not all keys were retrieved.
:param start_application_key_id: an :term:`application key ID` to start from or ``None`` to start from the beginning
"""
account_id = self.account_info.get_account_id()
return self.session.list_keys(
account_id,
max_key_count=self.DEFAULT_LIST_KEY_COUNT,
start_application_key_id=start_application_key_id
)
[docs] def create_key(
self,
capabilities: List[str],
key_name: str,
valid_duration_seconds: Optional[int] = None,
bucket_id: Optional[str] = None,
name_prefix: Optional[str] = None,
):
return super().create_key(
capabilities=capabilities,
key_name=key_name,
valid_duration_seconds=valid_duration_seconds,
bucket_id=bucket_id,
name_prefix=name_prefix,
).as_dict()
[docs] def delete_key(self, application_key_id):
return super().delete_key_by_id(application_key_id).as_dict()