Source code for b2sdk.session

######################################################################
#
# File: b2sdk/session.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

import functools

from b2sdk.exception import (InvalidAuthToken, Unauthorized)
from b2sdk.raw_api import ALL_CAPABILITIES


[docs]class B2Session(object): """ A *magic* facade that supplies the correct api_url and account_auth_token to methods of underlying raw_api and reauthorizes if necessary """
[docs] def __init__(self, api, raw_api): self._api = api # for reauthorization self.raw_api = raw_api
def __getattr__(self, name): f = getattr(self.raw_api, name) @functools.wraps(f) def wrapper(*args, **kwargs): auth_failure_encountered = False # download_by_name uses different URLs url_factory = kwargs.pop('url_factory', self._api.account_info.get_api_url) while 1: api_url = url_factory() account_auth_token = self._api.account_info.get_account_auth_token() try: return f(api_url, account_auth_token, *args, **kwargs) except InvalidAuthToken: if not auth_failure_encountered: auth_failure_encountered = True reauthorization_success = self._api.authorize_automatically() if reauthorization_success: continue # TODO: exception chaining could be added here # to help debug reauthorization failures raise except Unauthorized as e: raise self._add_app_key_info_to_unauthorized(e) return wrapper def _add_app_key_info_to_unauthorized(self, unauthorized): """ Takes an Unauthorized error and adds information from the application key about why it might have failed. """ # What's allowed? allowed = self._api.account_info.get_allowed() capabilities = allowed['capabilities'] bucket_name = allowed['bucketName'] name_prefix = allowed['namePrefix'] # Make a list of messages about the application key restrictions key_messages = [] if set(capabilities) != set(ALL_CAPABILITIES): key_messages.append("with capabilities '" + ','.join(capabilities) + "'") if bucket_name is not None: key_messages.append("restricted to bucket '" + bucket_name + "'") if name_prefix is not None: key_messages.append("restricted to files that start with '" + name_prefix + "'") if not key_messages: key_messages.append('with no restrictions') # Make a new message new_message = unauthorized.message if new_message == '': new_message = 'unauthorized' new_message += ' for application key ' + ', '.join(key_messages) return Unauthorized(new_message, unauthorized.code)