Source code for keg_storage.backends.azure

import base64
import os
import typing

import arrow
from azure.storage.blob import (
    BlockBlobService,
    BlobPrefix,
    BlobBlock,
)

from keg_storage.backends import base


class AzureFile(base.RemoteFile):
    """
    Base class for the Azure file interface. Since read and write operations are very different
    and integrating the two would introduce a lot of complexity, there are distinct subclasses
    for files opened for reading and writing.
    """
    def __init__(self, container_name: str, path: str, mode: base.FileMode,
                 service: BlockBlobService, chunk_size: int = 10 * 1024 * 1024):
        """
        :param container_name: container / bucket name
        :param path: blob name
        :param mode: file mode
        :param service: blob service instance to use for API calls
        :param chunk_size: read/write buffer size
        """
        super().__init__(mode)
        self.chunk_size = chunk_size
        self.container_name = container_name
        self.path = path
        self.service = service

        # Local buffer to reduce number of requests
        self.buffer = bytearray()

class AzureWriter(AzureFile):
    """
    We are using Azure Block Blobs for all operations. The process for writing them is
    substantially similar to that of S3, with a couple of differences:

    1. We generate the IDs for the blocks.
    2. There is no separate call to instantiate the upload. The first call to put_block will
       create the blob.
    """
    def __init__(self, container_name: str, path: str, mode: base.FileMode,
                 service: BlockBlobService, chunk_size: int = BlockBlobService.MAX_BLOCK_SIZE):
        super().__init__(
            container_name=container_name,
            path=path,
            mode=mode,
            service=service,
            # chunk_size cannot be larger than MAX_BLOCK_SIZE due to API restrictions
            chunk_size=min(chunk_size, BlockBlobService.MAX_BLOCK_SIZE),
        )
        self.blocks = []

    def _gen_block_id(self) -> str:
        """
        Generate a unique ID for the block. This is meant to be opaque but it is generated from:

        1. The index of the block as a 64 bit unsigned big endian integer
        2. 40 bytes of random data

        The two parts are concatenated and base64 encoded, giving us 64 bytes, which is the
        maximum Azure allows.
        """
        index_part = len(self.blocks).to_bytes(8, byteorder='big', signed=False)
        random_part = os.urandom(40)
        return base64.b64encode(index_part + random_part).decode()

    def _flush(self):
        if len(self.buffer) == 0:
            # If there is no buffered data, we don't need to do anything
            return

        # Upload at most chunk_size bytes to a new block
        block_id = self._gen_block_id()
        self.service.put_block(
            container_name=self.container_name,
            blob_name=self.path,
            block=bytes(self.buffer[:self.chunk_size]),
            block_id=block_id
        )
        # Store the block_id to later concatenate when we close this file
        self.blocks.append(BlobBlock(id=block_id))

        # Cycle the buffer
        self.buffer = self.buffer[self.chunk_size:]

    def _finalize(self):
        self.service.put_block_list(
            container_name=self.container_name,
            blob_name=self.path,
            block_list=self.blocks
        )
        self.blocks = []

    def write(self, data: bytes) -> None:
        self.buffer.extend(data)

        while len(self.buffer) >= self.chunk_size:
            # Write may be bigger than the chunk size and _flush() only uploads a single chunk so
            # repeated calls may be necessary
            self._flush()

    def close(self):
        self._flush()

        if self.blocks:
            # If we haven't created any blocks, we don't need to finalize
            self._finalize()

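
# A minimal, illustrative write sketch assuming valid Azure credentials; the account, key,
# container, blob names, and the helper function name are hypothetical placeholders. Data is
# buffered locally, each full chunk is uploaded with put_block, and close() commits the
# accumulated block list to form the blob.
def _example_write_blob():
    service = BlockBlobService(account_name='example-account', account_key='example-key')
    writer = AzureWriter(
        container_name='example-container',
        path='uploads/archive.bin',
        mode=base.FileMode.write,
        service=service,
    )
    writer.write(b'hello world')  # buffered; uploaded once a full chunk accumulates or on close
    writer.close()                # flushes the remaining buffer and calls put_block_list
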
class AzureReader(AzureFile):
    """
    The Azure reader uses byte ranged API calls to fill a local buffer to avoid lots of API
    overhead for small read sizes.
    """
    def __init__(self, container_name: str, path: str, mode: base.FileMode,
                 service: BlockBlobService, chunk_size=10 * 1024 * 1024):
        super().__init__(
            container_name=container_name,
            path=path,
            mode=mode,
            service=service,
            chunk_size=chunk_size,
        )

        # Get overall blob size to avoid out of bounds requests
        self.blob_size = self._get_blob_size()

        # Current offset for the next request to fill the local buffer
        self.current_offset = 0

        # Current read position in the blob including reads filled from the local buffer
        self.read_position = 0

    def _get_blob_size(self):
        """
        Retrieve the overall size of the blob.
        """
        blob = self.service.get_blob_properties(
            container_name=self.container_name,
            blob_name=self.path
        )
        return blob.properties.content_length

    def _refill_buffer(self):
        """
        Retrieve the next chunk and add it to the local buffer.
        """
        range_end = self.current_offset + self.chunk_size - 1  # -1 because range is inclusive
        blob = self.service.get_blob_to_bytes(
            container_name=self.container_name,
            blob_name=self.path,
            start_range=self.current_offset,
            end_range=range_end,
        )
        self.current_offset += len(blob.content)
        self.buffer.extend(blob.content)

    def _read_from_buffer(self, max_size):
        """
        Read up to max_size bytes from the local buffer.
        """
        read_size = min(len(self.buffer), max_size)
        output = self.buffer[:read_size]
        self.buffer = self.buffer[read_size:]

        self.read_position += len(output)
        return output

    def read(self, size: int) -> bytes:
        output_buf = bytes()

        # Read up to the end of the file or `size` bytes, whichever is less
        read_size = min(self.blob_size - self.read_position, size)

        while len(output_buf) < read_size:
            if len(self.buffer) == 0:
                # Buffer is empty, refill it.
                self._refill_buffer()

            remaining = read_size - len(output_buf)
            read = self._read_from_buffer(remaining)
            output_buf += read

        return output_buf

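
# A minimal, illustrative read sketch; the account, key, container, blob names, and the helper
# function name are hypothetical placeholders. Reads are served from the local buffer, which is
# refilled via ranged get_blob_to_bytes requests of chunk_size bytes, so many small read()
# calls do not each hit the API.
def _example_read_blob():
    service = BlockBlobService(account_name='example-account', account_key='example-key')
    reader = AzureReader(
        container_name='example-container',
        path='uploads/archive.bin',
        mode=base.FileMode.read,
        service=service,
        chunk_size=1024 * 1024,  # smaller buffer, purely for illustration
    )
    header = reader.read(1024)  # at most 1 KiB; clamped to the blob size if the blob is smaller
    return header
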
class AzureStorage(base.StorageBackend):
    def __init__(self, account: str, key: str, bucket: str, name: str = 'azure'):
        super().__init__()
        self.name = name
        self.account = account
        self.key = key
        self.bucket = bucket

    def _create_service(self):
        return BlockBlobService(
            account_name=self.account,
            account_key=self.key,
        )

    def list(self, path: str) -> typing.List[base.ListEntry]:
        service = self._create_service()

        if not path.endswith('/'):
            path = path + '/'

        list_iter = service.list_blobs(container_name=self.bucket, prefix=path, delimiter='/')

        def construct_entry(blob):
            if isinstance(blob, BlobPrefix):
                return base.ListEntry(name=blob.name, last_modified=None, size=0)
            return base.ListEntry(
                name=blob.name,
                last_modified=arrow.get(blob.properties.last_modified),
                size=blob.properties.content_length,
            )

        return [construct_entry(blob) for blob in list_iter]

    def open(self, path: str, mode: typing.Union[base.FileMode, str], buffer_size: int = 10 * 1024 * 1024) -> AzureFile:  # noqa
        mode = base.FileMode.as_mode(mode)

        if (mode & base.FileMode.read) and (mode & base.FileMode.write):
            raise NotImplementedError('Read+write mode not supported by the Azure backend')
        elif mode & base.FileMode.write:
            return AzureWriter(
                container_name=self.bucket,
                path=path,
                mode=mode,
                service=self._create_service(),
                chunk_size=buffer_size
            )
        elif mode & base.FileMode.read:
            return AzureReader(
                container_name=self.bucket,
                path=path,
                mode=mode,
                service=self._create_service(),
                chunk_size=buffer_size
            )
        else:
            raise ValueError('Unsupported mode. Accepted modes are FileMode.read or FileMode.write')

    def delete(self, path: str):
        service = self._create_service()
        service.delete_blob(
            container_name=self.bucket,
            blob_name=path
        )

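
# An end-to-end sketch of the backend interface with hypothetical credentials, paths, and
# helper function name. open() dispatches to AzureWriter or AzureReader based on the mode
# flags, list() returns base.ListEntry items for a single "directory" level, and delete()
# removes one blob.
def _example_backend_usage():
    storage = AzureStorage(
        account='example-account',
        key='example-key',
        bucket='example-container',
    )

    fp = storage.open('reports/2020.csv', base.FileMode.write)
    fp.write(b'id,total\n1,100\n')
    fp.close()

    entries = storage.list('reports')
    storage.delete('reports/2020.csv')
    return entries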