# Source code for keg_storage.backends.filesystem

import os
import string
from operator import attrgetter
from typing import (
    List,
    Optional,
    Union,
)

import pathlib

import arrow

from keg_storage.backends import base


class LocalFSError(Exception):
    """Error raised by the local filesystem storage backend in this module."""


class LocalFSFile(base.RemoteFile):
    """File object backed by a file on the local filesystem.

    The underlying file is opened in the constructor and stays open until
    :meth:`close` is called.
    """

    def __init__(self, path: pathlib.Path, mode: base.FileMode):
        """Open ``path`` with the access requested by ``mode``.

        :param path: location of the file to open.
        :param mode: requested access flags. When both the read and write
            flags are set the file is opened as ``'r+b'``; otherwise the
            string form of ``mode`` is passed to ``path.open``.
        """
        self.path = path

        wants_read_and_write = mode & base.FileMode.read and mode & base.FileMode.write
        open_mode = 'r+b' if wants_read_and_write else str(mode)

        self.fp = path.open(open_mode)
        super().__init__(mode)

    def read(self, size: int) -> bytes:
        """Read and return up to ``size`` bytes from the underlying file."""
        return self.fp.read(size)

    def write(self, data: bytes) -> None:
        """Write ``data`` to the underlying file."""
        self.fp.write(data)

    def close(self):
        """Close the underlying file handle."""
        self.fp.close()
class LocalFSStorage(base.InternalLinksStorageBackend):
    """Storage backend that keeps its files in a directory on the local filesystem.

    Paths given to the public methods are joined onto ``root``. Any path that
    resolves (after following ``..`` segments and symlinks) to a location
    outside ``root`` is rejected with :class:`LocalFSError`.
    """

    # Characters rejected in storage paths: `~`, `?`, `*` and every whitespace
    # character except a plain space.
    disallowed_path_chars = frozenset(set('~?*' + string.whitespace) - {' '})

    def __init__(
        self,
        root: Union[str, pathlib.Path],
        linked_endpoint: Optional[str] = None,
        secret_key: Optional[bytes] = None,
        name: Optional[str] = None,
    ):
        """Create a backend rooted at an existing directory.

        :param root: directory under which all files are stored.
        :param linked_endpoint: forwarded unchanged to the base class.
        :param secret_key: forwarded unchanged to the base class.
        :param name: backend name; defaults to ``fs-<final component of root>``.
        :raises LocalFSError: if ``root`` does not exist or is not a directory.
        """
        root_path = pathlib.Path(root)
        self.root = root_path.resolve()
        if not self.root.is_dir():
            raise LocalFSError('Storage root does not exist or is not a directory')

        if name is None:
            # `root` may be a plain string, which has no `.name` attribute, so
            # derive the default from the Path form of the argument.
            name = f'fs-{root_path.name}'

        super().__init__(
            linked_endpoint=linked_endpoint,
            secret_key=secret_key,
            name=name,
        )

    def _is_under_root(self, path: pathlib.Path) -> bool:
        """Return True if ``path``, once resolved, lies inside the storage root."""
        try:
            path.resolve().relative_to(self.root)
        except ValueError:
            # relative_to() raises ValueError when `path` is not below the root.
            return False
        return True

    def _resolve_path(self, path: str) -> pathlib.Path:
        """Join ``path`` onto the storage root and resolve the result."""
        return self.root.joinpath(path).resolve()

    def _is_file(self, path: pathlib.Path) -> bool:
        """Return True only for regular files; symlinks are excluded."""
        return path.is_file() and not path.is_symlink()

    def _validate_path(self, path: str):
        """Raise ``ValueError`` if ``path`` contains a disallowed character."""
        if any(c in self.disallowed_path_chars for c in path):
            raise ValueError('Unsupported characters in path')

    def list(self, path: str) -> List[base.ListEntry]:
        """Recursively list the regular files below ``path``.

        Entry names are relative to the storage root and the result is sorted
        by name. Symlinks, special files and anything resolving outside the
        root are skipped.

        :raises ValueError: if ``path`` contains a disallowed character.
        :raises LocalFSError: if ``path`` escapes the root, does not exist or
            is not a directory.
        """
        self._validate_path(path)
        resolved_path = self._resolve_path(path)

        if not self._is_under_root(resolved_path):
            # prevent escaping the root by including `..` or symlinks in `path`
            raise LocalFSError('Invalid path')

        if not resolved_path.is_dir():
            raise LocalFSError(f'{path} does not exist or is not a directory')

        entries = []
        for dir_path, _, file_names in os.walk(resolved_path, followlinks=False):
            for file_name in file_names:
                full_path = pathlib.Path(dir_path).joinpath(file_name)
                if not self._is_file(full_path):
                    # Skip symlinks and special files like sockets
                    continue

                full_path = full_path.resolve()
                if not self._is_under_root(full_path):
                    continue

                st = full_path.stat()
                rel_path = full_path.relative_to(self.root)
                entries.append(base.ListEntry(
                    name=str(rel_path),
                    size=st.st_size,
                    last_modified=arrow.get(st.st_mtime)
                ))
        return sorted(entries, key=attrgetter('name'))

    def open(self, path: str, mode: Union[base.FileMode, str]):
        """Open the file at ``path`` and return a :class:`LocalFSFile`.

        When ``mode`` includes the write flag, missing parent directories are
        created first.

        :raises ValueError: if ``path`` contains a disallowed character.
        :raises LocalFSError: if ``path`` escapes the root, or exists but is
            not a regular file.
        """
        self._validate_path(path)
        path = self._resolve_path(path)
        if not self._is_under_root(path):
            raise LocalFSError('Invalid path')

        if path.exists() and not self._is_file(path):
            raise LocalFSError('Invalid path')

        mode = base.FileMode.as_mode(mode)
        if mode & base.FileMode.write:
            path.parent.mkdir(parents=True, exist_ok=True)

        return LocalFSFile(path, mode)

    def copy(self, path: str, new_path: str):
        """Copy the file at ``path`` to ``new_path`` within this storage.

        Both paths are resolved against the storage root and handed to
        ``self.put`` (not defined in this class; presumably inherited from the
        base backend).

        :raises ValueError: if either path contains a disallowed character.
        :raises LocalFSError: if either path resolves outside the storage root.
        """
        self._validate_path(path)
        self._validate_path(new_path)

        source = self._resolve_path(path)
        destination = self._resolve_path(new_path)

        # Apply the same containment rule as list/open/delete so that `..`
        # segments or symlinks cannot be used to copy files from outside the
        # storage root.
        if not self._is_under_root(source) or not self._is_under_root(destination):
            raise LocalFSError('Invalid path')

        return self.put(str(source), str(destination))

    def delete(self, path: str):
        """Delete the file at ``path``; a path that does not exist is ignored.

        :raises ValueError: if ``path`` contains a disallowed character.
        :raises LocalFSError: if ``path`` escapes the root, or exists but is
            not a regular file.
        """
        self._validate_path(path)
        path = self._resolve_path(path)
        if not self._is_under_root(path):
            raise LocalFSError('Invalid path')

        if not path.exists():
            return

        if not self._is_file(path):
            raise LocalFSError('Invalid path')

        path.unlink()