import hashlib
import io
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache, wraps
from logging import getLogger as get_logger
from typing import IO, Any, AnyStr, BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple, Union
import boto3
import botocore
from botocore.awsrequest import AWSResponse
from megfile.errors import S3BucketNotFoundError, S3ConfigError, S3FileExistsError, S3FileNotFoundError, S3IsADirectoryError, S3NameTooLongError, S3NotADirectoryError, S3NotALinkError, S3PermissionError, S3UnknownError, SameFileError, UnsupportedError, _create_missing_ok_generator
from megfile.errors import _logger as error_logger
from megfile.errors import patch_method, raise_s3_error, s3_error_code_should_retry, s3_should_retry, translate_fs_error, translate_s3_error
from megfile.interfaces import Access, ContextIterator, FileCacher, FileEntry, PathLike, StatResult, URIPath
from megfile.lib.compare import is_same_file
from megfile.lib.compat import fspath
from megfile.lib.fnmatch import translate
from megfile.lib.glob import has_magic, has_magic_ignore_brace, ungloblize
from megfile.lib.joinpath import uri_join
from megfile.lib.s3_buffered_writer import DEFAULT_MAX_BUFFER_SIZE, GLOBAL_MAX_WORKERS, S3BufferedWriter
from megfile.lib.s3_cached_handler import S3CachedHandler
from megfile.lib.s3_limited_seekable_writer import S3LimitedSeekableWriter
from megfile.lib.s3_memory_handler import S3MemoryHandler
from megfile.lib.s3_pipe_handler import S3PipeHandler
from megfile.lib.s3_prefetch_reader import DEFAULT_BLOCK_SIZE, S3PrefetchReader
from megfile.lib.s3_share_cache_reader import S3ShareCacheReader
from megfile.lib.url import get_url_scheme
from megfile.smart_path import SmartPath
from megfile.utils import cachedproperty, calculate_md5, generate_cache_path, get_binary_mode, get_content_offset, is_readable, necessary_params, thread_local
__all__ = [
'S3Path',
'parse_s3_url',
'get_endpoint_url',
'get_s3_session',
'get_s3_client',
's3_path_join',
'is_s3',
's3_buffered_open',
's3_cached_open',
's3_memory_open',
's3_pipe_open',
's3_prefetch_open',
's3_share_cache_open',
's3_open',
'S3Cacher',
'S3BufferedWriter',
'S3LimitedSeekableWriter',
'S3PrefetchReader',
'S3ShareCacheReader',
's3_upload',
's3_download',
's3_load_content',
's3_readlink',
's3_glob',
's3_glob_stat',
's3_iglob',
's3_rename',
's3_makedirs',
's3_concat',
]
_logger = get_logger(__name__)
content_md5_header = 'megfile-content-md5'
endpoint_url = 'https://s3.amazonaws.com'
max_pool_connections = 32
max_retries = 10
max_keys = 1000
def _patch_make_request(client: botocore.client.BaseClient):
def after_callback(result: Tuple[AWSResponse, dict], *args, **kwargs):
if not isinstance(result, tuple) or len(result) != 2 \
or not isinstance(result[0], AWSResponse) or not isinstance(result[1], dict):
return result
http, parsed_response = result
if http.status_code >= 500:
error_code = parsed_response.get("Error", {}).get("Code")
operation_model = kwargs.get('operation_model') or (
args[0] if args else None)
operation_name = operation_model.name if operation_model else 'ProxyMethod'
error_class = client.exceptions.from_code(error_code)
raise error_class(parsed_response, operation_name)
return result
def retry_callback(error, operation_model, request_dict, request_context):
if is_readable(request_dict['body']):
request_dict['body'].seek(0)
def before_callback(operation_model, request_dict, request_context):
_logger.debug(
'send s3 request: %r, with parameters: %s', operation_model.name,
request_dict)
client._make_request = patch_method(
client._make_request,
max_retries=max_retries,
should_retry=s3_should_retry,
after_callback=after_callback,
before_callback=before_callback,
retry_callback=retry_callback)
return client
[docs]def parse_s3_url(s3_url: PathLike) -> Tuple[str, str]:
s3_url = fspath(s3_url)
if not is_s3(s3_url):
raise ValueError('Not a s3 url: %r' % s3_url)
rightpart = s3_url.split('://', maxsplit=1)[1]
bucketmatch = re.match('(.*?)/', rightpart)
if bucketmatch is None:
bucket = rightpart
path = ''
else:
bucket = bucketmatch.group(1)
path = rightpart[len(bucket) + 1:]
return bucket, path
def get_scoped_config(profile_name: Optional[str] = None) -> Dict:
return get_s3_session(
profile_name=profile_name)._session.get_scoped_config()
@lru_cache()
def warning_endpoint_url(key: str, endpoint_url: str):
_logger.info("using %s: %s" % (key, endpoint_url))
[docs]def get_endpoint_url(profile_name: Optional[str] = None) -> str:
'''Get the endpoint url of S3
:returns: S3 endpoint url
'''
environ_key = f'{profile_name}__OSS_ENDPOINT'.upper(
) if profile_name else 'OSS_ENDPOINT'
environ_endpoint_url = os.environ.get(environ_key)
if environ_endpoint_url:
warning_endpoint_url(environ_key, environ_endpoint_url)
return environ_endpoint_url
try:
config_endpoint_url = get_scoped_config(profile_name=profile_name).get(
's3', {}).get('endpoint_url')
if config_endpoint_url:
warning_endpoint_url('~/.aws/config', config_endpoint_url)
return config_endpoint_url
except botocore.exceptions.ProfileNotFound:
pass
return endpoint_url
[docs]def get_s3_session(profile_name=None) -> boto3.Session:
'''Get S3 session
:returns: S3 session
'''
return thread_local(
f's3_session:{profile_name}', boto3.Session, profile_name=profile_name)
def get_access_token(profile_name=None):
access_key_env_name = f"{profile_name}__AWS_ACCESS_KEY_ID".upper(
) if profile_name else "AWS_ACCESS_KEY_ID"
secret_key_env_name = f"{profile_name}__AWS_SECRET_ACCESS_KEY".upper(
) if profile_name else "AWS_SECRET_ACCESS_KEY"
access_key = os.getenv(access_key_env_name)
secret_key = os.getenv(secret_key_env_name)
if access_key and secret_key:
return access_key, secret_key
try:
credentials = get_s3_session(
profile_name=profile_name).get_credentials()
except botocore.exceptions.ProfileNotFound:
credentials = None
if credentials:
if not access_key:
access_key = credentials.access_key
if not secret_key:
secret_key = credentials.secret_key
return access_key, secret_key
[docs]def get_s3_client(
config: Optional[botocore.config.Config] = None,
cache_key: Optional[str] = None,
profile_name: Optional[str] = None):
'''Get S3 client
:returns: S3 client
'''
if cache_key is not None:
return thread_local(
f"{cache_key}:{profile_name}",
get_s3_client,
config=config,
profile_name=profile_name)
if config:
config = config.merge(botocore.config.Config(connect_timeout=5))
else:
config = botocore.config.Config(connect_timeout=5)
addressing_style_env_key = 'AWS_S3_ADDRESSING_STYLE'
if profile_name:
addressing_style_env_key = f'{profile_name}__AWS_S3_ADDRESSING_STYLE'.upper(
)
addressing_style = os.environ.get(addressing_style_env_key)
if addressing_style:
config = config.merge(
botocore.config.Config(s3={'addressing_style': addressing_style}))
access_key, secret_key = get_access_token(profile_name)
try:
session = get_s3_session(profile_name=profile_name)
except botocore.exceptions.ProfileNotFound:
session = get_s3_session()
client = session.client(
's3',
endpoint_url=get_endpoint_url(profile_name=profile_name),
config=config,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
)
client = _patch_make_request(client)
return client
[docs]def s3_path_join(path: PathLike, *other_paths: PathLike) -> str:
'''
Concat 2 or more path to a complete path
:param path: Given path
:param other_paths: Paths to be concatenated
:returns: Concatenated complete path
.. note ::
The difference between this function and ``os.path.join`` is that this function ignores left side slash (which indicates absolute path) in ``other_paths`` and will directly concat.
e.g. os.path.join('/path', 'to', '/file') => '/file', but s3_path_join('/path', 'to', '/file') => '/path/to/file'
'''
return uri_join(fspath(path), *map(fspath, other_paths))
def _list_all_buckets(profile_name: Optional[str] = None) -> List[str]:
client = get_s3_client(profile_name=profile_name)
response = client.list_buckets()
return [content['Name'] for content in response['Buckets']]
def _parse_s3_url_ignore_brace(s3_url: str) -> Tuple[str, str]:
s3_url = fspath(s3_url)
s3_scheme, rightpart = s3_url[:5], s3_url[5:]
if s3_scheme != 's3://':
raise ValueError('Not a s3 url: %r' % s3_url)
left_brace = False
for current_index, current_character in enumerate(rightpart):
if current_character == "/" and left_brace is False:
return rightpart[:current_index], rightpart[current_index + 1:]
elif current_character == "{":
left_brace = True
elif current_character == "}":
left_brace = False
return rightpart, ""
def _group_s3path_by_bucket(
s3_pathname: str, profile_name: Optional[str] = None) -> List[str]:
bucket, key = _parse_s3_url_ignore_brace(s3_pathname)
if not bucket:
if not key:
raise UnsupportedError('Glob whole s3', s3_pathname)
raise S3BucketNotFoundError('Empty bucket name: %r' % s3_pathname)
grouped_path = []
def generate_s3_path(bucket: str, key: str) -> str:
if key:
return "s3://%s/%s" % (bucket, key)
return "s3://%s%s" % (bucket, "/" if s3_pathname.endswith("/") else "")
all_bucket = lru_cache(maxsize=1)(_list_all_buckets)
for bucketname in ungloblize(bucket):
if has_magic(bucketname):
split_bucketname = bucketname.split("/", 1)
path_part = None
if len(split_bucketname) == 2:
bucketname, path_part = split_bucketname
pattern = re.compile(translate(re.sub(r'\*{2,}', '*', bucketname)))
for bucket in all_bucket(profile_name):
if pattern.fullmatch(bucket) is not None:
if path_part is not None:
bucket = "%s/%s" % (bucket, path_part)
grouped_path.append(generate_s3_path(bucket, key))
else:
grouped_path.append(generate_s3_path(bucketname, key))
return grouped_path
def _s3_split_magic_ignore_brace(s3_pathname: str) -> Tuple[str, str]:
if not s3_pathname:
raise ValueError("s3_pathname: %s", s3_pathname)
has_protocol = False
if s3_pathname.startswith("s3://"):
has_protocol = True
s3_pathname = s3_pathname[5:]
has_delimiter = False
if s3_pathname.endswith("/"):
has_delimiter = True
s3_pathname = s3_pathname[:-1]
normal_parts = []
magic_parts = []
left_brace = False
left_index = 0
for current_index, current_character in enumerate(s3_pathname):
if current_character == "/" and left_brace is False:
if has_magic_ignore_brace(s3_pathname[left_index:current_index]):
magic_parts.append(s3_pathname[left_index:current_index])
if s3_pathname[current_index + 1:]:
magic_parts.append(s3_pathname[current_index + 1:])
left_index = len(s3_pathname)
break
normal_parts.append(s3_pathname[left_index:current_index])
left_index = current_index + 1
elif current_character == "{":
left_brace = True
elif current_character == "}":
left_brace = False
if s3_pathname[left_index:]:
if has_magic_ignore_brace(s3_pathname[left_index:]):
magic_parts.append(s3_pathname[left_index:])
else:
normal_parts.append(s3_pathname[left_index:])
if has_protocol and normal_parts:
normal_parts.insert(0, "s3:/")
elif has_protocol:
magic_parts.insert(0, "s3:/")
if has_delimiter and magic_parts:
magic_parts.append("")
elif has_delimiter:
normal_parts.append("")
return "/".join(normal_parts), "/".join(magic_parts)
def _group_s3path_by_prefix(s3_pathname: str) -> List[str]:
_, key = parse_s3_url(s3_pathname)
if not key:
return ungloblize(s3_pathname)
top_dir, magic_part = _s3_split_magic_ignore_brace(s3_pathname)
if not top_dir:
return [magic_part]
grouped_path = []
for pathname in ungloblize(top_dir):
if magic_part:
pathname = "/".join([pathname, magic_part])
grouped_path.append(pathname)
return grouped_path
def _become_prefix(prefix: str) -> str:
if prefix != '' and not prefix.endswith('/'):
prefix += '/'
return prefix
def _s3_split_magic(s3_pathname: str) -> Tuple[str, str]:
if not has_magic(s3_pathname):
return s3_pathname, ''
delimiter = '/'
normal_parts = []
magic_parts = []
all_parts = s3_pathname.split(delimiter)
for i, part in enumerate(all_parts):
if not has_magic(part):
normal_parts.append(part)
else:
magic_parts = all_parts[i:]
break
return delimiter.join(normal_parts), delimiter.join(magic_parts)
def _list_objects_recursive(
s3_client, bucket: str, prefix: str, delimiter: str = ''):
resp = s3_client.list_objects_v2(
Bucket=bucket, Prefix=prefix, Delimiter=delimiter, MaxKeys=max_keys)
while True:
yield resp
if not resp['IsTruncated']:
break
resp = s3_client.list_objects_v2(
Bucket=bucket,
Prefix=prefix,
Delimiter=delimiter,
ContinuationToken=resp['NextContinuationToken'],
MaxKeys=max_keys)
def _make_stat(content: Dict[str, Any]):
return StatResult(
islnk=content.get('islnk', False),
size=content['Size'],
mtime=content['LastModified'].timestamp(),
extra=content,
)
def _s3_glob_stat_single_path(
s3_pathname: PathLike,
recursive: bool = True,
missing_ok: bool = True,
followlinks: bool = False,
profile_name: Optional[str] = None) -> Iterator[FileEntry]:
if not recursive:
# If not recursive, replace ** with *
s3_pathname = re.sub(r'\*{2,}', '*', s3_pathname)
top_dir, wildcard_part = _s3_split_magic(s3_pathname)
search_dir = wildcard_part.endswith('/')
def should_recursive(wildcard_part: str) -> bool:
if '**' in wildcard_part:
return True
for expanded_path in ungloblize(wildcard_part):
parts_length = len(expanded_path.split('/'))
if parts_length + search_dir >= 2:
return True
return False
def create_generator(_s3_pathname) -> Iterator[FileEntry]:
if not S3Path(top_dir).exists():
return
if not has_magic(_s3_pathname):
_s3_pathname_obj = S3Path(_s3_pathname)
if _s3_pathname_obj.is_file():
stat = S3Path(_s3_pathname).stat(follow_symlinks=followlinks)
yield FileEntry(
_s3_pathname_obj.name, _s3_pathname_obj.path, stat)
if _s3_pathname_obj.is_dir():
yield FileEntry(
_s3_pathname_obj.name, _s3_pathname_obj.path,
StatResult(isdir=True))
return
delimiter = ''
if not should_recursive(wildcard_part):
delimiter = '/'
dirnames = set()
pattern = re.compile(translate(_s3_pathname))
bucket, key = parse_s3_url(top_dir)
prefix = _become_prefix(key)
client = get_s3_client(profile_name=profile_name)
with raise_s3_error(_s3_pathname):
for resp in _list_objects_recursive(client, bucket, prefix,
delimiter):
for content in resp.get('Contents', []):
path = s3_path_join('s3://', bucket, content['Key'])
if not search_dir and pattern.match(path):
yield FileEntry(
S3Path(path).name, path, _make_stat(content))
dirname = os.path.dirname(path)
while dirname not in dirnames and dirname != top_dir:
dirnames.add(dirname)
path = dirname + '/' if search_dir else dirname
if pattern.match(path):
yield FileEntry(
S3Path(path).name, path, StatResult(isdir=True))
dirname = os.path.dirname(dirname)
for common_prefix in resp.get('CommonPrefixes', []):
path = s3_path_join(
's3://', bucket, common_prefix['Prefix'])
dirname = os.path.dirname(path)
if dirname not in dirnames and dirname != top_dir:
dirnames.add(dirname)
path = dirname + '/' if search_dir else dirname
if pattern.match(path):
yield FileEntry(
S3Path(path).name, path, StatResult(isdir=True))
return create_generator(s3_pathname)
def _s3_scan_pairs(src_url: PathLike,
dst_url: PathLike) -> Iterator[Tuple[PathLike, PathLike]]:
for src_file_path in S3Path(src_url).scan():
content_path = src_file_path[len(src_url):]
if len(content_path) > 0:
dst_file_path = s3_path_join(dst_url, content_path)
else:
dst_file_path = dst_url
yield src_file_path, dst_file_path
[docs]def is_s3(path: PathLike) -> bool:
'''
1. According to `aws-cli <https://docs.aws.amazon.com/cli/latest/reference/s3/index.html>`_ , test if a path is s3 path.
2. megfile also support the path like `s3[+profile_name]://bucket/key`
:param path: Path to be tested
:returns: True if path is s3 path, else False
'''
path = fspath(path)
if re.match(r'^s3(\+\w+)?:\/\/', path):
return True
return False
def _s3_binary_mode(s3_open_func):
@wraps(s3_open_func)
def wrapper(
s3_url,
mode: str = 'rb',
encoding: Optional[str] = None,
errors: Optional[str] = None,
**kwargs):
bucket, key = parse_s3_url(s3_url)
if not bucket:
raise S3BucketNotFoundError('Empty bucket name: %r' % s3_url)
if not key or key.endswith('/'):
raise S3IsADirectoryError('Is a directory: %r' % s3_url)
if 'x' in mode:
if S3Path(s3_url).is_file():
raise S3FileExistsError('File exists: %r' % s3_url)
mode = mode.replace('x', 'w')
fileobj = s3_open_func(s3_url, get_binary_mode(mode), **kwargs)
if 'b' not in mode:
fileobj = io.TextIOWrapper(
fileobj, encoding=encoding, errors=errors) # pytype: disable=wrong-arg-types
fileobj.mode = mode
return fileobj
return wrapper
[docs]@_s3_binary_mode
def s3_prefetch_open(
s3_url: PathLike,
mode: str = 'rb',
followlinks: bool = False,
*,
max_concurrency: Optional[int] = None,
max_block_size: int = DEFAULT_BLOCK_SIZE) -> S3PrefetchReader:
'''Open a asynchronous prefetch reader, to support fast sequential read and random read
.. note ::
User should make sure that reader / writer are closed correctly
Supports context manager
Some parameter setting may perform well: max_concurrency=10 or 20, max_block_size=8 or 16 MB, default value None means using global thread pool
:param max_concurrency: Max download thread number, None by default
:param max_block_size: Max data size downloaded by each thread, in bytes, 8MB by default
:returns: An opened S3PrefetchReader object
:raises: S3FileNotFoundError
'''
if mode != 'rb':
raise ValueError('unacceptable mode: %r' % mode)
if not isinstance(s3_url, S3Path):
s3_url = S3Path(s3_url)
if followlinks:
try:
s3_url = s3_url.readlink()
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url.path_with_protocol)
config = botocore.config.Config(max_pool_connections=max_pool_connections)
client = get_s3_client(
config=config,
cache_key='s3_filelike_client',
profile_name=s3_url._profile_name)
return S3PrefetchReader(
bucket,
key,
s3_client=client,
max_retries=max_retries,
max_workers=max_concurrency,
block_size=max_block_size,
profile_name=s3_url._profile_name)
[docs]@_s3_binary_mode
def s3_share_cache_open(
s3_url: PathLike,
mode: str = 'rb',
followlinks: bool = False,
*,
cache_key: str = 'lru',
max_concurrency: Optional[int] = None,
max_block_size: int = DEFAULT_BLOCK_SIZE) -> S3ShareCacheReader:
'''Open a asynchronous prefetch reader, to support fast sequential read and random read
.. note ::
User should make sure that reader / writer are closed correctly
Supports context manager
Some parameter setting may perform well: max_concurrency=10 or 20, max_block_size=8 or 16 MB, default value None means using global thread pool
:param max_concurrency: Max download thread number, None by default
:param max_block_size: Max data size downloaded by each thread, in bytes, 8MB by default
:returns: An opened S3ShareCacheReader object
:raises: S3FileNotFoundError
'''
if mode != 'rb':
raise ValueError('unacceptable mode: %r' % mode)
if not isinstance(s3_url, S3Path):
s3_url = S3Path(s3_url)
if followlinks:
try:
s3_url = s3_url.readlink()
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url.path_with_protocol)
config = botocore.config.Config(max_pool_connections=max_pool_connections)
client = get_s3_client(
config=config,
cache_key='s3_filelike_client',
profile_name=s3_url._profile_name)
return S3ShareCacheReader(
bucket,
key,
cache_key=cache_key,
s3_client=client,
max_retries=max_retries,
max_workers=max_concurrency,
block_size=max_block_size,
profile_name=s3_url._profile_name)
[docs]@_s3_binary_mode
def s3_pipe_open(
s3_url: PathLike,
mode: str,
followlinks: bool = False,
*,
join_thread: bool = True) -> S3PipeHandler:
'''Open a asynchronous read-write reader / writer, to support fast sequential read / write
.. note ::
User should make sure that reader / writer are closed correctly
Supports context manager
When join_thread is False, while the file handle are closing, this function will not wait until the asynchronous writing finishes;
False doesn't affect read-handle, but this can speed up write-handle because file will be written asynchronously.
But asynchronous behaviour can guarantee the file are successfully written, and frequent execution may cause thread and file handle exhaustion
:param mode: Mode to open file, either "rb" or "wb"
:param join_thread: If wait after function execution until s3 finishes writing
:returns: An opened BufferedReader / BufferedWriter object
'''
if mode not in ('rb', 'wb'):
raise ValueError('unacceptable mode: %r' % mode)
if mode[0] == 'r' and not S3Path(s3_url).is_file():
raise S3FileNotFoundError('No such file: %r' % s3_url)
if not isinstance(s3_url, S3Path):
s3_url = S3Path(s3_url)
if followlinks:
try:
s3_url = s3_url.readlink()
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url.path_with_protocol)
config = botocore.config.Config(max_pool_connections=max_pool_connections)
client = get_s3_client(
config=config,
cache_key='s3_filelike_client',
profile_name=s3_url._profile_name)
return S3PipeHandler(
bucket,
key,
mode,
s3_client=client,
join_thread=join_thread,
profile_name=s3_url._profile_name)
[docs]@_s3_binary_mode
def s3_cached_open(
s3_url: PathLike,
mode: str,
followlinks: bool = False,
*,
cache_path: Optional[str] = None) -> S3CachedHandler:
'''Open a local-cache file reader / writer, for frequent random read / write
.. note ::
User should make sure that reader / writer are closed correctly
Supports context manager
cache_path can specify the path of cache file. Performance could be better if cache file path is on ssd or tmpfs
:param mode: Mode to open file, could be one of "rb", "wb" or "ab"
:param cache_path: cache file path
:returns: An opened BufferedReader / BufferedWriter object
'''
if mode not in ('rb', 'wb', 'ab', 'rb+', 'wb+', 'ab+'):
raise ValueError('unacceptable mode: %r' % mode)
if not isinstance(s3_url, S3Path):
s3_url = S3Path(s3_url)
if followlinks:
try:
s3_url = s3_url.readlink()
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url.path_with_protocol)
config = botocore.config.Config(max_pool_connections=max_pool_connections)
client = get_s3_client(
config=config,
cache_key='s3_filelike_client',
profile_name=s3_url._profile_name)
return S3CachedHandler(
bucket,
key,
mode,
s3_client=client,
cache_path=cache_path,
profile_name=s3_url._profile_name)
[docs]@_s3_binary_mode
def s3_buffered_open(
s3_url: PathLike,
mode: str,
followlinks: bool = False,
*,
max_concurrency: Optional[int] = None,
max_buffer_size: int = DEFAULT_MAX_BUFFER_SIZE,
forward_ratio: Optional[float] = None,
block_size: int = DEFAULT_BLOCK_SIZE,
limited_seekable: bool = False,
buffered: bool = True,
share_cache_key: Optional[str] = None,
cache_path: Optional[str] = None
) -> Union[S3PrefetchReader, S3BufferedWriter, io.BufferedReader, io.
BufferedWriter, S3MemoryHandler]:
'''Open an asynchronous prefetch reader, to support fast sequential read
.. note ::
User should make sure that reader / writer are closed correctly
Supports context manager
Some parameter setting may perform well: max_concurrency=10 or 20, max_block_size=8 or 16 MB, default value None means using global thread pool
:param max_concurrency: Max download thread number, None by default
:param max_buffer_size: Max cached buffer size in memory, 128MB by default
:param block_size: Size of single block, 8MB by default. Each block will be uploaded or downloaded by single thread.
:param limited_seekable: If write-handle supports limited seek (both file head part and tail part can seek block_size). Notes: This parameter are valid only for write-handle. Read-handle support arbitrary seek
:returns: An opened S3PrefetchReader object
:raises: S3FileNotFoundError
'''
if mode not in ('rb', 'wb', 'ab', 'rb+', 'wb+', 'ab+'):
raise ValueError('unacceptable mode: %r' % mode)
if not isinstance(s3_url, S3Path):
s3_url = S3Path(s3_url)
if followlinks:
try:
s3_url = s3_url.readlink()
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url.path_with_protocol)
config = botocore.config.Config(max_pool_connections=max_pool_connections)
client = get_s3_client(
config=config,
cache_key='s3_filelike_client',
profile_name=s3_url._profile_name)
if 'a' in mode or '+' in mode:
if cache_path is None:
return S3MemoryHandler(
bucket,
key,
mode,
s3_client=client,
profile_name=s3_url._profile_name)
return S3CachedHandler(
bucket,
key,
mode,
s3_client=client,
cache_path=cache_path,
profile_name=s3_url._profile_name)
if mode == 'rb':
# A rough conversion algorithm to align 2 types of Reader / Writer parameters
# TODO: Optimize the conversion algorithm
block_capacity = max_buffer_size // block_size
if forward_ratio is None:
block_forward = None
else:
block_forward = max(int(block_capacity * forward_ratio), 1)
if share_cache_key is not None:
reader = S3ShareCacheReader(
bucket,
key,
cache_key=share_cache_key,
s3_client=client,
max_retries=max_retries,
max_workers=max_concurrency,
block_size=block_size,
block_forward=block_forward,
profile_name=s3_url._profile_name)
else:
reader = S3PrefetchReader(
bucket,
key,
s3_client=client,
max_retries=max_retries,
max_workers=max_concurrency,
block_capacity=block_capacity,
block_forward=block_forward,
block_size=block_size,
profile_name=s3_url._profile_name)
if buffered:
reader = io.BufferedReader(reader) # pytype: disable=wrong-arg-types
return reader
if limited_seekable:
writer = S3LimitedSeekableWriter(
bucket,
key,
s3_client=client,
max_workers=max_concurrency,
max_buffer_size=max_buffer_size,
block_size=block_size,
profile_name=s3_url._profile_name)
else:
writer = S3BufferedWriter(
bucket,
key,
s3_client=client,
max_workers=max_concurrency,
max_buffer_size=max_buffer_size,
block_size=block_size,
profile_name=s3_url._profile_name)
if buffered:
writer = io.BufferedWriter(writer) # pytype: disable=wrong-arg-types
return writer
[docs]@_s3_binary_mode
def s3_memory_open(
s3_url: PathLike, mode: str,
followlinks: bool = False) -> S3MemoryHandler:
'''Open a memory-cache file reader / writer, for frequent random read / write
.. note ::
User should make sure that reader / writer are closed correctly
Supports context manager
:param mode: Mode to open file, could be one of "rb", "wb", "ab", "rb+", "wb+" or "ab+"
:returns: An opened BufferedReader / BufferedWriter object
'''
if mode not in ('rb', 'wb', 'ab', 'rb+', 'wb+', 'ab+'):
raise ValueError('unacceptable mode: %r' % mode)
if not isinstance(s3_url, S3Path):
s3_url = S3Path(s3_url)
if followlinks:
try:
s3_url = s3_url.readlink()
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url.path_with_protocol)
config = botocore.config.Config(max_pool_connections=max_pool_connections)
client = get_s3_client(
config=config,
cache_key='s3_filelike_client',
profile_name=s3_url._profile_name)
return S3MemoryHandler(
bucket, key, mode, s3_client=client, profile_name=s3_url._profile_name)
s3_open = s3_buffered_open
[docs]def s3_download(
src_url: PathLike,
dst_url: PathLike,
followlinks: bool = False,
callback: Optional[Callable[[int], None]] = None) -> None:
'''
Downloads a file from s3 to local filesystem.
:param src_url: source s3 path
:param dst_url: target fs path
:param callback: Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call
'''
from megfile.fs import is_fs
from megfile.fs_path import FSPath
if not isinstance(src_url, S3Path):
src_url = S3Path(src_url)
if followlinks:
try:
src_url = src_url.readlink()
except S3NotALinkError:
pass
src_bucket, src_key = parse_s3_url(src_url.path_with_protocol)
if not src_bucket:
raise S3BucketNotFoundError(
'Empty bucket name: %r' % src_url.path_with_protocol)
if not src_url.exists():
raise S3FileNotFoundError(
'File not found: %r' % src_url.path_with_protocol)
if not src_url.is_file():
raise S3IsADirectoryError(
'Is a directory: %r' % src_url.path_with_protocol)
dst_url = fspath(dst_url)
if not is_fs(dst_url):
raise OSError(f'dst_url is not fs path: {dst_url}')
if not dst_url or dst_url.endswith('/'):
raise S3IsADirectoryError('Is a directory: %r' % dst_url)
dst_path = FSPath(dst_url)
dst_directory = os.path.dirname(dst_path.path_without_protocol)
if dst_directory != '':
os.makedirs(dst_directory, exist_ok=True)
client = get_s3_client(profile_name=src_url._profile_name)
try:
client.download_file(
src_bucket,
src_key,
dst_path.path_without_protocol,
Callback=callback)
except Exception as error:
error = translate_fs_error(error, dst_url)
error = translate_s3_error(error, src_url.path_with_protocol)
raise error
src_stat = src_url.stat()
os.utime(
dst_path.path_without_protocol, (src_stat.st_mtime, src_stat.st_mtime))
[docs]def s3_upload(
src_url: PathLike,
dst_url: PathLike,
callback: Optional[Callable[[int], None]] = None,
**kwargs) -> None:
'''
Uploads a file from local filesystem to s3.
:param src_url: source fs path
:param dst_url: target s3 path
:param callback: Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call
'''
from megfile.fs import is_fs
from megfile.fs_path import FSPath
if not is_fs(src_url):
raise OSError(f'src_url is not fs path: {src_url}')
src_path = FSPath(src_url)
dst_bucket, dst_key = parse_s3_url(dst_url)
if not dst_bucket:
raise S3BucketNotFoundError('Empty bucket name: %r' % dst_url)
if not dst_key or dst_key.endswith('/'):
raise S3IsADirectoryError('Is a directory: %r' % dst_url)
client = get_s3_client(profile_name=S3Path(dst_url)._profile_name)
with open(src_path.path_without_protocol, 'rb') as src:
with raise_s3_error(dst_url):
client.upload_fileobj(
src, Bucket=dst_bucket, Key=dst_key, Callback=callback)
[docs]def s3_load_content(
s3_url,
start: Optional[int] = None,
stop: Optional[int] = None,
followlinks: bool = False) -> bytes:
'''
Get specified file from [start, stop) in bytes
:param s3_url: Specified path
:param start: start index
:param stop: stop index
:returns: bytes content in range [start, stop)
'''
def _get_object(client, bucket, key, range_str):
return client.get_object(
Bucket=bucket, Key=key, Range=range_str)['Body'].read()
s3_url = S3Path(s3_url)
if followlinks:
try:
s3_url = s3_url.readlink()
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url.path_with_protocol)
if not bucket:
raise S3BucketNotFoundError('Empty bucket name: %r' % s3_url)
if not key or key.endswith('/'):
raise S3IsADirectoryError('Is a directory: %r' % s3_url)
start, stop = get_content_offset(
start, stop, s3_url.getsize(follow_symlinks=False))
if start == 0 and stop == 0:
return b''
range_str = 'bytes=%d-%d' % (start, stop - 1)
client = get_s3_client(profile_name=s3_url._profile_name)
with raise_s3_error(s3_url.path):
return patch_method(
_get_object,
max_retries=max_retries,
should_retry=s3_should_retry,
)(client, bucket, key, range_str)
[docs]def s3_readlink(path) -> str:
'''
Return a string representing the path to which the symbolic link points.
:returns: Return a string representing the path to which the symbolic link points.
:raises: S3NameTooLongError, S3BucketNotFoundError, S3IsADirectoryError, S3NotALinkError
'''
return S3Path(path).readlink().path_with_protocol
[docs]def s3_rename(src_url: PathLike, dst_url: PathLike):
'''
Move s3 file path from src_url to dst_url
:param dst_url: Given destination path
'''
src_path_ins = S3Path(src_url)
if src_path_ins.is_file():
src_path_ins.copy(dst_url)
else:
src_path_ins.sync(dst_url)
src_path_ins.remove()
[docs]class S3Cacher(FileCacher):
cache_path = None
def __init__(
self, path: str, cache_path: Optional[str] = None, mode: str = 'r'):
if mode not in ('r', 'w', 'a'):
raise ValueError('unacceptable mode: %r' % mode)
if mode in ('r', 'a'):
if cache_path is None:
cache_path = generate_cache_path(path)
s3_download(path, cache_path)
self.name = path
self.mode = mode
self.cache_path = cache_path
def _close(self):
if self.cache_path is not None and \
os.path.exists(self.cache_path):
if self.mode in ('w', 'a'):
s3_upload(self.cache_path, self.name)
os.unlink(self.cache_path)
[docs]def s3_glob(
path: PathLike,
recursive: bool = True,
missing_ok: bool = True,
followlinks: bool = False,
) -> List[str]:
'''Return s3 path list in ascending alphabetical order, in which path matches glob pattern
Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError
:param recursive: If False, `**` will not search directory recursively
:param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
:raises: UnsupportedError, when bucket part contains wildcard characters
:returns: A list contains paths match `s3_pathname`
'''
return list(
s3_iglob(
path=path,
recursive=recursive,
missing_ok=missing_ok,
followlinks=followlinks))
[docs]def s3_glob_stat(
path: PathLike,
recursive: bool = True,
missing_ok: bool = True,
followlinks: bool = False) -> Iterator[FileEntry]:
'''Return a generator contains tuples of path and file stat, in ascending alphabetical order, in which path matches glob pattern
Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError
:param recursive: If False, `**` will not search directory recursively
:param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
:raises: UnsupportedError, when bucket part contains wildcard characters
:returns: A generator contains tuples of path and file stat, in which paths match `s3_pathname`
'''
return S3Path(path).glob_stat(
pattern="",
recursive=recursive,
missing_ok=missing_ok,
followlinks=followlinks)
[docs]def s3_iglob(
path: PathLike,
recursive: bool = True,
missing_ok: bool = True,
followlinks: bool = False,
) -> Iterator[str]:
'''Return s3 path iterator in ascending alphabetical order, in which path matches glob pattern
Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError
:param recursive: If False, `**` will not search directory recursively
:param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
:raises: UnsupportedError, when bucket part contains wildcard characters
:returns: An iterator contains paths match `s3_pathname`
'''
for path_obj in S3Path(path).iglob(pattern="", recursive=recursive,
missing_ok=missing_ok,
followlinks=followlinks):
yield path_obj.path_with_protocol
[docs]def s3_makedirs(path: PathLike, exist_ok: bool = False):
'''
Create an s3 directory.
Purely creating directory is invalid because it's unavailable on OSS.
This function is to test the target bucket have WRITE access.
:param path: Given path
:param exist_ok: If False and target directory exists, raise S3FileExistsError
:raises: S3BucketNotFoundError, S3FileExistsError
'''
return S3Path(path).mkdir(parents=True, exist_ok=exist_ok)
def _group_src_paths_by_block(
src_paths: List[PathLike], block_size: int = DEFAULT_BLOCK_SIZE
) -> List[List[Tuple[PathLike, Optional[str]]]]:
groups = []
current_group, current_group_size = [], 0
for src_path in src_paths:
current_file_size = S3Path(src_path).stat().size
if current_file_size == 0:
continue
if current_file_size >= block_size:
if len(groups) == 0:
if current_group_size + current_file_size > 2 * block_size:
group_lack_size = block_size - current_group_size
current_group.append(
(src_path, f'bytes=0-{group_lack_size-1}'))
groups.extend(
[
current_group,
[
(
src_path,
f'bytes={group_lack_size}-{current_file_size-1}'
)
]
])
else:
current_group.append((src_path, None))
groups.append(current_group)
else:
groups[-1].extend(current_group)
groups.append([(src_path, None)])
current_group, current_group_size = [], 0
else:
current_group.append((src_path, None))
current_group_size += current_file_size
if current_group_size >= block_size:
groups.append(current_group)
current_group, current_group_size = [], 0
if current_group:
groups.append(current_group)
return groups
[docs]def s3_concat(
src_paths: List[PathLike],
dst_path: PathLike,
block_size: int = DEFAULT_BLOCK_SIZE,
max_workers: int = GLOBAL_MAX_WORKERS) -> None:
'''Concatenate s3 files to one file.
:param src_paths: Given source paths
:param dst_path: Given destination path
'''
client = S3Path(dst_path)._client
with raise_s3_error(dst_path):
if block_size == 0:
groups = [[(src_path, None)] for src_path in src_paths]
else:
groups = _group_src_paths_by_block(src_paths, block_size=block_size)
with MultiPartWriter(client, dst_path) as writer, ThreadPoolExecutor(
max_workers=max_workers) as executor:
for index, group in enumerate(groups, start=1):
if len(group) == 1:
executor.submit(
writer.upload_part_copy, index, group[0][0],
group[0][1])
else:
executor.submit(writer.upload_part_by_paths, index, group)
[docs]@SmartPath.register
class S3Path(URIPath):
protocol = "s3"
def __init__(self, path: "PathLike", *other_paths: "PathLike"):
super().__init__(path, *other_paths)
protocol = get_url_scheme(self.path)
self._protocol_with_profile = self.protocol
self._profile_name = None
if protocol.startswith('s3+'):
self._protocol_with_profile = protocol
self._profile_name = protocol[3:]
self._s3_path = f"s3://{self.path[len(protocol)+3:]}"
elif not protocol:
self._s3_path = f"s3://{self.path.lstrip('/')}"
else:
self._s3_path = self.path
[docs] @cachedproperty
def path_with_protocol(self) -> str:
'''Return path with protocol, like file:///root, s3://bucket/key'''
path = self.path
protocol_prefix = self._protocol_with_profile + "://"
if path.startswith(protocol_prefix):
return path
return protocol_prefix + path.lstrip('/')
[docs] @cachedproperty
def path_without_protocol(self) -> str:
'''Return path without protocol, example: if path is s3://bucket/key, return bucket/key'''
path = self.path
protocol_prefix = self._protocol_with_profile + "://"
if path.startswith(protocol_prefix):
path = path[len(protocol_prefix):]
return path
@cachedproperty
def _client(self):
return get_s3_client(profile_name=self._profile_name)
def _s3_get_metadata(self) -> dict:
'''
Get object metadata
:param path: Object path
:returns: Object metadata
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket:
return {}
if not key or key.endswith('/'):
return {}
try:
with raise_s3_error(self.path_with_protocol):
resp = self._client.head_object(Bucket=bucket, Key=key)
return dict(
(key.lower(), value) for key, value in resp['Metadata'].items())
except Exception as error:
if isinstance(error,
(S3UnknownError, S3ConfigError, S3PermissionError)):
raise error
return {}
[docs] def access(
self, mode: Access = Access.READ,
followlinks: bool = False) -> bool:
'''
Test if path has access permission described by mode
:param mode: access mode
:returns: bool, if the bucket of s3_url has read/write access.
'''
s3_url = self.path_with_protocol
if followlinks:
try:
s3_url = self.readlink().path_with_protocol
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url) # only check bucket accessibility
if not bucket:
raise Exception("No available bucket")
if not isinstance(mode, Access):
raise TypeError(
'Unsupported mode: {} -- Mode should use one of the enums belonging to: {}'
.format(mode, ', '.join([str(a) for a in Access])))
if mode not in (Access.READ, Access.WRITE):
raise TypeError('Unsupported mode: {}'.format(mode))
try:
if not self.exists():
return False
except Exception as error:
error = translate_s3_error(error, s3_url)
if isinstance(error, S3PermissionError):
return False
raise error
if mode == Access.READ:
return True
try:
if not key:
key = 'test'
elif key.endswith('/'):
key = key[:-1]
upload_id = self._client.create_multipart_upload(
Bucket=bucket,
Key=key,
)['UploadId']
self._client.abort_multipart_upload(
Bucket=bucket,
Key=key,
UploadId=upload_id,
)
return True
except Exception as error:
error = translate_s3_error(error, s3_url)
if isinstance(error, S3PermissionError):
return False
raise error
[docs] def exists(self, followlinks: bool = False) -> bool:
'''
Test if s3_url exists
If the bucket of s3_url are not permitted to read, return False
:returns: True if s3_url eixsts, else False
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket: # s3:// => True, s3:///key => False
return not key
return self.is_file(followlinks) or self.is_dir()
[docs] def getmtime(self, follow_symlinks: bool = False) -> float:
'''
Get last-modified time of the file on the given s3_url path (in Unix timestamp format).
If the path is an existent directory, return the latest modified time of all file in it. The mtime of empty directory is 1970-01-01 00:00:00
If s3_url is not an existent path, which means s3_exist(s3_url) returns False, then raise S3FileNotFoundError
:returns: Last-modified time
:raises: S3FileNotFoundError, UnsupportedError
'''
return self.stat(follow_symlinks=follow_symlinks).mtime
[docs] def getsize(self, follow_symlinks: bool = False) -> int:
'''
Get file size on the given s3_url path (in bytes).
If the path in a directory, return the sum of all file size in it, including file in subdirectories (if exist).
The result excludes the size of directory itself. In other words, return 0 Byte on an empty directory path.
If s3_url is not an existent path, which means s3_exist(s3_url) returns False, then raise S3FileNotFoundError
:returns: File size
:raises: S3FileNotFoundError, UnsupportedError
'''
return self.stat(follow_symlinks=follow_symlinks).size
[docs] def glob(
self,
pattern,
recursive: bool = True,
missing_ok: bool = True,
followlinks: bool = False,
) -> List['S3Path']:
'''Return s3 path list in ascending alphabetical order, in which path matches glob pattern
Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError
:param pattern: Glob the given relative pattern in the directory represented by this path
:param recursive: If False, `**` will not search directory recursively
:param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
:raises: UnsupportedError, when bucket part contains wildcard characters
:returns: A list contains paths match `s3_pathname`
'''
return list(
self.iglob(
pattern=pattern,
recursive=recursive,
missing_ok=missing_ok,
followlinks=followlinks))
[docs] def glob_stat(
self,
pattern,
recursive: bool = True,
missing_ok: bool = True,
followlinks: bool = False) -> Iterator[FileEntry]:
'''Return a generator contains tuples of path and file stat, in ascending alphabetical order, in which path matches glob pattern
Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError
:param pattern: Glob the given relative pattern in the directory represented by this path
:param recursive: If False, `**` will not search directory recursively
:param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
:raises: UnsupportedError, when bucket part contains wildcard characters
:returns: A generator contains tuples of path and file stat, in which paths match `s3_pathname`
'''
glob_path = self._s3_path
if pattern:
glob_path = self.joinpath(pattern)._s3_path # pytype: disable=attribute-error
s3_pathname = fspath(glob_path)
def create_generator():
for group_s3_pathname_1 in _group_s3path_by_bucket(
s3_pathname, self._profile_name):
for group_s3_pathname_2 in _group_s3path_by_prefix(
group_s3_pathname_1):
for file_entry in _s3_glob_stat_single_path(
group_s3_pathname_2, recursive, missing_ok,
followlinks=followlinks,
profile_name=self._profile_name):
if self._profile_name:
file_entry = file_entry._replace(
path=
f"{self._protocol_with_profile}://{file_entry.path[5:]}"
)
yield file_entry
return _create_missing_ok_generator(
create_generator(), missing_ok,
S3FileNotFoundError('No match file: %r' % s3_pathname))
[docs] def iglob(
self,
pattern,
recursive: bool = True,
missing_ok: bool = True,
followlinks: bool = False,
) -> Iterator['S3Path']:
'''Return s3 path iterator in ascending alphabetical order, in which path matches glob pattern
Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError
:param pattern: Glob the given relative pattern in the directory represented by this path
:param recursive: If False, `**` will not search directory recursively
:param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
:raises: UnsupportedError, when bucket part contains wildcard characters
:returns: An iterator contains paths match `s3_pathname`
'''
for file_entry in self.glob_stat(pattern=pattern, recursive=recursive,
missing_ok=missing_ok,
followlinks=followlinks):
yield self.from_path(file_entry.path)
[docs] def is_dir(self, followlinks: bool = False) -> bool:
'''
Test if an s3 url is directory
Specific procedures are as follows:
If there exists a suffix, of which ``os.path.join(s3_url, suffix)`` is a file
If the url is empty bucket or s3://
:param followlinks: whether followlinks is True or False, result is the same. Because s3 symlink not support dir.
:returns: True if path is s3 directory, else False
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket: # s3:// => True, s3:///key => False
return not key
prefix = _become_prefix(key)
try:
resp = self._client.list_objects_v2(
Bucket=bucket, Prefix=prefix, Delimiter='/', MaxKeys=1)
except Exception as error:
error = translate_s3_error(error, self.path_with_protocol)
if isinstance(error,
(S3UnknownError, S3ConfigError, S3PermissionError)):
raise error
return False
if not key: # bucket is accessible
return True
if 'KeyCount' in resp:
return resp['KeyCount'] > 0
return len(resp.get('Contents', [])) > 0 or \
len(resp.get('CommonPrefixes', [])) > 0
[docs] def is_file(self, followlinks: bool = False) -> bool:
'''
Test if an s3_url is file
:returns: True if path is s3 file, else False
'''
s3_url = self.path_with_protocol
if followlinks:
try:
s3_url = self.readlink().path_with_protocol
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url)
if not bucket or not key or key.endswith('/'):
# s3://, s3:///key, s3://bucket, s3://bucket/prefix/
return False
try:
self._client.head_object(Bucket=bucket, Key=key)
except Exception as error:
error = translate_s3_error(error, s3_url)
if isinstance(error,
(S3UnknownError, S3ConfigError, S3PermissionError)):
raise error
return False
return True
[docs] def listdir(self, followlinks: bool = False) -> List[str]:
'''
Get all contents of given s3_url. The result is in acsending alphabetical order.
:returns: All contents have prefix of s3_url in acsending alphabetical order
:raises: S3FileNotFoundError, S3NotADirectoryError
'''
entries = list(self.scandir(followlinks=followlinks))
return sorted([entry.name for entry in entries])
[docs] def iterdir(self, followlinks: bool = False) -> Iterator['S3Path']:
'''
Get all contents of given s3_url. The result is in acsending alphabetical order.
:returns: All contents have prefix of s3_url in acsending alphabetical order
:raises: S3FileNotFoundError, S3NotADirectoryError
'''
for path in self.listdir(followlinks=followlinks):
yield self.joinpath(path) # type: ignore
[docs] def load(self, followlinks: bool = False) -> BinaryIO:
'''Read all content in binary on specified path and write into memory
User should close the BinaryIO manually
:returns: BinaryIO
'''
s3_url = self.path_with_protocol
if followlinks:
try:
s3_url = self.readlink().path_with_protocol
except S3NotALinkError:
pass
bucket, key = parse_s3_url(s3_url)
if not bucket:
raise S3BucketNotFoundError('Empty bucket name: %r' % s3_url)
if not key or key.endswith('/'):
raise S3IsADirectoryError('Is a directory: %r' % s3_url)
buffer = io.BytesIO()
with raise_s3_error(s3_url):
self._client.download_fileobj(bucket, key, buffer)
buffer.seek(0)
return buffer
[docs] def hasbucket(self) -> bool:
'''
Test if the bucket of s3_url exists
:returns: True if bucket of s3_url eixsts, else False
'''
bucket, _ = parse_s3_url(self.path_with_protocol)
if not bucket:
return False
try:
self._client.head_bucket(Bucket=bucket)
except Exception as error:
error = translate_s3_error(error, self.path_with_protocol)
if isinstance(error, S3PermissionError):
# Aliyun OSS doesn't give bucket api permission when you only have read and write permission
try:
self._client.list_objects_v2(Bucket=bucket, MaxKeys=1)
return True
except Exception as error2:
error2 = translate_s3_error(error2, self.path_with_protocol)
if isinstance(
error2,
(S3UnknownError, S3ConfigError, S3PermissionError)):
raise error2
return False
elif isinstance(error, (S3UnknownError, S3ConfigError)):
raise error
elif isinstance(error, S3FileNotFoundError):
return False
return True
[docs] def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False):
'''
Create an s3 directory.
Purely creating directory is invalid because it's unavailable on OSS.
This function is to test the target bucket have WRITE access.
:param mode: mode is ignored, only be compatible with pathlib.Path
:param parents: parents is ignored, only be compatible with pathlib.Path
:param exist_ok: If False and target directory exists, raise S3FileExistsError
:raises: S3BucketNotFoundError, S3FileExistsError
'''
bucket, _ = parse_s3_url(self.path_with_protocol)
if not bucket:
raise S3BucketNotFoundError(
'Empty bucket name: %r' % self.path_with_protocol)
if not self.hasbucket():
raise S3BucketNotFoundError(
'No such bucket: %r' % self.path_with_protocol)
if exist_ok:
if self.is_file():
raise S3FileExistsError(
'File exists: %r' % self.path_with_protocol)
return
if self.exists():
raise S3FileExistsError('File exists: %r' % self.path_with_protocol)
[docs] def move(self, dst_url: PathLike) -> None:
'''
Move file/directory path from src_url to dst_url
:param dst_url: Given destination path
'''
for src_file_path, dst_file_path in _s3_scan_pairs(
self.path_with_protocol, dst_url):
S3Path(src_file_path).rename(dst_file_path)
[docs] def remove(self, missing_ok: bool = False) -> None:
'''
Remove the file or directory on s3, `s3://` and `s3://bucket` are not permitted to remove
:param missing_ok: if False and target file/directory not exists, raise S3FileNotFoundError
:raises: S3PermissionError, S3FileNotFoundError, UnsupportedError
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket:
if not key:
raise UnsupportedError(
'Remove whole s3', self.path_with_protocol)
raise S3BucketNotFoundError(
'Empty bucket name: %r' % self.path_with_protocol)
if not key:
raise UnsupportedError('Remove bucket', self.path_with_protocol)
if not self.exists():
if missing_ok:
return
raise S3FileNotFoundError(
'No such file or directory: %r' % self.path_with_protocol)
client = self._client
with raise_s3_error(self.path_with_protocol):
if self.is_file():
client.delete_object(Bucket=bucket, Key=key)
return
prefix = _become_prefix(key)
total_count, error_count = 0, 0
for resp in _list_objects_recursive(client, bucket, prefix):
if 'Contents' in resp:
keys = [
{
'Key': content['Key']
} for content in resp['Contents']
]
total_count += len(keys)
errors = []
retries = 2
retry_interval = min(0.1 * 2**retries, 30)
for i in range(retries):
# doc: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.delete_objects
if not keys:
break
response = client.delete_objects(
Bucket=bucket, Delete={'Objects': keys})
keys = []
for error_info in response.get('Errors', []):
if s3_error_code_should_retry(
error_info.get('Code')):
error_logger.warning(
"retry %s times, removing file: %s, with error %s: %s"
% (
i + 1, error_info['Key'],
error_info['Code'],
error_info['Message']))
keys.append({'Key': error_info['Key']})
else:
errors.append(error_info)
time.sleep(retry_interval)
for error_info in errors:
error_logger.error(
"failed remove file: %s, with error %s: %s" % (
error_info['Key'], error_info['Code'],
error_info['Message']))
error_count += len(errors)
if error_count > 0:
error_msg = "failed remove path: %s, total file count: %s, failed count: %s" % (
self.path_with_protocol, total_count, error_count)
raise S3UnknownError(
Exception(error_msg), self.path_with_protocol)
[docs] def rename(self, dst_path: PathLike) -> 'S3Path':
'''
Move s3 file path from src_url to dst_url
:param dst_path: Given destination path
'''
s3_rename(self.path_with_protocol, dst_path)
return self.from_path(dst_path)
[docs] def scan(self, missing_ok: bool = True,
followlinks: bool = False) -> Iterator[str]:
'''
Iteratively traverse only files in given s3 directory, in alphabetical order.
Every iteration on generator yields a path string.
If s3_url is a file path, yields the file only
If s3_url is a non-existent path, return an empty generator
If s3_url is a bucket path, return all file paths in the bucket
If s3_url is an empty bucket, return an empty generator
If s3_url doesn't contain any bucket, which is s3_url == 's3://', raise UnsupportedError. walk() on complete s3 is not supported in megfile
:param missing_ok: If False and there's no file in the directory, raise FileNotFoundError
:raises: UnsupportedError
:returns: A file path generator
'''
scan_stat_iter = self.scan_stat(
missing_ok=missing_ok, followlinks=followlinks)
def create_generator() -> Iterator[str]:
for file_entry in scan_stat_iter:
yield file_entry.path
return create_generator()
[docs] def scan_stat(self, missing_ok: bool = True,
followlinks: bool = False) -> Iterator[FileEntry]:
'''
Iteratively traverse only files in given directory, in alphabetical order.
Every iteration on generator yields a tuple of path string and file stat
:param missing_ok: If False and there's no file in the directory, raise FileNotFoundError
:raises: UnsupportedError
:returns: A file path generator
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket:
raise UnsupportedError('Scan whole s3', self.path_with_protocol)
def create_generator() -> Iterator[FileEntry]:
if not self.is_dir():
if self.is_file():
# On s3, file and directory may be of same name and level, so need to test the path is file or directory
yield FileEntry(
self.name, fspath(self.path_with_protocol),
self.stat(follow_symlinks=followlinks))
return
if not key.endswith('/') and self.is_file():
yield FileEntry(
self.name, fspath(self.path_with_protocol),
self.stat(follow_symlinks=followlinks))
prefix = _become_prefix(key)
client = self._client
with raise_s3_error(self.path_with_protocol):
for resp in _list_objects_recursive(client, bucket, prefix):
for content in resp.get('Contents', []):
full_path = s3_path_join(
f'{self._protocol_with_profile}://', bucket,
content['Key'])
yield FileEntry(
S3Path(full_path).name, full_path,
_make_stat(content))
return _create_missing_ok_generator(
create_generator(), missing_ok,
S3FileNotFoundError('No match file: %r' % self.path_with_protocol))
[docs] def scandir(self, followlinks: bool = False) -> Iterator[FileEntry]:
'''
Get all contents of given s3_url, the order of result is not guaranteed.
:returns: All contents have prefix of s3_url
:raises: S3FileNotFoundError, S3NotADirectoryError
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket and key:
raise S3BucketNotFoundError(
'Empty bucket name: %r' % self.path_with_protocol)
if self.is_file():
raise S3NotADirectoryError(
'Not a directory: %r' % self.path_with_protocol)
elif not self.is_dir():
raise S3FileNotFoundError(
'No such directory: %r' % self.path_with_protocol)
prefix = _become_prefix(key)
client = self._client
# In order to do check on creation,
# we need to wrap the iterator in another function
def create_generator() -> Iterator[FileEntry]:
with raise_s3_error(self.path_with_protocol):
def generate_s3_path(
protocol: str, bucket: str, key: str) -> str:
return "%s://%s/%s" % (protocol, bucket, key)
if not bucket and not key: # list buckets
response = client.list_buckets()
for content in response['Buckets']:
yield FileEntry(
content['Name'], f"s3://{content['Name']}",
StatResult(
ctime=content['CreationDate'].timestamp(),
isdir=True,
extra=content,
))
return
for resp in _list_objects_recursive(client, bucket, prefix,
'/'):
for common_prefix in resp.get('CommonPrefixes', []):
yield FileEntry(
common_prefix['Prefix'][len(prefix):-1],
generate_s3_path(
self._protocol_with_profile, bucket,
common_prefix['Prefix']),
StatResult(isdir=True, extra=common_prefix))
for content in resp.get('Contents', []):
src_url = generate_s3_path(
self._protocol_with_profile, bucket, content['Key'])
if followlinks and S3Path(src_url).is_symlink():
content['islnk'] = True
yield FileEntry(
content['Key'][len(prefix):], src_url,
_make_stat(content))
return ContextIterator(create_generator())
def _getdirstat(self) -> StatResult:
'''
Return StatResult of given s3_url directory, including:
1. Directory size: the sum of all file size in it, including file in subdirectories (if exist).
The result excludes the size of directory itself. In other words, return 0 Byte on an empty directory path
2. Last-modified time of directory: return the latest modified time of all file in it. The mtime of empty directory is 1970-01-01 00:00:00
:returns: An int indicates size in Bytes
'''
if not self.is_dir():
raise S3FileNotFoundError(
'No such file or directory: %r' % self.path_with_protocol)
bucket, key = parse_s3_url(self.path_with_protocol)
prefix = _become_prefix(key)
client = self._client
size = 0
mtime = 0.0
with raise_s3_error(self.path_with_protocol):
for resp in _list_objects_recursive(client, bucket, prefix):
for content in resp.get('Contents', []):
size += content['Size']
last_modified = content['LastModified'].timestamp()
if mtime < last_modified:
mtime = last_modified
return StatResult(size=size, mtime=mtime, isdir=True)
[docs] def stat(self, follow_symlinks=True) -> StatResult:
'''
Get StatResult of s3_url file, including file size and mtime, referring to s3_getsize and s3_getmtime
If s3_url is not an existent path, which means s3_exist(s3_url) returns False, then raise S3FileNotFoundError
If attempt to get StatResult of complete s3, such as s3_dir_url == 's3://', raise S3BucketNotFoundError
:returns: StatResult
:raises: S3FileNotFoundError, S3BucketNotFoundError
'''
islnk = False
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket:
raise S3BucketNotFoundError(
'Empty bucket name: %r' % self.path_with_protocol)
if not self.is_file():
return self._getdirstat()
client = self._client
with raise_s3_error(self.path_with_protocol):
content = client.head_object(Bucket=bucket, Key=key)
if 'Metadata' in content:
metadata = dict(
(key.lower(), value)
for key, value in content['Metadata'].items())
if metadata and 'symlink_to' in metadata:
islnk = True
if islnk and follow_symlinks:
s3_url = metadata['symlink_to']
bucket, key = parse_s3_url(s3_url)
content = client.head_object(Bucket=bucket, Key=key)
stat_record = StatResult(
islnk=islnk,
size=content['ContentLength'],
mtime=content['LastModified'].timestamp(),
extra=content)
return stat_record
[docs] def lstat(self) -> StatResult:
'''Like Path.stat() but, if the path points to a symbolic link, return the symbolic link’s information rather than its target’s.'''
return self.stat(follow_symlinks=False)
[docs] def unlink(self, missing_ok: bool = False) -> None:
'''
Remove the file on s3
:param missing_ok: if False and target file not exists, raise S3FileNotFoundError
:raises: S3PermissionError, S3FileNotFoundError, S3IsADirectoryError
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket or not key or key.endswith('/'):
raise S3IsADirectoryError(
'Is a directory: %r' % self.path_with_protocol)
if not self.is_file():
if missing_ok:
return
raise S3FileNotFoundError(
'No such file: %r' % self.path_with_protocol)
with raise_s3_error(self.path_with_protocol):
self._client.delete_object(Bucket=bucket, Key=key)
[docs] def walk(self, followlinks: bool = False
) -> Iterator[Tuple[str, List[str], List[str]]]:
'''
Iteratively traverse the given s3 directory, in top-bottom order. In other words, firstly traverse parent directory, if subdirectories exist, traverse the subdirectories in alphabetical order.
Every iteration on generator yields a 3-tuple: (root, dirs, files)
- root: Current s3 path;
- dirs: Name list of subdirectories in current directory. The list is sorted by name in ascending alphabetical order;
- files: Name list of files in current directory. The list is sorted by name in ascending alphabetical order;
If s3_url is a file path, return an empty generator
If s3_url is a non-existent path, return an empty generator
If s3_url is a bucket path, bucket will be the top directory, and will be returned at first iteration of generator
If s3_url is an empty bucket, only yield one 3-tuple (notes: s3 doesn't have empty directory)
If s3_url doesn't contain any bucket, which is s3_url == 's3://', raise UnsupportedError. walk() on complete s3 is not supported in megfile
:param followlinks: whether followlinks is True or False, result is the same. Because s3 symlink not support dir.
:raises: UnsupportedError
:returns: A 3-tuple generator
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket:
raise UnsupportedError('Walk whole s3', self.path_with_protocol)
if not self.is_dir():
return
stack = [key]
client = self._client
while len(stack) > 0:
current = _become_prefix(stack.pop())
dirs, files = [], []
for resp in _list_objects_recursive(client, bucket, current, '/'):
for common_prefix in resp.get('CommonPrefixes', []):
dirs.append(common_prefix['Prefix'][:-1])
for content in resp.get('Contents', []):
files.append(content['Key'])
dirs = sorted(dirs)
stack.extend(reversed(dirs))
root = s3_path_join(
f'{self._protocol_with_profile}://', bucket, current)[:-1]
dirs = [path[len(current):] for path in dirs]
files = sorted(path[len(current):] for path in files)
yield root, dirs, files
[docs] def md5(self, recalculate: bool = False, followlinks: bool = False) -> str:
'''
Get md5 meta info in files that uploaded/copied via megfile
If meta info is lost or non-existent, return None
:param recalculate: calculate md5 in real-time or return s3 etag
:param followlinks: If is True, calculate md5 for real file
:returns: md5 meta info
'''
bucket, _ = parse_s3_url(self.path_with_protocol)
if not bucket:
raise S3BucketNotFoundError(
'Empty bucket name: %r' % self.path_with_protocol)
stat = self.stat(follow_symlinks=followlinks)
if stat.isdir:
hash_md5 = hashlib.md5() # nosec
for file_name in self.listdir():
chunk = S3Path(
s3_path_join(
self.path_with_protocol,
file_name)).md5(recalculate=recalculate).encode()
hash_md5.update(chunk)
return hash_md5.hexdigest()
if recalculate:
path_instance = self
if followlinks:
try:
path_instance = self.readlink()
except S3NotALinkError:
pass
with path_instance.open('rb') as f:
return calculate_md5(f)
return stat.extra.get('ETag', '')[1:-1]
[docs] def copy(
self,
dst_url: PathLike,
followlinks: bool = False,
callback: Optional[Callable[[int], None]] = None) -> None:
''' File copy on S3
Copy content of file on `src_path` to `dst_path`.
It's caller's responsebility to ensure the s3_isfile(src_url) == True
:param dst_path: Target file path
:param callback: Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call
'''
src_url = self.path_with_protocol
src_bucket, src_key = parse_s3_url(src_url)
dst_bucket, dst_key = parse_s3_url(dst_url)
if dst_bucket == src_bucket and src_key.rstrip('/') == dst_key.rstrip(
'/'):
raise SameFileError(
f"'{src_url}' and '{dst_url}' are the same file")
if not src_bucket:
raise S3BucketNotFoundError('Empty bucket name: %r' % src_url)
if self.is_dir():
raise S3IsADirectoryError('Is a directory: %r' % src_url)
if not dst_bucket:
raise S3BucketNotFoundError('Empty bucket name: %r' % dst_url)
if not dst_key or dst_key.endswith('/'):
raise S3IsADirectoryError('Is a directory: %r' % dst_url)
if followlinks:
try:
s3_url = self.readlink().path
src_bucket, src_key = parse_s3_url(s3_url)
except S3NotALinkError:
pass
with raise_s3_error(f"'{src_url}' or '{dst_url}'"):
self._client.copy(
{
'Bucket': src_bucket,
'Key': src_key,
},
Bucket=dst_bucket,
Key=dst_key,
Callback=callback)
[docs] def sync(
self, dst_url: PathLike, followlinks: bool = False,
force: bool = False) -> None:
'''
Copy file/directory on src_url to dst_url
:param dst_url: Given destination path
:param followlinks: False if regard symlink as file, else True
:param force: Sync file forcely, do not ignore same files
'''
for src_file_path, dst_file_path in _s3_scan_pairs(
self.path_with_protocol, dst_url):
src_file_path = self.from_path(src_file_path)
dst_file_path = self.from_path(dst_file_path)
if not force and dst_file_path.exists() and is_same_file(
src_file_path.stat(), dst_file_path.stat(), 'copy'):
continue
src_file_path.copy(dst_file_path, followlinks=followlinks)
[docs] def symlink(self, dst_path: PathLike) -> None:
'''
Create a symbolic link pointing to src_path named dst_path.
:param dst_path: Desination path
:raises: S3NameTooLongError, S3BucketNotFoundError, S3IsADirectoryError
'''
if len(str(self._s3_path).encode()) > 1024:
raise S3NameTooLongError('File name too long: %r' % dst_path)
src_bucket, src_key = parse_s3_url(self.path_with_protocol)
dst_bucket, dst_key = parse_s3_url(dst_path)
if not src_bucket:
raise S3BucketNotFoundError('Empty bucket name: %r' % self.path)
if not dst_bucket:
raise S3BucketNotFoundError('Empty bucket name: %r' % dst_path)
if not dst_key or dst_key.endswith('/'):
raise S3IsADirectoryError('Is a directory: %r' % dst_path)
src_path = self._s3_path
try:
src_path = self.readlink()._s3_path
except S3NotALinkError:
pass
with raise_s3_error(dst_path):
self._client.put_object(
Bucket=dst_bucket,
Key=dst_key,
Metadata={"symlink_to": src_path})
[docs] def readlink(self) -> 'S3Path':
'''
Return a S3Path instance representing the path to which the symbolic link points.
:returns: Return a S3Path instance representing the path to which the symbolic link points.
:raises: S3NameTooLongError, S3BucketNotFoundError, S3IsADirectoryError, S3NotALinkError
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket:
raise S3BucketNotFoundError(
'Empty bucket name: %r' % self.path_with_protocol)
if not key or key.endswith('/'):
raise S3IsADirectoryError(
'Is a directory: %r' % self.path_with_protocol)
metadata = self._s3_get_metadata()
if not 'symlink_to' in metadata:
raise S3NotALinkError('Not a link: %r' % self.path_with_protocol)
else:
return self.from_path(metadata['symlink_to'])
[docs] def is_symlink(self) -> bool:
'''
Test whether a path is link
:returns: True if a path is link, else False
:raises: S3NotALinkError
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket:
return False
if not key or key.endswith('/'):
return False
metadata = self._s3_get_metadata()
return 'symlink_to' in metadata
[docs] def save(self, file_object: BinaryIO):
'''Write the opened binary stream to specified path, but the stream won't be closed
:param file_object: Stream to be read
'''
bucket, key = parse_s3_url(self.path_with_protocol)
if not bucket:
raise S3BucketNotFoundError(
'Empty bucket name: %r' % self.path_with_protocol)
if not key or key.endswith('/'):
raise S3IsADirectoryError(
'Is a directory: %r' % self.path_with_protocol)
with raise_s3_error(self.path_with_protocol):
self._client.upload_fileobj(file_object, Bucket=bucket, Key=key)
[docs] def open(
self,
mode: str = 'r',
*,
encoding: Optional[str] = None,
errors: Optional[str] = None,
s3_open_func: Callable[[str, str], BinaryIO] = s3_open,
**kwargs) -> IO[AnyStr]: # pytype: disable=signature-mismatch
return s3_open_func( # pytype: disable=wrong-keyword-args
self,
mode,
encoding=encoding,
errors=errors,
**necessary_params(s3_open_func, **kwargs))
[docs] def absolute(self) -> 'S3Path':
'''
Make the path absolute, without normalization or resolving symlinks. Returns a new path object
'''
return self
[docs] def cwd(self) -> 'S3Path':
'''Return current working directory
returns: Current working directory
'''
return self.from_path(self.path_with_protocol)
class MultiPartWriter:
def __init__(self, client, path: PathLike) -> None:
self._client = client
self._multipart_upload_info = []
bucket, key = parse_s3_url(path)
self._bucket = bucket
self._key = key
self._upload_id = self._client.create_multipart_upload(
Bucket=self._bucket, Key=self._key)['UploadId']
def upload_part(self, part_num: int, file_obj: io.BytesIO) -> None:
response = self._client.upload_part(
Body=file_obj,
UploadId=self._upload_id,
PartNumber=part_num,
Bucket=self._bucket,
Key=self._key,
)
self._multipart_upload_info.append(
{
'PartNumber': part_num,
'ETag': response['ETag']
})
def upload_part_by_paths(
self, part_num: int, paths: List[Tuple[PathLike, str]]) -> None:
file_obj = io.BytesIO()
for path, bytes_range in paths:
bucket, key = parse_s3_url(path)
if bytes_range:
file_obj.write(
self._client.get_object(
Bucket=bucket, Key=key,
Range=bytes_range)['Body'].read())
else:
file_obj.write(
self._client.get_object(Bucket=bucket,
Key=key)['Body'].read())
file_obj.seek(0, os.SEEK_SET)
self.upload_part(part_num, file_obj)
def upload_part_copy(
self,
part_num: int,
path: PathLike,
copy_source_range: Optional[str] = None) -> None:
bucket, key = parse_s3_url(path)
params = dict(
UploadId=self._upload_id,
PartNumber=part_num,
CopySource={
'Bucket': bucket,
'Key': key
},
Bucket=self._bucket,
Key=self._key,
)
if copy_source_range:
params['CopySourceRange'] = copy_source_range
response = self._client.upload_part_copy(**params)
self._multipart_upload_info.append(
{
'PartNumber': part_num,
'ETag': response['CopyPartResult']['ETag']
})
def close(self):
self._multipart_upload_info.sort(key=lambda t: t['PartNumber'])
self._client.complete_multipart_upload(
UploadId=self._upload_id,
Bucket=self._bucket,
Key=self._key,
MultipartUpload={'Parts': self._multipart_upload_info})
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()