Source code for keg_storage.backends.sftp
import contextlib
import logging
import typing
import arrow
from paramiko import SSHClient
from .base import (
StorageBackend,
ListEntry,
FileMode,
RemoteFile,
)
log = logging.getLogger(__name__)
[docs]class SFTPRemoteFile(RemoteFile):
def __init__(self, mode, path, client):
super().__init__(mode)
self.path = path
self.client = client
self.file = None
self.sftp = client.open_sftp()
self.file = self.sftp.open(path, str(mode))
[docs] def read(self, size: int):
if not (self.mode & FileMode.read):
raise IOError('File not opened for reading')
return self.file.read(size)
[docs] def close(self):
# File may actually be none since the open operation in the constructor may have failed
# before assigning the value
if self.file is not None:
self.file.close()
self.client.close()
[docs] def write(self, data: bytes):
if not (self.mode & FileMode.write):
raise IOError('File not opened for writing')
return self.file.write(data)
[docs]class SFTPStorage(StorageBackend):
def __init__(
self,
host,
username,
key_filename,
known_hosts_fpath,
allow_agent=False,
look_for_keys=False,
name='sftp'
):
super().__init__()
self.host = host
self.username = username
self.key_filename = key_filename
self.known_hosts_fpath = known_hosts_fpath
self.allow_agent = allow_agent
self.look_for_keys = look_for_keys
self.name = name
def create_client(self):
client = SSHClient()
client.load_system_host_keys(self.known_hosts_fpath)
client.connect(
self.host,
username=self.username,
key_filename=self.key_filename,
allow_agent=self.allow_agent,
look_for_keys=self.look_for_keys,
)
return client
@contextlib.contextmanager
def connection(self):
with self.create_client() as client:
sftp = client.open_sftp()
yield sftp
[docs] def list(self, path: str):
with self.connection() as conn:
return [
ListEntry(
name=x.filename,
last_modified=arrow.get(x.st_mtime),
size=x.st_size
)
for x in conn.listdir_attr(path)
]
[docs] def open(self, path: str, mode: typing.Union[FileMode, str]):
mode = FileMode.as_mode(mode)
# SFTPRemoteFile is responsible for closing the client connection
client = self.create_client()
return SFTPRemoteFile(mode, path, client)
[docs] def delete(self, path: str):
log.info("Deleting remote file '%s'", path)
with self.connection() as conn:
conn.remove(path)