Source code for libcloud.storage.drivers.dummy

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import hashlib
import os.path
from io import FileIO as file

from libcloud.utils.py3 import b
from libcloud.common.types import LibcloudError
from libcloud.storage.base import Object, Container, StorageDriver
from libcloud.storage.types import (
    ObjectDoesNotExistError,
    ContainerIsNotEmptyError,
    ContainerDoesNotExistError,
    ContainerAlreadyExistsError,
)


[docs]class DummyFileObject(file): def __init__(self, yield_count=5, chunk_len=10): self._yield_count = yield_count self._chunk_len = chunk_len
[docs] def read(self, size): i = 0 while i < self._yield_count: yield self._get_chunk(self._chunk_len) i += 1
def _get_chunk(self, chunk_len): chunk = [str(x) for x in random.randint(97, 120)] return chunk def __len__(self): return self._yield_count * self._chunk_len
[docs]class DummyIterator: def __init__(self, data=None): self.hash = hashlib.md5() self._data = data or [] self._current_item = 0
[docs] def get_md5_hash(self): return self.hash.hexdigest()
[docs] def next(self): if self._current_item == len(self._data): raise StopIteration value = self._data[self._current_item] self.hash.update(b(value)) self._current_item += 1 return value
def __next__(self): return self.next() def __enter__(self): pass def __exit__(self, type, value, traceback): pass
[docs]class DummyStorageDriver(StorageDriver): """ Dummy Storage driver. >>> from libcloud.storage.drivers.dummy import DummyStorageDriver >>> driver = DummyStorageDriver('key', 'secret') >>> container = driver.create_container(container_name='test container') >>> container <Container: name=test container, provider=Dummy Storage Provider> >>> container.name 'test container' >>> container.extra['object_count'] 0 """ name = "Dummy Storage Provider" website = "http://example.com" def __init__(self, api_key, api_secret): """ :param api_key: API key or username to used (required) :type api_key: ``str`` :param api_secret: Secret password to be used (required) :type api_secret: ``str`` :rtype: ``None`` """ self._containers = {}
[docs] def get_meta_data(self): """ >>> driver = DummyStorageDriver('key', 'secret') >>> driver.get_meta_data()['object_count'] 0 >>> driver.get_meta_data()['container_count'] 0 >>> driver.get_meta_data()['bytes_used'] 0 >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container_name = 'test container 2' >>> container = driver.create_container(container_name=container_name) >>> obj = container.upload_object_via_stream( ... object_name='test object', iterator=DummyFileObject(5, 10), ... extra={}) >>> driver.get_meta_data()['object_count'] 1 >>> driver.get_meta_data()['container_count'] 2 >>> driver.get_meta_data()['bytes_used'] 50 :rtype: ``dict`` """ container_count = len(self._containers) object_count = sum( len(self._containers[container]["objects"]) for container in self._containers ) bytes_used = 0 for container in self._containers: objects = self._containers[container]["objects"] for _, obj in objects.items(): bytes_used += obj.size return { "container_count": int(container_count), "object_count": int(object_count), "bytes_used": int(bytes_used), }
[docs] def iterate_containers(self): """ >>> driver = DummyStorageDriver('key', 'secret') >>> list(driver.iterate_containers()) [] >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> container.name 'test container 1' >>> container_name = 'test container 2' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 2, provider=Dummy Storage Provider> >>> container = driver.create_container( ... container_name='test container 2') ... #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ContainerAlreadyExistsError: >>> container_list=list(driver.iterate_containers()) >>> sorted([c.name for c in container_list]) ['test container 1', 'test container 2'] @inherits: :class:`StorageDriver.iterate_containers` """ for container in list(self._containers.values()): yield container["container"]
[docs] def iterate_container_objects(self, container, prefix=None, ex_prefix=None): prefix = self._normalize_prefix_argument(prefix, ex_prefix) container = self.get_container(container.name) objects = self._containers[container.name]["objects"].values() return self._filter_listed_container_objects(objects, prefix)
[docs] def get_container(self, container_name): """ >>> driver = DummyStorageDriver('key', 'secret') >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ContainerDoesNotExistError: >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> container.name 'test container 1' >>> driver.get_container('test container 1') <Container: name=test container 1, provider=Dummy Storage Provider> @inherits: :class:`StorageDriver.get_container` """ if container_name not in self._containers: raise ContainerDoesNotExistError(driver=self, value=None, container_name=container_name) return self._containers[container_name]["container"]
[docs] def get_container_cdn_url(self, container): """ >>> driver = DummyStorageDriver('key', 'secret') >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ContainerDoesNotExistError: >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> container.name 'test container 1' >>> container.get_cdn_url() 'http://www.test.com/container/test_container_1' @inherits: :class:`StorageDriver.get_container_cdn_url` """ if container.name not in self._containers: raise ContainerDoesNotExistError(driver=self, value=None, container_name=container.name) return self._containers[container.name]["cdn_url"]
[docs] def get_object(self, container_name, object_name): """ >>> driver = DummyStorageDriver('key', 'secret') >>> driver.get_object('unknown', 'unknown') ... #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ContainerDoesNotExistError: >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> driver.get_object( ... 'test container 1', 'unknown') #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ObjectDoesNotExistError: >>> obj = container.upload_object_via_stream(object_name='test object', ... iterator=DummyFileObject(5, 10), extra={}) >>> obj.name 'test object' >>> obj.size 50 @inherits: :class:`StorageDriver.get_object` """ self.get_container(container_name) container_objects = self._containers[container_name]["objects"] if object_name not in container_objects: raise ObjectDoesNotExistError(object_name=object_name, value=None, driver=self) return container_objects[object_name]
[docs] def get_object_cdn_url(self, obj): """ >>> driver = DummyStorageDriver('key', 'secret') >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> obj = container.upload_object_via_stream( ... object_name='test object 5', ... iterator=DummyFileObject(5, 10), extra={}) >>> obj.name 'test object 5' >>> obj.get_cdn_url() 'http://www.test.com/object/test_object_5' @inherits: :class:`StorageDriver.get_object_cdn_url` """ container_name = obj.container.name container_objects = self._containers[container_name]["objects"] if obj.name not in container_objects: raise ObjectDoesNotExistError(object_name=obj.name, value=None, driver=self) return container_objects[obj.name].meta_data["cdn_url"]
[docs] def create_container(self, container_name): """ >>> driver = DummyStorageDriver('key', 'secret') >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> container = driver.create_container( ... container_name='test container 1') ... #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ContainerAlreadyExistsError: @inherits: :class:`StorageDriver.create_container` """ if container_name in self._containers: raise ContainerAlreadyExistsError( container_name=container_name, value=None, driver=self ) extra = {"object_count": 0} container = Container(name=container_name, extra=extra, driver=self) self._containers[container_name] = { "container": container, "objects": {}, "cdn_url": "http://www.test.com/container/%s" % (container_name.replace(" ", "_")), } return container
[docs] def delete_container(self, container): """ >>> driver = DummyStorageDriver('key', 'secret') >>> container = Container(name = 'test container', ... extra={'object_count': 0}, driver=driver) >>> driver.delete_container(container=container) ... #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ContainerDoesNotExistError: >>> container = driver.create_container( ... container_name='test container 1') ... #doctest: +IGNORE_EXCEPTION_DETAIL >>> len(driver._containers) 1 >>> driver.delete_container(container=container) True >>> len(driver._containers) 0 >>> container = driver.create_container( ... container_name='test container 1') ... #doctest: +IGNORE_EXCEPTION_DETAIL >>> obj = container.upload_object_via_stream( ... object_name='test object', iterator=DummyFileObject(5, 10), ... extra={}) >>> driver.delete_container(container=container) ... #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ContainerIsNotEmptyError: @inherits: :class:`StorageDriver.delete_container` """ container_name = container.name if container_name not in self._containers: raise ContainerDoesNotExistError(container_name=container_name, value=None, driver=self) container = self._containers[container_name] if len(container["objects"]) > 0: raise ContainerIsNotEmptyError(container_name=container_name, value=None, driver=self) del self._containers[container_name] return True
[docs] def download_object( self, obj, destination_path, overwrite_existing=False, delete_on_failure=True ): kwargs_dict = { "obj": obj, "response": DummyFileObject(), "destination_path": destination_path, "overwrite_existing": overwrite_existing, "delete_on_failure": delete_on_failure, } return self._save_object(**kwargs_dict)
[docs] def download_object_as_stream(self, obj, chunk_size=None): """ >>> driver = DummyStorageDriver('key', 'secret') >>> container = driver.create_container( ... container_name='test container 1') ... #doctest: +IGNORE_EXCEPTION_DETAIL >>> obj = container.upload_object_via_stream(object_name='test object', ... iterator=DummyFileObject(5, 10), extra={}) >>> stream = container.download_object_as_stream(obj) >>> stream #doctest: +ELLIPSIS <...closed...> @inherits: :class:`StorageDriver.download_object_as_stream` """ return DummyFileObject()
[docs] def upload_object( self, file_path, container, object_name, extra=None, verify_hash=True, headers=None, ): """ >>> driver = DummyStorageDriver('key', 'secret') >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container.upload_object(file_path='/tmp/inexistent.file', ... object_name='test') #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): LibcloudError: >>> file_path = path = os.path.abspath(__file__) >>> file_size = os.path.getsize(file_path) >>> obj = container.upload_object(file_path=file_path, ... object_name='test') >>> obj #doctest: +ELLIPSIS <Object: name=test, size=...> >>> obj.size == file_size True @inherits: :class:`StorageDriver.upload_object` """ if not os.path.exists(file_path): raise LibcloudError(value="File %s does not exist" % (file_path), driver=self) size = os.path.getsize(file_path) return self._add_object( container=container, object_name=object_name, size=size, extra=extra )
[docs] def upload_object_via_stream(self, iterator, container, object_name, extra=None, headers=None): """ >>> driver = DummyStorageDriver('key', 'secret') >>> container = driver.create_container( ... container_name='test container 1') ... #doctest: +IGNORE_EXCEPTION_DETAIL >>> obj = container.upload_object_via_stream( ... object_name='test object', iterator=DummyFileObject(5, 10), ... extra={}) >>> obj #doctest: +ELLIPSIS <Object: name=test object, size=50, ...> @inherits: :class:`StorageDriver.upload_object_via_stream` """ size = len(iterator) return self._add_object( container=container, object_name=object_name, size=size, extra=extra )
[docs] def delete_object(self, obj): """ >>> driver = DummyStorageDriver('key', 'secret') >>> container = driver.create_container( ... container_name='test container 1') ... #doctest: +IGNORE_EXCEPTION_DETAIL >>> obj = container.upload_object_via_stream(object_name='test object', ... iterator=DummyFileObject(5, 10), extra={}) >>> obj #doctest: +ELLIPSIS <Object: name=test object, size=50, ...> >>> container.delete_object(obj=obj) True >>> obj = Object(name='test object 2', ... size=1000, hash=None, extra=None, ... meta_data=None, container=container,driver=None) >>> container.delete_object(obj=obj) #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ObjectDoesNotExistError: @inherits: :class:`StorageDriver.delete_object` """ container_name = obj.container.name object_name = obj.name obj = self.get_object(container_name=container_name, object_name=object_name) del self._containers[container_name]["objects"][object_name] return True
def _add_object(self, container, object_name, size, extra=None): container = self.get_container(container.name) extra = extra or {} meta_data = extra.get("meta_data", {}) meta_data.update( {"cdn_url": "http://www.test.com/object/%s" % (object_name.replace(" ", "_"))} ) obj = Object( name=object_name, size=size, extra=extra, hash=None, meta_data=meta_data, container=container, driver=self, ) self._containers[container.name]["objects"][object_name] = obj return obj
if __name__ == "__main__": import doctest doctest.testmod()