Source code for b2sdk.bucket

######################################################################
#
# File: b2sdk/bucket.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

import logging
import six
import threading

from .exception import (
    AlreadyFailed, B2Error, MaxFileSizeExceeded, MaxRetriesExceeded, UnrecognizedBucketType
)
from .file_version import FileVersionInfoFactory
from .progress import DoNothingProgressListener, AbstractProgressListener, RangeOfInputStream, ReadingStreamWithProgress, StreamWithHash
from .unfinished_large_file import UnfinishedLargeFile
from .upload_source import UploadSourceBytes, UploadSourceLocalFile
from .utils import b2_url_encode, choose_part_ranges, hex_sha1_of_stream, interruptible_get_result, validate_b2_file_name
from .utils import B2TraceMeta, disable_trace, limit_trace_arguments
from .raw_api import HEX_DIGITS_AT_END

logger = logging.getLogger(__name__)


class LargeFileUploadState(object):
    """
    Track the status of uploading a large file, accepting updates
    from the tasks that upload each of the parts.

    The aggregated progress is passed on to a ProgressListener that
    reports the progress for the file as a whole.

    This class is THREAD SAFE.
    """

    def __init__(self, file_progress_listener):
        """
        :param b2sdk.v1.AbstractProgressListener file_progress_listener: a progress listener object to use. Use :py:class:`b2sdk.v1.DoNothingProgressListener` to disable.
        """
        self.lock = threading.RLock()
        self.error_message = None
        self.file_progress_listener = file_progress_listener
        self.part_number_to_part_state = {}
        self.bytes_completed = 0

    def set_error(self, message):
        """
        Set an error message.

        :param str message: an error message
        """
        with self.lock:
            self.error_message = message

    def has_error(self):
        """
        Check whether an error occured.

        :rtype: bool
        """
        with self.lock:
            return self.error_message is not None

    def get_error_message(self):
        """
        Fetche an error message.

        :return: an error message
        :rtype: str
        """
        with self.lock:
            assert self.has_error()
            return self.error_message

    def update_part_bytes(self, bytes_delta):
        """
        Update listener progress info.

        :param int bytes_delta: number of bytes to increase a progress for
        """
        with self.lock:
            self.bytes_completed += bytes_delta
            self.file_progress_listener.bytes_completed(self.bytes_completed)


class PartProgressReporter(AbstractProgressListener):
    """
    An adapter that listens to the progress of upload a part and
    gives the information to a :py:class:`b2sdk.bucket.LargeFileUploadState`.

    Accepts absolute bytes_completed from the uploader, and reports
    deltas to the :py:class:`b2sdk.bucket.LargeFileUploadState`.  The bytes_completed for the
    part will drop back to 0 on a retry, which will result in a
    negative delta.
    """

    def __init__(self, large_file_upload_state, *args, **kwargs):
        """
        :param b2sdk.bucket.LargeFileUploadState large_file_upload_state: object to relay the progress to
        """
        super(PartProgressReporter, self).__init__(*args, **kwargs)
        self.large_file_upload_state = large_file_upload_state
        self.prev_byte_count = 0

    def bytes_completed(self, byte_count):
        self.large_file_upload_state.update_part_bytes(byte_count - self.prev_byte_count)
        self.prev_byte_count = byte_count

    def close(self):
        pass

    def set_total_bytes(self, total_byte_count):
        pass


[docs]@six.add_metaclass(B2TraceMeta) class Bucket(object): """ Provide access to a bucket in B2: listing files, uploading and downloading. """ DEFAULT_CONTENT_TYPE = 'b2/x-auto' MAX_UPLOAD_ATTEMPTS = 5 MAX_LARGE_FILE_SIZE = 10 * 1000 * 1000 * 1000 * 1000 # 10 TB
[docs] def __init__( self, api, id_, name=None, type_=None, bucket_info=None, cors_rules=None, lifecycle_rules=None, revision=None, bucket_dict=None, ): """ :param b2sdk.v1.B2Api api: an API object :param str id_: a bucket id :param str name: a bucket name :param str type_: a bucket type :param dict bucket_info: an info to store with a bucket :param dict cors_rules: CORS rules to store with a bucket :param dict lifecycle_rules: lifecycle rules to store with a bucket :param int revision: a bucket revision number :param dict bucket_dict: a dictionary which contains bucket parameters """ self.api = api self.id_ = id_ self.name = name self.type_ = type_ self.bucket_info = bucket_info or {} self.cors_rules = cors_rules or [] self.lifecycle_rules = lifecycle_rules or [] self.revision = revision self.bucket_dict = bucket_dict or {}
[docs] def get_id(self): """ Return bucket ID. :rtype: str """ return self.id_
[docs] def set_info(self, new_bucket_info, if_revision_is=None): """ Update bucket info. :param dict new_bucket_info: new bucket info dictionary :param int if_revision_is: revision number, update the info **only if** *revision* equals to *if_revision_is* """ return self.update(bucket_info=new_bucket_info, if_revision_is=if_revision_is)
[docs] def set_type(self, bucket_type): """ Update bucket type. :param str bucket_type: a bucket type ("allPublic" or "allPrivate") """ return self.update(bucket_type=bucket_type)
[docs] def update( self, bucket_type=None, bucket_info=None, cors_rules=None, lifecycle_rules=None, if_revision_is=None, ): """ Update various bucket parameters. :param str bucket_type: a bucket type :param dict bucket_info: an info to store with a bucket :param dict cors_rules: CORS rules to store with a bucket :param dict lifecycle_rules: lifecycle rules to store with a bucket :param int if_revision_is: revision number, update the info **only if** *revision* equals to *if_revision_is* """ account_id = self.api.account_info.get_account_id() return self.api.session.update_bucket( account_id, self.id_, bucket_type=bucket_type, bucket_info=bucket_info, cors_rules=cors_rules, lifecycle_rules=lifecycle_rules, if_revision_is=if_revision_is )
[docs] def cancel_large_file(self, file_id): """ Cancel a large file transfer. :param str file_id: a file ID """ return self.api.cancel_large_file(file_id)
[docs] def download_file_by_id(self, file_id, download_dest, progress_listener=None, range_=None): """ Download a file by ID. .. note:: download_file_by_id actually belongs in :py:class:`b2sdk.v1.B2Api`, not in :py:class:`b2sdk.v1.Bucket`; we just provide a convenient redirect here :param str file_id: a file ID :param str download_dest: a local file path :param b2sdk.v1.AbstractProgressListener, None progress_listener: a progress listener object to use, or ``None`` to not report progress :param tuple[int, int] range_: two integer values, start and end offsets """ return self.api.download_file_by_id( file_id, download_dest, progress_listener, range_=range_ )
[docs] def download_file_by_name(self, file_name, download_dest, progress_listener=None, range_=None): """ Download a file by name. .. seealso:: :ref:`Synchronizer <sync>`, a *high-performance* utility that synchronizes a local folder with a Bucket. :param str file_id: a file ID :param str download_dest: a local file path :param b2sdk.v1.AbstractProgressListener, None progress_listener: a progress listener object to use, or ``None`` to not track progress :param tuple[int, int] range_: two integer values, start and end offsets """ url = self.api.session.get_download_url_by_name( self.name, file_name, url_factory=self.api.account_info.get_download_url, ) return self.api.transferer.download_file_from_url( url, download_dest, progress_listener, range_ )
[docs] def get_download_authorization(self, file_name_prefix, valid_duration_in_seconds): """ Return an authorization token that is valid only for downloading files from the given bucket. :param str file_name_prefix: a file name prefix, only files that match it could be downloaded :param int valid_duration_in_seconds: a token is valid only during this amount of seconds """ response = self.api.session.get_download_authorization( self.id_, file_name_prefix, valid_duration_in_seconds ) return response['authorizationToken']
[docs] def list_parts(self, file_id, start_part_number=None, batch_size=None): """ Get a list of all parts that have been uploaded for a given file. :param str file_id: a file ID :param int start_part_number: the first part number to return. defaults to the first part. :param int batch_size: the number of parts to fetch at a time from the server """ return self.api.list_parts(file_id, start_part_number, batch_size)
[docs] def ls(self, folder_to_list='', show_versions=False, recursive=False, fetch_count=None): """ Pretend that folders exist and yields the information about the files in a folder. B2 has a flat namespace for the files in a bucket, but there is a convention of using "/" as if there were folders. This method searches through the flat namespace to find the files and "folders" that live within a given folder. When the `recursive` flag is set, lists all of the files in the given folder, and all of its sub-folders. :param str folder_to_list: the name of the folder to list; must not start with "/". Empty string means top-level folder :param bool show_versions: when ``True`` returns info about all versions of a file, when ``False``, just returns info about the most recent versions :param bool recursive: if ``True``, list folders recursively :param int,None fetch_count: how many entries to return or ``None`` to use the default. Acceptable values: 1 - 1000 :rtype: generator[tuple[b2sdk.v1.FileVersionInfo, str]] :returns: generator of (file_version_info, folder_name) tuples .. note:: In case of `recursive=True`, folder_name is returned only for first file in the folder. """ # Every file returned must have a name that starts with the # folder name and a "/". prefix = folder_to_list if prefix != '' and not prefix.endswith('/'): prefix += '/' # Loop until all files in the named directory have been listed. # The starting point of the first list_file_names request is the # prefix we're looking for. The prefix ends with '/', which is # now allowed for file names, so no file name will match exactly, # but the first one after that point is the first file in that # "folder". If the first search doesn't produce enough results, # then we keep calling list_file_names until we get all of the # names in this "folder". current_dir = None start_file_name = prefix start_file_id = None session = self.api.session while True: if show_versions: response = session.list_file_versions( self.id_, start_file_name, start_file_id, fetch_count, prefix ) else: response = session.list_file_names(self.id_, start_file_name, fetch_count, prefix) for entry in response['files']: file_version_info = FileVersionInfoFactory.from_api_response(entry) if not file_version_info.file_name.startswith(prefix): # We're past the files we care about return after_prefix = file_version_info.file_name[len(prefix):] if '/' not in after_prefix or recursive: # This is not a folder, so we'll print it out and # continue on. yield file_version_info, None current_dir = None else: # This is a folder. If it's different than the folder # we're already in, then we can print it. This check # is needed, because all of the files in the folder # will be in the list. folder_with_slash = after_prefix.split('/')[0] + '/' if folder_with_slash != current_dir: folder_name = prefix + folder_with_slash yield file_version_info, folder_name current_dir = folder_with_slash if response['nextFileName'] is None: # The response says there are no more files in the bucket, # so we can stop. return # Now we need to set up the next search. The response from # B2 has the starting point to continue with the next file, # but if we're in the middle of a "folder", we can skip ahead # to the end of the folder. The character after '/' is '0', # so we'll replace the '/' with a '0' and start there. # # When recursive is True, current_dir is always None. if current_dir is None: start_file_name = response.get('nextFileName') start_file_id = response.get('nextFileId') else: start_file_name = max( response['nextFileName'], prefix + current_dir[:-1] + '0', )
[docs] def list_unfinished_large_files(self, start_file_id=None, batch_size=None): """ A generator that yields an :py:class:`b2sdk.v1.UnfinishedLargeFile` for each unfinished large file in the bucket, starting at the given file. :param str,None start_file_id: a file ID to start from or None to start from the beginning :param int,None batch_size: max file count :rtype: generator[b2sdk.v1.UnfinishedLargeFile] """ batch_size = batch_size or 100 while True: batch = self.api.session.list_unfinished_large_files( self.id_, start_file_id, batch_size ) for file_dict in batch['files']: yield UnfinishedLargeFile(file_dict) start_file_id = batch.get('nextFileId') if start_file_id is None: break
[docs] def start_large_file(self, file_name, content_type=None, file_info=None): """ Start a large file transfer. :param str file_name: a file name :param str,None content_type: the MIME type, or ``None`` to accept the default based on file extension of the B2 file name :param dict,None file_infos: a file info to store with the file or ``None`` to not store anything """ return UnfinishedLargeFile( self.api.session.start_large_file(self.id_, file_name, content_type, file_info) )
[docs] @limit_trace_arguments(skip=('data_bytes',)) def upload_bytes( self, data_bytes, file_name, content_type=None, file_infos=None, progress_listener=None, ): """ Upload bytes in memory to a B2 file. :param bytes data_bytes: a byte array to upload :param str file_name: a file name to upload bytes to :param str,None content_type: the MIME type, or ``None`` to accept the default based on file extension of the B2 file name :param dict,None file_infos: a file info to store with the file or ``None`` to not store anything :param b2sdk.v1.AbstractProgressListener,None progress_listener: a progress listener object to use, or ``None`` to not track progress """ upload_source = UploadSourceBytes(data_bytes) return self.upload( upload_source, file_name, content_type=content_type, file_info=file_infos, progress_listener=progress_listener, )
[docs] def upload_local_file( self, local_file, file_name, content_type=None, file_infos=None, sha1_sum=None, min_part_size=None, progress_listener=None, ): """ Upload a file on local disk to a B2 file. .. seealso:: :ref:`Synchronizer <sync>`, a *high-performance* utility that synchronizes a local folder with a :term:`Bucket`. :param str local_file: a path to a file on local disk :param str file_name: a file name of the new B2 file :param str,None content_type: the MIME type, or ``None`` to accept the default based on file extension of the B2 file name :param dict,None file_infos: a file info to store with the file or ``None`` to not store anything :param str,None sha1_sum: file SHA1 hash or ``None`` to compute it automatically :param int min_part_size: a minimum size of a part :param b2sdk.v1.AbstractProgressListener,None progress_listener: a progress listener object to use, or ``None`` to not report progress """ upload_source = UploadSourceLocalFile(local_path=local_file, content_sha1=sha1_sum) return self.upload( upload_source, file_name, content_type=content_type, file_info=file_infos, min_part_size=min_part_size, progress_listener=progress_listener, )
[docs] def upload( self, upload_source, file_name, content_type=None, file_info=None, min_part_size=None, progress_listener=None ): """ Upload a file to B2, retrying as needed. The source of the upload is an UploadSource object that can be used to open (and re-open) the file. The result of opening should be a binary file whose read() method returns bytes. :param b2sdk.v1.UploadSource upload_source: an object that opens the source of the upload :param str file_name: the file name of the new B2 file :param str,None content_type: the MIME type, or ``None`` to accept the default based on file extension of the B2 file name :param dict,None file_infos: a file info to store with the file or ``None`` to not store anything :param int,None min_part_size: the smallest part size to use or ``None`` to determine automatically :param b2sdk.v1.AbstractProgressListener,None progress_listener: a progress listener object to use, or ``None`` to not report progress The function `opener` should return a file-like object, and it must be possible to call it more than once in case the upload is retried. """ validate_b2_file_name(file_name) file_info = file_info or {} content_type = content_type or self.DEFAULT_CONTENT_TYPE progress_listener = progress_listener or DoNothingProgressListener() # We don't upload any large files unless all of the parts can be at least # the minimum part size. min_part_size = max(min_part_size or 0, self.api.account_info.get_minimum_part_size()) min_large_file_size = min_part_size * 2 if upload_source.get_content_length() < min_large_file_size: # Run small uploads in the same thread pool as large file uploads, # so that they share resources during a sync. f = self.api.get_thread_pool().submit( self._upload_small_file, upload_source, file_name, content_type, file_info, progress_listener ) return f.result() else: return self._upload_large_file( upload_source, file_name, content_type, file_info, progress_listener )
def _upload_small_file( self, upload_source, file_name, content_type, file_info, progress_listener ): content_length = upload_source.get_content_length() upload_url = None exception_info_list = [] progress_listener.set_total_bytes(content_length) with progress_listener: for _ in six.moves.xrange(self.MAX_UPLOAD_ATTEMPTS): # refresh upload data in every attempt to work around a "busy storage pod" upload_url, upload_auth_token = self._get_upload_data() try: with upload_source.open() as file: input_stream = ReadingStreamWithProgress(file, progress_listener) hashing_stream = StreamWithHash(input_stream) length_with_hash = content_length + hashing_stream.hash_size() response = self.api.raw_api.upload_file( upload_url, upload_auth_token, file_name, length_with_hash, content_type, HEX_DIGITS_AT_END, file_info, hashing_stream ) assert hashing_stream.hash == response['contentSha1'] self.api.account_info.put_bucket_upload_url( self.id_, upload_url, upload_auth_token ) return FileVersionInfoFactory.from_api_response(response) except B2Error as e: logger.exception('error when uploading, upload_url was %s', upload_url) if not e.should_retry_upload(): raise exception_info_list.append(e) self.api.account_info.clear_bucket_upload_data(self.id_) raise MaxRetriesExceeded(self.MAX_UPLOAD_ATTEMPTS, exception_info_list) def _upload_large_file( self, upload_source, file_name, content_type, file_info, progress_listener ): content_length = upload_source.get_content_length() if self.MAX_LARGE_FILE_SIZE < content_length: raise MaxFileSizeExceeded(content_length, self.MAX_LARGE_FILE_SIZE) minimum_part_size = self.api.account_info.get_minimum_part_size() # Set up the progress reporting for the parts progress_listener.set_total_bytes(content_length) # Select the part boundaries part_ranges = choose_part_ranges(content_length, minimum_part_size) # Check for unfinished files with same name unfinished_file, finished_parts = self._find_unfinished_file_if_possible( upload_source, file_name, file_info, part_ranges, ) # Tell B2 we're going to upload a file if necessary if unfinished_file is None: unfinished_file = self.start_large_file(file_name, content_type, file_info) file_id = unfinished_file.file_id with progress_listener: large_file_upload_state = LargeFileUploadState(progress_listener) # Tell the executor to upload each of the parts part_futures = [ self.api.get_thread_pool().submit( self._upload_part, file_id, part_index + 1, # part number part_range, upload_source, large_file_upload_state, finished_parts ) for (part_index, part_range) in enumerate(part_ranges) ] # Collect the sha1 checksums of the parts as the uploads finish. # If any of them raised an exception, that same exception will # be raised here by result() part_sha1_array = [interruptible_get_result(f)['contentSha1'] for f in part_futures] # Finish the large file response = self.api.session.finish_large_file(file_id, part_sha1_array) return FileVersionInfoFactory.from_api_response(response) def _find_unfinished_file_if_possible(self, upload_source, file_name, file_info, part_ranges): """ Find an unfinished file that may be used to resume a large file upload. The file is found using the filename and comparing the uploaded parts against the local file. This is only possible if the application key being used allows ``listFiles`` access. """ if 'listFiles' in self.api.account_info.get_allowed()['capabilities']: for file_ in self.list_unfinished_large_files(): if file_.file_name == file_name and file_.file_info == file_info: files_match = True finished_parts = {} for part in self.list_parts(file_.file_id): # Compare part sizes offset, part_length = part_ranges[part.part_number - 1] if part_length != part.content_length: files_match = False break # Compare hash with upload_source.open() as f: f.seek(offset) sha1_sum = hex_sha1_of_stream(f, part_length) if sha1_sum != part.content_sha1: files_match = False break # Save part finished_parts[part.part_number] = part # Skip not matching files or unfinished files with no uploaded parts if not files_match or not finished_parts: continue # Return first matched file return file_, finished_parts return None, {} def _upload_part( self, file_id, part_number, part_range, upload_source, large_file_upload_state, finished_parts=None ): # Check if this part was uploaded before if finished_parts is not None and part_number in finished_parts: # Report this part finished part = finished_parts[part_number] large_file_upload_state.update_part_bytes(part.content_length) # Return SHA1 hash return {'contentSha1': part.content_sha1} # Set up a progress listener part_progress_listener = PartProgressReporter(large_file_upload_state) upload_url = None # Retry the upload as needed exception_list = [] for _ in six.moves.xrange(self.MAX_UPLOAD_ATTEMPTS): # refresh upload data in every attempt to work around a "busy storage pod" upload_url, upload_auth_token = self._get_upload_part_data(file_id) # if another part has already had an error there's no point in # uploading this part if large_file_upload_state.has_error(): raise AlreadyFailed(large_file_upload_state.get_error_message()) try: with upload_source.open() as file: offset, content_length = part_range file.seek(offset) range_stream = RangeOfInputStream(file, offset, content_length) input_stream = ReadingStreamWithProgress(range_stream, part_progress_listener) hashing_stream = StreamWithHash(input_stream) length_with_hash = content_length + hashing_stream.hash_size() response = self.api.raw_api.upload_part( upload_url, upload_auth_token, part_number, length_with_hash, HEX_DIGITS_AT_END, hashing_stream ) assert hashing_stream.hash == response['contentSha1'] self.api.account_info.put_large_file_upload_url( file_id, upload_url, upload_auth_token ) return response except B2Error as e: logger.exception('error when uploading, upload_url was %s', upload_url) if not e.should_retry_upload(): raise exception_list.append(e) self.api.account_info.clear_bucket_upload_data(self.id_) large_file_upload_state.set_error(str(exception_list[-1])) raise MaxRetriesExceeded(self.MAX_UPLOAD_ATTEMPTS, exception_list) def _get_upload_data(self): """ Take ownership of an upload URL / auth token for the bucket and return it. """ account_info = self.api.account_info upload_url, upload_auth_token = account_info.take_bucket_upload_url(self.id_) if None not in (upload_url, upload_auth_token): return upload_url, upload_auth_token response = self.api.session.get_upload_url(self.id_) return response['uploadUrl'], response['authorizationToken'] def _get_upload_part_data(self, file_id): """ Make sure that we have an upload URL and auth token for the given bucket and return it. """ account_info = self.api.account_info upload_url, upload_auth_token = account_info.take_large_file_upload_url(file_id) if None not in (upload_url, upload_auth_token): return upload_url, upload_auth_token response = self.api.session.get_upload_part_url(file_id) return (response['uploadUrl'], response['authorizationToken'])
[docs] def get_download_url(self, filename): """ Get file download URL. :param str filename: a file name :rtype: str """ return "%s/file/%s/%s" % ( self.api.account_info.get_download_url(), b2_url_encode(self.name), b2_url_encode(filename), )
[docs] def hide_file(self, file_name): """ Hide a file. :param str file_name: a file name :rtype: b2sdk.v1.FileVersionInfo """ response = self.api.session.hide_file(self.id_, file_name) return FileVersionInfoFactory.from_api_response(response)
[docs] def copy_file( self, file_id, new_file_name, bytes_range=None, metadata_directive=None, content_type=None, file_info=None, ): """ Creates a new file in this bucket by (server-side) copying from an existing file. :param str file_id: file ID of existing file :param str new_file_name: file name of the new file :param tuple[int,int],None bytes_range: start and end offsets (**inclusive!**), default is the entire file :param b2sdk.v1.MetadataDirectiveMode,None metadata_directive: default is :py:attr:`b2sdk.v1.MetadataDirectiveMode.COPY` :param str,None content_type: content_type for the new file if metadata_directive is set to :py:attr:`b2sdk.v1.MetadataDirectiveMode.REPLACE`, default will copy the content_type of old file :param dict,None file_info: file_info for the new file if metadata_directive is set to :py:attr:`b2sdk.v1.MetadataDirectiveMode.REPLACE`, default will copy the file_info of old file """ return self.api.session.copy_file( file_id, new_file_name, bytes_range, metadata_directive, content_type, file_info, self.id_, )
[docs] def delete_file_version(self, file_id, file_name): """ Delete a file version. :param str file_id: a file ID :param str file_name: a file name """ # filename argument is not first, because one day it may become optional return self.api.delete_file_version(file_id, file_name)
[docs] @disable_trace def as_dict(self): # TODO: refactor with other as_dict() """ Return bucket representation as a dictionary. :rtype: dict """ result = { 'accountId': self.api.account_info.get_account_id(), 'bucketId': self.id_, } if self.name is not None: result['bucketName'] = self.name if self.type_ is not None: result['bucketType'] = self.type_ return result
def __repr__(self): return 'Bucket<%s,%s,%s>' % (self.id_, self.name, self.type_)
class BucketFactory(object): """ This is a factory for creating bucket objects from different kind of objects. """ BUCKET_CLASS = staticmethod(Bucket) @classmethod def from_api_response(cls, api, response): """ Create a Bucket object from API response. :param b2sdk.v1.B2Api api: API object :param requests.Response response: response object :rtype: b2sdk.v1.Bucket """ return [cls.from_api_bucket_dict(api, bucket_dict) for bucket_dict in response['buckets']] @classmethod def from_api_bucket_dict(cls, api, bucket_dict): """ Turn a dictionary, like this: .. code-block:: python { "bucketType": "allPrivate", "bucketId": "a4ba6a39d8b6b5fd561f0010", "bucketName": "zsdfrtsazsdfafr", "accountId": "4aa9865d6f00", "bucketInfo": {}, "revision": 1 } into a Bucket object. :param b2sdk.v1.B2Api api: API lient :param dict bucket_dict: a dictionary with bucket properties :rtype: b2sdk.v1.Bucket """ bucket_name = bucket_dict['bucketName'] bucket_id = bucket_dict['bucketId'] type_ = bucket_dict['bucketType'] bucket_info = bucket_dict['bucketInfo'] cors_rules = bucket_dict['corsRules'] lifecycle_rules = bucket_dict['lifecycleRules'] revision = bucket_dict['revision'] if type_ is None: raise UnrecognizedBucketType(bucket_dict['bucketType']) return cls.BUCKET_CLASS( api, bucket_id, bucket_name, type_, bucket_info, cors_rules, lifecycle_rules, revision, bucket_dict )