# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import hashlib
import os.path
from io import FileIO as file
from libcloud.utils.py3 import b
from libcloud.common.types import LibcloudError
from libcloud.storage.base import Object, Container, StorageDriver
from libcloud.storage.types import (
ObjectDoesNotExistError,
ContainerIsNotEmptyError,
ContainerDoesNotExistError,
ContainerAlreadyExistsError,
)
class DummyFileObject(file):
    """
    Fake file-like object which produces random data in fixed-size chunks.

    No real file is opened: ``FileIO.__init__`` is intentionally never
    called, the object only mimics the small part of the file API used by
    the dummy storage driver.
    """

    def __init__(self, yield_count=5, chunk_len=10):
        """
        :param yield_count: Number of chunks yielded by :meth:`read`.
        :type yield_count: ``int``

        :param chunk_len: Number of characters in every chunk.
        :type chunk_len: ``int``
        """
        # NOTE: the parent initializer is deliberately skipped, there is no
        # backing file descriptor.
        self._yield_count = yield_count
        self._chunk_len = chunk_len

    def read(self, size=None):
        """
        Yield ``yield_count`` random chunks, each ``chunk_len`` characters
        long.

        :param size: Ignored, accepted only to match the file ``read``
                     signature.

        :return: Generator producing ``str`` chunks.
        """
        for _ in range(self._yield_count):
            yield self._get_chunk(self._chunk_len)

    def _get_chunk(self, chunk_len):
        """
        Return a string of ``chunk_len`` random characters with code points
        between 97 and 120 (``a`` - ``x``).
        """
        # random.randint() returns a single int, so one call is needed per
        # character (iterating over its result raises TypeError).
        return "".join(chr(random.randint(97, 120)) for _ in range(chunk_len))

    def __len__(self):
        # Total amount of data produced by a full read().
        return self._yield_count * self._chunk_len
class DummyIterator:
    """
    Iterator over a sequence of chunks which maintains a running MD5 digest
    of every chunk it has returned.
    """

    def __init__(self, data=None):
        """
        :param data: Sequence of chunks to iterate over. Defaults to an empty
                     list.
        """
        self.hash = hashlib.md5()
        self._data = data or []
        self._current_item = 0

    def get_md5_hash(self):
        """
        Return the hexadecimal MD5 digest of all the chunks returned so far.

        :rtype: ``str``
        """
        return self.hash.hexdigest()

    def __iter__(self):
        # Needed by the iterator protocol - without it the object can't be
        # used in a "for" loop or passed to list().
        return self

    def next(self):
        """
        Return the next chunk and feed it into the running MD5 digest.

        :raises StopIteration: When all the chunks have been returned.
        """
        if self._current_item >= len(self._data):
            raise StopIteration

        value = self._data[self._current_item]
        self.hash.update(b(value))
        self._current_item += 1
        return value

    def __next__(self):
        return self.next()

    def __enter__(self):
        # Return the iterator so "with DummyIterator(...) as it" binds a
        # usable object instead of None.
        return self

    def __exit__(self, type, value, traceback):
        # Nothing to clean up; exceptions are not suppressed.
        pass
class DummyStorageDriver(StorageDriver):
    """
    Dummy Storage driver.

    All containers and objects are kept in an in-memory dictionary which
    belongs to the driver instance; nothing is persisted.

    >>> from libcloud.storage.drivers.dummy import DummyStorageDriver
    >>> driver = DummyStorageDriver('key', 'secret')
    >>> container = driver.create_container(container_name='test container')
    >>> container
    <Container: name=test container, provider=Dummy Storage Provider>
    >>> container.name
    'test container'
    >>> container.extra['object_count']
    0
    """

    name = "Dummy Storage Provider"
    website = "http://example.com"

    def __init__(self, api_key, api_secret):
        """
        :param    api_key:    API key or username to be used (required)
        :type     api_key:    ``str``

        :param    api_secret: Secret password to be used (required)
        :type     api_secret: ``str``

        :rtype: ``None``
        """
        # Maps container name -> {"container": Container,
        #                         "objects": {object name: Object},
        #                         "cdn_url": str}
        self._containers = {}

    def iterate_containers(self):
        """
        Yield every container known to this driver.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> list(driver.iterate_containers())
        []
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container.name
        'test container 1'
        >>> container_name = 'test container 2'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 2, provider=Dummy Storage Provider>
        >>> container = driver.create_container(
        ...  container_name='test container 2')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerAlreadyExistsError:
        >>> container_list=list(driver.iterate_containers())
        >>> sorted([c.name for c in container_list])
        ['test container 1', 'test container 2']

        @inherits: :class:`StorageDriver.iterate_containers`
        """
        # Iterate over a snapshot so containers can be created / deleted
        # while the generator is being consumed.
        for entry in list(self._containers.values()):
            yield entry["container"]

    def iterate_container_objects(self, container, prefix=None, ex_prefix=None):
        """
        Return the objects stored in the provided container, optionally
        filtered by name prefix.

        :raises ContainerDoesNotExistError: If the container is unknown.

        @inherits: :class:`StorageDriver.iterate_container_objects`
        """
        prefix = self._normalize_prefix_argument(prefix, ex_prefix)
        objects = self._get_container_objects(container.name).values()
        return self._filter_listed_container_objects(objects, prefix)

    def get_container(self, container_name):
        """
        Return the container with the provided name.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container.name
        'test container 1'
        >>> driver.get_container('test container 1')
        <Container: name=test container 1, provider=Dummy Storage Provider>

        @inherits: :class:`StorageDriver.get_container`
        """
        if container_name not in self._containers:
            raise ContainerDoesNotExistError(driver=self, value=None, container_name=container_name)

        return self._containers[container_name]["container"]

    def get_container_cdn_url(self, container):
        """
        Return the fake CDN URL which was generated when the container was
        created.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container.name
        'test container 1'
        >>> container.get_cdn_url()
        'http://www.test.com/container/test_container_1'

        @inherits: :class:`StorageDriver.get_container_cdn_url`
        """
        if container.name not in self._containers:
            raise ContainerDoesNotExistError(driver=self, value=None, container_name=container.name)

        return self._containers[container.name]["cdn_url"]

    def get_object(self, container_name, object_name):
        """
        Return the object with the provided name from the provided container.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_object('unknown', 'unknown')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> driver.get_object(
        ...  'test container 1', 'unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ObjectDoesNotExistError:
        >>> obj = container.upload_object_via_stream(object_name='test object',
        ...      iterator=DummyFileObject(5, 10), extra={})
        >>> obj.name
        'test object'
        >>> obj.size
        50

        @inherits: :class:`StorageDriver.get_object`
        """
        container_objects = self._get_container_objects(container_name)

        if object_name not in container_objects:
            raise ObjectDoesNotExistError(object_name=object_name, value=None, driver=self)

        return container_objects[object_name]

    def get_object_cdn_url(self, obj):
        """
        Return the fake CDN URL which was generated when the object was
        uploaded.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> obj = container.upload_object_via_stream(
        ...      object_name='test object 5',
        ...      iterator=DummyFileObject(5, 10), extra={})
        >>> obj.name
        'test object 5'
        >>> obj.get_cdn_url()
        'http://www.test.com/object/test_object_5'

        :raises ContainerDoesNotExistError: If the object's container is
                                            unknown.
        :raises ObjectDoesNotExistError: If the object is not stored in its
                                         container.

        @inherits: :class:`StorageDriver.get_object_cdn_url`
        """
        # Go through the shared lookup so an unknown container results in
        # ContainerDoesNotExistError (like in the other methods) instead of a
        # bare KeyError.
        container_objects = self._get_container_objects(obj.container.name)

        if obj.name not in container_objects:
            raise ObjectDoesNotExistError(object_name=obj.name, value=None, driver=self)

        return container_objects[obj.name].meta_data["cdn_url"]

    def create_container(self, container_name):
        """
        Create and register a new, empty container.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container = driver.create_container(
        ...    container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerAlreadyExistsError:

        @inherits: :class:`StorageDriver.create_container`
        """
        if container_name in self._containers:
            raise ContainerAlreadyExistsError(
                container_name=container_name, value=None, driver=self
            )

        extra = {"object_count": 0}
        container = Container(name=container_name, extra=extra, driver=self)

        self._containers[container_name] = {
            "container": container,
            "objects": {},
            "cdn_url": "http://www.test.com/container/%s" % (container_name.replace(" ", "_")),
        }
        return container

    def delete_container(self, container):
        """
        Delete an existing container. Only empty containers can be deleted.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = Container(name = 'test container',
        ...    extra={'object_count': 0}, driver=driver)
        >>> driver.delete_container(container=container)
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container = driver.create_container(
        ...      container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> len(driver._containers)
        1
        >>> driver.delete_container(container=container)
        True
        >>> len(driver._containers)
        0
        >>> container = driver.create_container(
        ...    container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(
        ...   object_name='test object', iterator=DummyFileObject(5, 10),
        ...   extra={})
        >>> driver.delete_container(container=container)
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerIsNotEmptyError:

        @inherits: :class:`StorageDriver.delete_container`
        """
        container_name = container.name

        if container_name not in self._containers:
            raise ContainerDoesNotExistError(container_name=container_name, value=None, driver=self)

        entry = self._containers[container_name]
        if entry["objects"]:
            raise ContainerIsNotEmptyError(container_name=container_name, value=None, driver=self)

        del self._containers[container_name]
        return True

    def download_object(
        self, obj, destination_path, overwrite_existing=False, delete_on_failure=True
    ):
        """
        Save randomly generated dummy data to ``destination_path``.

        The actual writing is delegated to ``_save_object`` (not defined in
        this class, presumably inherited from ``StorageDriver``) which is
        handed a fresh :class:`DummyFileObject` as the response.

        @inherits: :class:`StorageDriver.download_object`
        """
        kwargs_dict = {
            "obj": obj,
            "response": DummyFileObject(),
            "destination_path": destination_path,
            "overwrite_existing": overwrite_existing,
            "delete_on_failure": delete_on_failure,
        }

        return self._save_object(**kwargs_dict)

    def download_object_as_stream(self, obj, chunk_size=None):
        """
        Return a new :class:`DummyFileObject`; ``obj`` and ``chunk_size`` are
        not used.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = driver.create_container(
        ...   container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(object_name='test object',
        ...    iterator=DummyFileObject(5, 10), extra={})
        >>> stream = container.download_object_as_stream(obj)
        >>> stream #doctest: +ELLIPSIS
        <...closed...>

        @inherits: :class:`StorageDriver.download_object_as_stream`
        """
        return DummyFileObject()

    def upload_object(
        self,
        file_path,
        container,
        object_name,
        extra=None,
        verify_hash=True,
        headers=None,
    ):
        """
        Register a new object whose size is the size of the file located at
        ``file_path``. The file content itself is not read or stored.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container.upload_object(file_path='/tmp/inexistent.file',
        ...     object_name='test') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        LibcloudError:
        >>> file_path = path = os.path.abspath(__file__)
        >>> file_size = os.path.getsize(file_path)
        >>> obj = container.upload_object(file_path=file_path,
        ...                               object_name='test')
        >>> obj #doctest: +ELLIPSIS
        <Object: name=test, size=...>
        >>> obj.size == file_size
        True

        @inherits: :class:`StorageDriver.upload_object`
        """
        if not os.path.exists(file_path):
            raise LibcloudError(value="File %s does not exist" % (file_path), driver=self)

        size = os.path.getsize(file_path)
        return self._add_object(
            container=container, object_name=object_name, size=size, extra=extra
        )

    def upload_object_via_stream(self, iterator, container, object_name, extra=None, headers=None):
        """
        Register a new object whose size is ``len(iterator)``. The iterator
        is not consumed.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = driver.create_container(
        ...    container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(
        ...   object_name='test object', iterator=DummyFileObject(5, 10),
        ...   extra={})
        >>> obj #doctest: +ELLIPSIS
        <Object: name=test object, size=50, ...>

        @inherits: :class:`StorageDriver.upload_object_via_stream`
        """
        size = len(iterator)
        return self._add_object(
            container=container, object_name=object_name, size=size, extra=extra
        )

    def delete_object(self, obj):
        """
        Remove the provided object from its container.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = driver.create_container(
        ...   container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(object_name='test object',
        ...   iterator=DummyFileObject(5, 10), extra={})
        >>> obj #doctest: +ELLIPSIS
        <Object: name=test object, size=50, ...>
        >>> container.delete_object(obj=obj)
        True
        >>> obj = Object(name='test object 2',
        ...    size=1000, hash=None, extra=None,
        ...    meta_data=None, container=container,driver=None)
        >>> container.delete_object(obj=obj) #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ObjectDoesNotExistError:

        @inherits: :class:`StorageDriver.delete_object`
        """
        container_name = obj.container.name
        object_name = obj.name

        # Called only for validation: raises if either the container or the
        # object does not exist.
        self.get_object(container_name=container_name, object_name=object_name)

        del self._containers[container_name]["objects"][object_name]
        return True

    def _get_container_objects(self, container_name):
        """
        Return the ``{object name: Object}`` dictionary of a container.

        :raises ContainerDoesNotExistError: If the container is unknown.
        """
        self.get_container(container_name)
        return self._containers[container_name]["objects"]

    def _add_object(self, container, object_name, size, extra=None):
        """
        Create an ``Object`` with the provided attributes and store it in the
        provided container, replacing any object with the same name.

        :raises ContainerDoesNotExistError: If the container is unknown.
        """
        container = self.get_container(container.name)

        extra = extra or {}
        # Work on a copy so the dictionary passed in by the caller is not
        # modified.
        meta_data = dict(extra.get("meta_data") or {})
        meta_data["cdn_url"] = "http://www.test.com/object/%s" % (object_name.replace(" ", "_"))

        obj = Object(
            name=object_name,
            size=size,
            extra=extra,
            hash=None,
            meta_data=meta_data,
            container=container,
            driver=self,
        )

        self._containers[container.name]["objects"][object_name] = obj
        return obj
if __name__ == "__main__":
    # Executed as a script: run all the doctests embedded in this module.
    from doctest import testmod

    testmod()