Source code for keg_storage.backends.base
import enum
import typing
import arrow
class ListEntry(typing.NamedTuple):
name: str
last_modified: arrow.Arrow
size: int
[docs]class FileMode(enum.Flag):
read = enum.auto()
write = enum.auto()
def __str__(self):
s = 'r' if self & FileMode.read else ''
s += 'w' if self & FileMode.write else ''
return f'{s}b'
@classmethod
def as_mode(cls, obj):
if isinstance(obj, cls):
return obj
if not isinstance(obj, str):
raise ValueError('as_mode() accepts only FileMode or str arguments')
mode = cls(0)
if 'r' in obj:
mode |= cls.read
if 'w' in obj:
mode |= cls.write
return mode
class RemoteFile:
"""
This is a base class for objects returned by a backend's `open()` method. This is a file-like
object that provides read/write operations to the remote file. When creating a new backend, you
should subclass this and implement `read()` and `write()` methods at minimum.
After construction, a RemoteFile is presumed to be in an "open" state and should accept calls
to any of its methods. Any cleanup should be done in the `close()` method.
"""
# This is the default chunk size to use when iterating over this object
iter_chunk_size = 5 * 1024 * 1024
def __init__(self, mode: FileMode):
"""
Override this constructor to accept any additional arguments needed by the backend and to
perform any initialization required to get the file into an "open" state.
"""
self.mode = mode
def read(self, size: int) -> bytes:
"""
Read and return up to `size` bytes from the remote file. If the end of the file is reached
this should return an empty bytes string.
"""
raise NotImplementedError
def write(self, data: bytes) -> None:
"""
Write the data buffer to the remote file.
"""
raise NotImplementedError
def close(self):
"""
Cleanup and deallocate any held resources. This method may be called multiple times on a
single instance. If the file was already closed, this method should do nothing.
"""
pass
def iter_chunks(self, chunk_size: int = None):
"""
Iterate over the file in blocks of `chunk_size`.
"""
chunk_size = chunk_size or self.iter_chunk_size
while True:
chunk = self.read(chunk_size)
if chunk == b'':
break
yield chunk
def __iter__(self):
return self.iter_chunks()
def __del__(self):
self.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class StorageBackend:
name = None
def __init__(self, *args, **kwargs):
pass
def list(self, path: str) -> typing.List[ListEntry]:
"""
Returns a list of `ListEntry`s representing files available under the directory or prefix
given in `path`.
"""
raise NotImplementedError()
def open(self, path: str, mode: typing.Union[FileMode, str]) -> RemoteFile:
"""
Returns a instance of RemoteFile for the given `path` that can be used for
reading and/or writing depending on the `mode` given.
"""
raise NotImplementedError()
def delete(self, path: str):
"""
Delete the remote file specified by `path`.
"""
raise NotImplementedError()
def get(self, path: str, dest: str) -> None:
"""
Copies a remote file at `path` to the `dest` path given on the local filesystem.
"""
with self.open(path, FileMode.read) as infile, open(dest, str(FileMode.write)) as outfile:
for chunk in infile.iter_chunks():
outfile.write(chunk)
def put(self, path: str, dest: str) -> None:
"""
Copies a local file at `path` to a remote file at `dest`.
"""
buffer_size = 5 * 1024 * 1024
with self.open(dest, FileMode.write) as outfile, open(path, str(FileMode.read)) as infile:
buf = infile.read(buffer_size)
while buf:
outfile.write(buf)
buf = infile.read(buffer_size)
def __str__(self):
return self.__class__.__name__
class FileNotFoundInStorageError(Exception):
def __init__(self, storage_type, filename):
self.storage_type = storage_type
self.filename = filename
def __str__(self):
return "File {} not found in {}.".format(self.filename, str(self.storage_type))