######################################################################
#
# File: b2sdk/_internal/file_version.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations
import datetime as dt
import re
from copy import deepcopy
from typing import TYPE_CHECKING, Any
from .encryption.setting import EncryptionSetting, EncryptionSettingFactory
from .file_lock import NO_RETENTION_FILE_SETTING, FileRetentionSetting, LegalHold
from .http_constants import FILE_INFO_HEADER_PREFIX_LOWER, LARGE_FILE_SHA1, SRC_LAST_MODIFIED_MILLIS
from .progress import AbstractProgressListener
from .replication.types import ReplicationStatus
from .utils import Sha1HexDigest, b2_url_decode
from .utils.http_date import parse_http_date
from .utils.range_ import Range
if TYPE_CHECKING:
from .api import B2Api
from .transfer.inbound.downloaded_file import DownloadedFile
UNVERIFIED_CHECKSUM_PREFIX = 'unverified:'
class BaseFileVersion:
"""
Base class for representing file metadata in B2 cloud.
:ivar size - size of the whole file (for "upload" markers)
"""
__slots__ = [
'id_',
'api',
'file_name',
'size',
'content_type',
'content_sha1',
'content_sha1_verified',
'file_info',
'upload_timestamp',
'server_side_encryption',
'legal_hold',
'file_retention',
'mod_time_millis',
'replication_status',
]
_TYPE_MATCHER = re.compile('[a-z0-9]+_[a-z0-9]+_f([0-9]).*')
_FILE_TYPE = {
1: 'small',
2: 'large',
3: 'part',
4: 'tiny',
}
def __init__(
self,
api: B2Api,
id_: str,
file_name: str,
size: int,
content_type: str | None,
content_sha1: str | None,
file_info: dict[str, str] | None,
upload_timestamp: int,
server_side_encryption: EncryptionSetting,
file_retention: FileRetentionSetting = NO_RETENTION_FILE_SETTING,
legal_hold: LegalHold = LegalHold.UNSET,
replication_status: ReplicationStatus | None = None,
):
self.api = api
self.id_ = id_
self.file_name = file_name
self.size = size
self.content_type = content_type
self.content_sha1, self.content_sha1_verified = self._decode_content_sha1(content_sha1)
self.file_info = file_info or {}
self.upload_timestamp = upload_timestamp
self.server_side_encryption = server_side_encryption
self.file_retention = file_retention
self.legal_hold = legal_hold
self.replication_status = replication_status
if SRC_LAST_MODIFIED_MILLIS in self.file_info:
self.mod_time_millis = int(self.file_info[SRC_LAST_MODIFIED_MILLIS])
else:
self.mod_time_millis = self.upload_timestamp
@classmethod
def _decode_content_sha1(cls, content_sha1):
if content_sha1.startswith(UNVERIFIED_CHECKSUM_PREFIX):
return content_sha1[len(UNVERIFIED_CHECKSUM_PREFIX):], False
return content_sha1, True
@classmethod
def _encode_content_sha1(cls, content_sha1, content_sha1_verified):
if not content_sha1_verified:
return f'{UNVERIFIED_CHECKSUM_PREFIX}{content_sha1}'
return content_sha1
def _clone(self, **new_attributes: Any):
"""
Create new instance based on the old one, overriding attributes with :code:`new_attributes`
(only applies to arguments passed to __init__)
"""
args = self._get_args_for_clone()
return self.__class__(**{**args, **new_attributes})
def _get_args_for_clone(self):
return {
'api': self.api,
'id_': self.id_,
'file_name': self.file_name,
'size': self.size,
'content_type': self.content_type,
'content_sha1': self._encode_content_sha1(self.content_sha1, self.content_sha1_verified),
'file_info': self.file_info,
'upload_timestamp': self.upload_timestamp,
'server_side_encryption': self.server_side_encryption,
'file_retention': self.file_retention,
'legal_hold': self.legal_hold,
'replication_status': self.replication_status,
} # yapf: disable
def as_dict(self):
""" represents the object as a dict which looks almost exactly like the raw api output for upload/list """
result = {
'fileId': self.id_,
'fileName': self.file_name,
'fileInfo': self.file_info,
'serverSideEncryption': self.server_side_encryption.as_dict(),
'legalHold': self.legal_hold.value,
'fileRetention': self.file_retention.as_dict(),
}
if self.size is not None:
result['size'] = self.size
if self.upload_timestamp is not None:
result['uploadTimestamp'] = self.upload_timestamp
if self.content_type is not None:
result['contentType'] = self.content_type
if self.content_sha1 is not None:
result['contentSha1'] = self._encode_content_sha1(
self.content_sha1, self.content_sha1_verified
)
result['replicationStatus'] = self.replication_status and self.replication_status.value
return result
def __eq__(self, other):
sentry = object()
for attr in self._all_slots():
if getattr(self, attr) != getattr(other, attr, sentry):
return False
return True
def __repr__(self):
return '{}({})'.format(
self.__class__.__name__,
', '.join(repr(getattr(self, attr)) for attr in self._all_slots())
)
def _all_slots(self):
"""Return all slots for an object (for it's class and all parent classes). Useful in auxiliary methods."""
all_slots = []
for klass in self.__class__.__mro__[-1::-1]:
all_slots.extend(getattr(klass, '__slots__', []))
return all_slots
def delete(self, bypass_governance: bool = False) -> FileIdAndName:
"""Delete this file version. bypass_governance must be set to true if deleting a file version protected by
Object Lock governance mode retention settings (unless its retention period expired)"""
return self.api.delete_file_version(self.id_, self.file_name, bypass_governance)
def update_legal_hold(self, legal_hold: LegalHold) -> BaseFileVersion:
legal_hold = self.api.update_file_legal_hold(self.id_, self.file_name, legal_hold)
return self._clone(legal_hold=legal_hold)
def update_retention(
self,
file_retention: FileRetentionSetting,
bypass_governance: bool = False,
) -> BaseFileVersion:
file_retention = self.api.update_file_retention(
self.id_, self.file_name, file_retention, bypass_governance
)
return self._clone(file_retention=file_retention)
def _type(self):
"""
FOR TEST PURPOSES ONLY
not guaranteed to work for perpetuity (using undocumented server behavior)
"""
m = self._TYPE_MATCHER.match(self.id_)
assert m, self.id_
return self._FILE_TYPE[int(m.group(1))]
def get_content_sha1(self) -> Sha1HexDigest | None:
"""
Get the file's content SHA1 hex digest from the header or, if its absent,
from the file info. If both are missing, return None.
"""
if self.content_sha1 and self.content_sha1 != "none":
return self.content_sha1
elif LARGE_FILE_SHA1 in self.file_info:
return Sha1HexDigest(self.file_info[LARGE_FILE_SHA1])
# content SHA1 unknown
return None
class FileVersion(BaseFileVersion):
"""
A structure which represents a version of a file (in B2 cloud).
:ivar str ~.id_: ``fileId``
:ivar str ~.file_name: full file name (with path)
:ivar ~.size: size in bytes, can be ``None`` (unknown)
:ivar str ~.content_type: RFC 822 content type, for example ``"application/octet-stream"``
:ivar ~.upload_timestamp: in milliseconds since :abbr:`epoch (1970-01-01 00:00:00)`. Can be ``None`` (unknown).
:ivar str ~.action: ``"upload"``, ``"hide"`` or ``"delete"``
"""
__slots__ = [
'account_id',
'bucket_id',
'content_md5',
'action',
]
# defined at https://www.backblaze.com/b2/docs/files.html#httpHeaderSizeLimit
DEFAULT_HEADERS_LIMIT = 7000
ADVANCED_HEADERS_LIMIT = 2048
def __init__(
self,
api: B2Api,
id_: str,
file_name: str,
size: int | None | str,
content_type: str | None,
content_sha1: str | None,
file_info: dict[str, str],
upload_timestamp: int,
account_id: str,
bucket_id: str,
action: str,
content_md5: str | None,
server_side_encryption: EncryptionSetting,
file_retention: FileRetentionSetting = NO_RETENTION_FILE_SETTING,
legal_hold: LegalHold = LegalHold.UNSET,
replication_status: ReplicationStatus | None = None,
):
self.account_id = account_id
self.bucket_id = bucket_id
self.content_md5 = content_md5
self.action = action
super().__init__(
api=api,
id_=id_,
file_name=file_name,
size=size,
content_type=content_type,
content_sha1=content_sha1,
file_info=file_info,
upload_timestamp=upload_timestamp,
server_side_encryption=server_side_encryption,
file_retention=file_retention,
legal_hold=legal_hold,
replication_status=replication_status,
)
@property
def cache_control(self) -> str | None:
return self.file_info.get('b2-cache-control')
@property
def expires(self) -> str | None:
return self.file_info.get('b2-expires')
[docs] def expires_parsed(self) -> dt.datetime | None:
"""Return the expiration date as a datetime object, or None if there is no expiration date.
Raise ValueError if `expires` property is not a valid HTTP-date."""
if self.expires is None:
return None
return parse_http_date(self.expires)
@property
def content_disposition(self) -> str | None:
return self.file_info.get('b2-content-disposition')
@property
def content_encoding(self) -> str | None:
return self.file_info.get('b2-content-encoding')
@property
def content_language(self) -> str | None:
return self.file_info.get('b2-content-language')
def _get_args_for_clone(self):
args = super()._get_args_for_clone()
args.update(
{
'account_id': self.account_id,
'bucket_id': self.bucket_id,
'action': self.action,
'content_md5': self.content_md5,
}
)
return args
def as_dict(self):
result = super().as_dict()
result['accountId'] = self.account_id
result['bucketId'] = self.bucket_id
if self.action is not None:
result['action'] = self.action
if self.content_md5 is not None:
result['contentMd5'] = self.content_md5
return result
[docs] def get_fresh_state(self) -> FileVersion:
"""
Fetch all the information about this file version and return a new FileVersion object.
This method does NOT change the object it is called on.
"""
return self.api.get_file_info(self.id_)
[docs] def download(
self,
progress_listener: AbstractProgressListener | None = None,
range_: tuple[int, int] | None = None,
encryption: EncryptionSetting | None = None,
) -> DownloadedFile:
return self.api.download_file_by_id(
self.id_,
progress_listener=progress_listener,
range_=range_,
encryption=encryption,
)
def _get_upload_headers(self) -> bytes:
"""
Return encoded http headers, as when sending an upload request to b2 http api.
WARNING: the headers do not contain newlines between headers and spaces between
key and value. This implementation is in par with ADVANCED_HEADERS_LIMIT
and is reasonable only for `has_large_header` method
"""
# sometimes secret is not available, but we want to calculate headers
# size anyway; to bypass this, we use a fake encryption setting
# with a fake key
sse = self.server_side_encryption
if sse and sse.key and sse.key.secret is None:
sse = deepcopy(sse)
sse.key.secret = b'*' * sse.algorithm.get_length()
headers = self.api.raw_api.get_upload_file_headers(
upload_auth_token=self.api.account_info.get_account_auth_token(),
file_name=self.file_name,
content_length=self.size,
content_type=self.content_type,
content_sha1=self.content_sha1,
file_info=self.file_info,
server_side_encryption=sse,
file_retention=self.file_retention,
legal_hold=self.legal_hold,
)
headers_str = ''.join(
f'{key}{value}' for key, value in headers.items() if value is not None
)
return headers_str.encode('utf8')
@property
def has_large_header(self) -> bool:
"""
Determine whether FileVersion's info fits header size limit defined by B2.
This function makes sense only for "advanced" buckets, i.e. those which
have Server-Side Encryption or File Lock enabled.
See https://www.backblaze.com/b2/docs/files.html#httpHeaderSizeLimit.
"""
return len(self._get_upload_headers()) > self.ADVANCED_HEADERS_LIMIT
[docs]class DownloadVersion(BaseFileVersion):
"""
A structure which represents metadata of an initialized download
"""
__slots__ = [
'range_',
'content_disposition',
'content_length',
'content_language',
'expires',
'cache_control',
'content_encoding',
]
def __init__(
self,
api: B2Api,
id_: str,
file_name: str,
size: int,
content_type: str | None,
content_sha1: str | None,
file_info: dict[str, str],
upload_timestamp: int,
server_side_encryption: EncryptionSetting,
range_: Range,
content_disposition: str | None,
content_length: int,
content_language: str | None,
expires,
cache_control,
content_encoding: str | None,
file_retention: FileRetentionSetting = NO_RETENTION_FILE_SETTING,
legal_hold: LegalHold = LegalHold.UNSET,
replication_status: ReplicationStatus | None = None,
):
self.range_ = range_
self.content_disposition = content_disposition
self.content_length = content_length
self.content_language = content_language
self.expires = expires
self.cache_control = cache_control
self.content_encoding = content_encoding
super().__init__(
api=api,
id_=id_,
file_name=file_name,
size=size,
content_type=content_type,
content_sha1=content_sha1,
file_info=file_info,
upload_timestamp=upload_timestamp,
server_side_encryption=server_side_encryption,
file_retention=file_retention,
legal_hold=legal_hold,
replication_status=replication_status,
)
[docs] def expires_parsed(self) -> dt.datetime | None:
"""Return the expiration date as a datetime object, or None if there is no expiration date.
Raise ValueError if `expires` property is not a valid HTTP-date."""
if self.expires is None:
return None
return parse_http_date(self.expires)
[docs] def as_dict(self) -> dict:
result = super().as_dict()
if self.cache_control is not None:
result['cacheControl'] = self.cache_control
if self.expires is not None:
result['expires'] = self.expires
if self.content_disposition is not None:
result['contentDisposition'] = self.content_disposition
if self.content_encoding is not None:
result['contentEncoding'] = self.content_encoding
if self.content_language is not None:
result['contentLanguage'] = self.content_language
return result
def _get_args_for_clone(self):
args = super()._get_args_for_clone()
args.update(
{
'range_': self.range_,
'content_disposition': self.content_disposition,
'content_length': self.content_length,
'content_language': self.content_language,
'expires': self.expires,
'cache_control': self.cache_control,
'content_encoding': self.content_encoding,
}
)
return args
class FileVersionFactory:
"""
Construct :py:class:`b2sdk.v2.FileVersion` objects from api responses.
"""
FILE_VERSION_CLASS = FileVersion
def __init__(self, api: B2Api):
self.api = api
def from_api_response(self, file_version_dict, force_action=None):
"""
Turn this:
.. code-block:: python
{
"action": "hide",
"fileId": "4_zBucketName_f103b7ca31313c69c_d20151230_m030117_c001_v0001015_t0000",
"fileName": "randomdata",
"size": 0,
"uploadTimestamp": 1451444477000,
"replicationStatus": "pending"
}
or this:
.. code-block:: python
{
"accountId": "4aa9865d6f00",
"bucketId": "547a2a395826655d561f0010",
"contentLength": 1350,
"contentSha1": "753ca1c2d0f3e8748320b38f5da057767029a036",
"contentType": "application/octet-stream",
"fileId": "4_z547a2a395826655d561f0010_f106d4ca95f8b5b78_d20160104_m003906_c001_v0001013_t0005",
"fileInfo": {},
"fileName": "randomdata",
"serverSideEncryption": {"algorithm": "AES256", "mode": "SSE-B2"},
"replicationStatus": "completed"
}
into a :py:class:`b2sdk.v2.FileVersion` object.
"""
assert file_version_dict.get('action') is None or force_action is None, \
'action was provided by both info_dict and function argument'
action = file_version_dict.get('action') or force_action
file_name = file_version_dict['fileName']
id_ = file_version_dict['fileId']
if 'size' in file_version_dict:
size = file_version_dict['size']
elif 'contentLength' in file_version_dict:
size = file_version_dict['contentLength']
else:
raise ValueError('no size or contentLength')
upload_timestamp = file_version_dict.get('uploadTimestamp')
content_type = file_version_dict.get('contentType')
content_sha1 = file_version_dict.get('contentSha1')
content_md5 = file_version_dict.get('contentMd5')
file_info = file_version_dict.get('fileInfo')
server_side_encryption = EncryptionSettingFactory.from_file_version_dict(file_version_dict)
file_retention = FileRetentionSetting.from_file_version_dict(file_version_dict)
legal_hold = LegalHold.from_file_version_dict(file_version_dict)
replication_status_value = file_version_dict.get('replicationStatus')
replication_status = replication_status_value and ReplicationStatus[
replication_status_value.upper()]
return self.FILE_VERSION_CLASS(
self.api,
id_,
file_name,
size,
content_type,
content_sha1,
file_info,
upload_timestamp,
file_version_dict['accountId'],
file_version_dict['bucketId'],
action,
content_md5,
server_side_encryption,
file_retention,
legal_hold,
replication_status,
)
class DownloadVersionFactory:
"""
Construct :py:class:`b2sdk.v2.DownloadVersion` objects from download headers.
"""
def __init__(self, api: B2Api):
self.api = api
@classmethod
def range_and_size_from_header(cls, header):
raw_range, raw_size = header.split('/')
range_ = Range.from_header(raw_range)
size = int(raw_size)
return range_, size
@classmethod
def file_info_from_headers(cls, headers: dict) -> dict:
file_info = {}
prefix_len = len(FILE_INFO_HEADER_PREFIX_LOWER)
for header_name, header_value in headers.items():
if header_name[:prefix_len].lower() == FILE_INFO_HEADER_PREFIX_LOWER:
file_info_key = header_name[prefix_len:]
file_info[file_info_key] = b2_url_decode(header_value)
return file_info
def from_response_headers(self, headers):
file_info = self.file_info_from_headers(headers)
if 'Content-Range' in headers:
range_, size = self.range_and_size_from_header(headers['Content-Range'])
content_length = int(headers['Content-Length'])
else:
size = content_length = int(headers['Content-Length'])
range_ = Range(0, max(size - 1, 0))
return DownloadVersion(
api=self.api,
id_=headers['x-bz-file-id'],
file_name=b2_url_decode(headers['x-bz-file-name']),
size=size,
content_type=headers['content-type'],
content_sha1=headers['x-bz-content-sha1'],
file_info=file_info,
upload_timestamp=int(headers['x-bz-upload-timestamp']),
server_side_encryption=EncryptionSettingFactory.from_response_headers(headers),
range_=range_,
content_disposition=headers.get('Content-Disposition'),
content_length=content_length,
content_language=headers.get('Content-Language'),
expires=headers.get('Expires'),
cache_control=headers.get('Cache-Control'),
content_encoding=headers.get('Content-Encoding'),
file_retention=FileRetentionSetting.from_response_headers(headers),
legal_hold=LegalHold.from_response_headers(headers),
replication_status=ReplicationStatus.from_response_headers(headers),
)
[docs]class FileIdAndName:
"""
A structure which represents a B2 cloud file with just `file_name` and `fileId` attributes.
Used to return data from calls to b2_delete_file_version and b2_cancel_large_file.
"""
def __init__(self, file_id: str, file_name: str):
self.file_id = file_id
self.file_name = file_name
[docs] @classmethod
def from_cancel_or_delete_response(cls, response):
return cls(response['fileId'], response['fileName'])
[docs] def as_dict(self):
""" represents the object as a dict which looks almost exactly like the raw api output for delete_file_version """
return {'action': 'delete', 'fileId': self.file_id, 'fileName': self.file_name}
def __eq__(self, other):
return (self.file_id == other.file_id and self.file_name == other.file_name)
def __repr__(self):
return f'{self.__class__.__name__}({repr(self.file_id)}, {repr(self.file_name)})'