######################################################################
#
# File: b2sdk/api.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from typing import Any, Dict, Optional
from .bucket import Bucket, BucketFactory
from .encryption.setting import EncryptionSetting
from .exception import NonExistentBucket, RestrictedBucket
from .file_version import FileIdAndName
from .large_file.services import LargeFileServices
from .raw_api import API_VERSION
from .session import B2Session
from .transfer import (
CopyManager,
DownloadManager,
Emerger,
UploadManager,
)
from .utils import B2TraceMeta, b2_url_encode, limit_trace_arguments
def url_for_api(info, api_name):
"""
Return URL for an API endpoint.
:param info: account info
:param str api_nam:
:rtype: str
"""
if api_name in ['b2_download_file_by_id']:
base = info.get_download_url()
else:
base = info.get_api_url()
return '%s/b2api/%s/%s' % (base, API_VERSION, api_name)
class Services(object):
""" Gathers objects that provide high level logic over raw api usage. """
def __init__(self, session, max_upload_workers=10, max_copy_workers=10):
"""
Initialize Services object using given session.
:param b2sdk.v1.Session session:
:param int max_upload_workers: a number of upload threads
:param int max_copy_workers: a number of copy threads
"""
self.session = session
self.large_file = LargeFileServices(self)
self.download_manager = DownloadManager(self)
self.upload_manager = UploadManager(self, max_upload_workers=max_upload_workers)
self.copy_manager = CopyManager(self, max_copy_workers=max_copy_workers)
self.emerger = Emerger(self)
[docs]class B2Api(metaclass=B2TraceMeta):
"""
Provide file-level access to B2 services.
While :class:`b2sdk.v1.B2RawApi` provides direct access to the B2 web APIs, this
class handles several things that simplify the task of uploading
and downloading files:
- re-acquires authorization tokens when they expire
- retrying uploads when an upload URL is busy
- breaking large files into parts
- emulating a directory structure (B2 buckets are flat)
Adds an object-oriented layer on top of the raw API, so that
buckets and files returned are Python objects with accessor
methods.
The class also keeps a cache of information needed to access the
service, such as auth tokens and upload URLs.
"""
BUCKET_FACTORY_CLASS = staticmethod(BucketFactory)
BUCKET_CLASS = staticmethod(Bucket)
[docs] def __init__(
self,
account_info=None,
cache=None,
raw_api=None,
max_upload_workers=10,
max_copy_workers=10
):
"""
Initialize the API using the given account info.
:param account_info: an instance of :class:`~b2sdk.v1.UrlPoolAccountInfo`,
or any custom class derived from
:class:`~b2sdk.v1.AbstractAccountInfo`
To learn more about Account Info objects, see here
:class:`~b2sdk.v1.SqliteAccountInfo`
:param cache: an instance of the one of the following classes:
:class:`~b2sdk.cache.DummyCache`, :class:`~b2sdk.cache.InMemoryCache`,
:class:`~b2sdk.cache.AuthInfoCache`,
or any custom class derived from :class:`~b2sdk.cache.AbstractCache`
It is used by B2Api to cache the mapping between bucket name and bucket ids.
default is :class:`~b2sdk.cache.DummyCache`
:param raw_api: an instance of one of the following classes:
:class:`~b2sdk.raw_api.B2RawApi`, :class:`~b2sdk.raw_simulator.RawSimulator`,
or any custom class derived from :class:`~b2sdk.raw_api.AbstractRawApi`
It makes network-less unit testing simple by using :class:`~b2sdk.raw_simulator.RawSimulator`,
in tests and :class:`~b2sdk.raw_api.B2RawApi` in production.
default is :class:`~b2sdk.raw_api.B2RawApi`
:param int max_upload_workers: a number of upload threads, default is 10
:param int max_copy_workers: a number of copy threads, default is 10
"""
self.session = B2Session(account_info=account_info, cache=cache, raw_api=raw_api)
self.services = Services(
self.session,
max_upload_workers=max_upload_workers,
max_copy_workers=max_copy_workers,
)
@property
def account_info(self):
return self.session.account_info
@property
def cache(self):
return self.session.cache
@property
def raw_api(self):
"""
.. warning::
:class:`~b2sdk.raw_api.B2RawApi` attribute is deprecated.
:class:`~b2sdk.session.B2Session` expose all :class:`~b2sdk.raw_api.B2RawApi` methods now."""
return self.session.raw_api
[docs] def authorize_automatically(self):
"""
Perform automatic account authorization, retrieving all account data
from account info object passed during initialization.
"""
return self.session.authorize_automatically()
[docs] @limit_trace_arguments(only=('self', 'realm'))
def authorize_account(self, realm, application_key_id, application_key):
"""
Perform account authorization.
:param str realm: a realm to authorize account in (usually just "production")
:param str application_key_id: :term:`application key ID`
:param str application_key: user's :term:`application key`
"""
self.session.authorize_account(realm, application_key_id, application_key)
[docs] def get_account_id(self):
"""
Return the account ID.
:rtype: str
"""
return self.account_info.get_account_id()
# buckets
[docs] def create_bucket(
self,
name,
bucket_type,
bucket_info=None,
cors_rules=None,
lifecycle_rules=None,
default_server_side_encryption: Optional[EncryptionSetting] = None,
):
"""
Create a bucket.
:param str name: bucket name
:param str bucket_type: a bucket type, could be one of the following values: ``"allPublic"``, ``"allPrivate"``
:param dict bucket_info: additional bucket info to store with the bucket
:param dict cors_rules: bucket CORS rules to store with the bucket
:param dict lifecycle_rules: bucket lifecycle rules to store with the bucket
:param b2sdk.v1.EncryptionSetting default_server_side_encryption: default server side encryption settings (``None`` if unknown)
:return: a Bucket object
:rtype: b2sdk.v1.Bucket
"""
account_id = self.account_info.get_account_id()
response = self.session.create_bucket(
account_id,
name,
bucket_type,
bucket_info=bucket_info,
cors_rules=cors_rules,
lifecycle_rules=lifecycle_rules,
default_server_side_encryption=default_server_side_encryption,
)
bucket = self.BUCKET_FACTORY_CLASS.from_api_bucket_dict(self, response)
assert name == bucket.name, 'API created a bucket with different name\
than requested: %s != %s' % (name, bucket.name)
assert bucket_type == bucket.type_, 'API created a bucket with different type\
than requested: %s != %s' % (
bucket_type, bucket.type_
)
self.cache.save_bucket(bucket)
return bucket
[docs] def download_file_by_id(
self,
file_id,
download_dest,
progress_listener=None,
range_=None,
encryption: Optional[EncryptionSetting] = None,
):
"""
Download a file with the given ID.
:param str file_id: a file ID
:param download_dest: an instance of the one of the following classes: \
:class:`~b2sdk.v1.DownloadDestLocalFile`,\
:class:`~b2sdk.v1.DownloadDestBytes`,\
:class:`~b2sdk.v1.DownloadDestProgressWrapper`,\
:class:`~b2sdk.v1.PreSeekedDownloadDest`,\
or any sub class of :class:`~b2sdk.v1.AbstractDownloadDestination`
:param progress_listener: an instance of the one of the following classes: \
:class:`~b2sdk.v1.PartProgressReporter`,\
:class:`~b2sdk.v1.TqdmProgressListener`,\
:class:`~b2sdk.v1.SimpleProgressListener`,\
:class:`~b2sdk.v1.DoNothingProgressListener`,\
:class:`~b2sdk.v1.ProgressListenerForTest`,\
:class:`~b2sdk.v1.SyncFileReporter`,\
or any sub class of :class:`~b2sdk.v1.AbstractProgressListener`
:param list range_: a list of two integers, the first one is a start\
position, and the second one is the end position in the file
:param b2sdk.v1.EncryptionSetting encryption: encryption settings (``None`` if unknown)
:return: context manager that returns an object that supports iter_content()
"""
url = self.session.get_download_url_by_id(file_id)
return self.services.download_manager.download_file_from_url(
url,
download_dest,
progress_listener,
range_,
encryption,
)
[docs] def get_bucket_by_id(self, bucket_id):
"""
Return a bucket object with a given ID. Unlike ``get_bucket_by_name``, this method does not need to make any API calls.
:param str bucket_id: a bucket ID
:return: a Bucket object
:rtype: b2sdk.v1.Bucket
"""
return self.BUCKET_CLASS(self, bucket_id)
[docs] def get_bucket_by_name(self, bucket_name):
"""
Return the Bucket matching the given bucket_name.
:param str bucket_name: the name of the bucket to return
:return: a Bucket object
:rtype: b2sdk.v1.Bucket
:raises b2sdk.v1.exception.NonExistentBucket: if the bucket does not exist in the account
"""
# Give a useful warning if the current application key does not
# allow access to the named bucket.
self.check_bucket_restrictions(bucket_name)
# First, try the cache.
id_ = self.cache.get_bucket_id_or_none_from_bucket_name(bucket_name)
if id_ is not None:
return self.BUCKET_CLASS(self, id_, name=bucket_name)
# Second, ask the service
for bucket in self.list_buckets(bucket_name=bucket_name):
assert bucket.name.lower() == bucket_name.lower()
return bucket
# There is no such bucket.
raise NonExistentBucket(bucket_name)
[docs] def delete_bucket(self, bucket):
"""
Delete a chosen bucket.
:param b2sdk.v1.Bucket bucket: a :term:`bucket` to delete
:rtype: None
"""
account_id = self.account_info.get_account_id()
self.session.delete_bucket(account_id, bucket.id_)
[docs] def list_buckets(self, bucket_name=None, bucket_id=None):
"""
Call ``b2_list_buckets`` and return a list of buckets.
When no bucket name nor ID is specified, returns *all* of the buckets
in the account. When a bucket name or ID is given, returns just that
bucket. When authorized with an :term:`application key` restricted to
one :term:`bucket`, you must specify the bucket name, or the request
will be unauthorized.
:param str bucket_name: the name of the one bucket to return
:param str bucket_id: the ID of the one bucket to return
:rtype: list[b2sdk.v1.Bucket]
"""
# Give a useful warning if the current application key does not
# allow access to the named bucket.
self.check_bucket_restrictions(bucket_name)
account_id = self.account_info.get_account_id()
self.check_bucket_restrictions(bucket_name)
response = self.session.list_buckets(
account_id, bucket_name=bucket_name, bucket_id=bucket_id
)
buckets = self.BUCKET_FACTORY_CLASS.from_api_response(self, response)
if bucket_name is not None:
# If a bucket_name is specified we don't clear the cache because the other buckets could still
# be valid. So we save the one bucket returned from the list_buckets call.
for bucket in buckets:
self.cache.save_bucket(bucket)
else:
# Otherwise we want to clear the cache and save the buckets returned from list_buckets
# since we just got a new list of all the buckets for this account.
self.cache.set_bucket_name_cache(buckets)
return buckets
[docs] def list_parts(self, file_id, start_part_number=None, batch_size=None):
"""
Generator that yields a :py:class:`b2sdk.v1.Part` for each of the parts that have been uploaded.
:param str file_id: the ID of the large file that is not finished
:param int start_part_number: the first part number to return; defaults to the first part
:param int batch_size: the number of parts to fetch at a time from the server
:rtype: generator
"""
return self.services.large_file.list_parts(
file_id, start_part_number=start_part_number, batch_size=batch_size
)
# delete/cancel
[docs] def cancel_large_file(self, file_id):
"""
Cancel a large file upload.
:param str file_id: a file ID
:rtype: None
"""
return self.services.large_file.cancel_large_file(file_id)
[docs] def delete_file_version(self, file_id, file_name):
"""
Permanently and irrevocably delete one version of a file.
:param str file_id: a file ID
:param str file_name: a file name
:rtype: FileIdAndName
"""
# filename argument is not first, because one day it may become optional
response = self.session.delete_file_version(file_id, file_name)
assert response['fileId'] == file_id
assert response['fileName'] == file_name
return FileIdAndName(file_id, file_name)
# download
[docs] def get_download_url_for_fileid(self, file_id):
"""
Return a URL to download the given file by ID.
:param str file_id: a file ID
"""
url = url_for_api(self.account_info, 'b2_download_file_by_id')
return '%s?fileId=%s' % (url, file_id)
[docs] def get_download_url_for_file_name(self, bucket_name, file_name):
"""
Return a URL to download the given file by name.
:param str bucket_name: a bucket name
:param str file_name: a file name
"""
self.check_bucket_restrictions(bucket_name)
return '%s/file/%s/%s' % (
self.account_info.get_download_url(), bucket_name, b2_url_encode(file_name)
)
# keys
[docs] def create_key(
self,
capabilities,
key_name,
valid_duration_seconds=None,
bucket_id=None,
name_prefix=None,
):
"""
Create a new :term:`application key`.
:param list capabilities: a list of capabilities
:param str key_name: a name of a key
:param int,None valid_duration_seconds: key auto-expire time after it is created, in seconds, or ``None`` to not expire
:param str,None bucket_id: a bucket ID to restrict the key to, or ``None`` to not restrict
:param str,None name_prefix: a remote filename prefix to restrict the key to or ``None`` to not restrict
"""
account_id = self.account_info.get_account_id()
response = self.session.create_key(
account_id,
capabilities=capabilities,
key_name=key_name,
valid_duration_seconds=valid_duration_seconds,
bucket_id=bucket_id,
name_prefix=name_prefix
)
assert set(response['capabilities']) == set(capabilities)
assert response['keyName'] == key_name
return response
[docs] def delete_key(self, application_key_id):
"""
Delete :term:`application key`.
:param str application_key_id: an :term:`application key ID`
"""
response = self.session.delete_key(application_key_id=application_key_id)
return response
[docs] def list_keys(self, start_application_key_id=None):
"""
List application keys.
:param str,None start_application_key_id: an :term:`application key ID` to start from or ``None`` to start from the beginning
"""
account_id = self.account_info.get_account_id()
return self.session.list_keys(
account_id, max_key_count=1000, start_application_key_id=start_application_key_id
)
# other
[docs] def get_file_info(self, file_id: str) -> Dict[str, Any]:
"""
Legacy interface which just returns whatever remote API returns.
.. todo::
get_file_info() should return a File with .delete(), copy(), rename(), read() and so on
:param str file_id: the id of the file who's info will be retrieved.
:return: The parsed response
:rtype: dict
"""
return self.session.get_file_info_by_id(file_id)
[docs] def check_bucket_restrictions(self, bucket_name):
"""
Check to see if the allowed field from authorize-account has a bucket restriction.
If it does, checks if the bucket_name for a given api call matches that.
If not, it raises a :py:exc:`b2sdk.v1.exception.RestrictedBucket` error.
:param str bucket_name: a bucket name
:raises b2sdk.v1.exception.RestrictedBucket: if the account is not allowed to use this bucket
"""
allowed = self.account_info.get_allowed()
allowed_bucket_name = allowed['bucketName']
if allowed_bucket_name is not None:
if allowed_bucket_name != bucket_name:
raise RestrictedBucket(allowed_bucket_name)