# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Vultr DNS Driver
"""
import json
from typing import Any, Dict, List, Optional
from libcloud.dns.base import Zone, Record, DNSDriver
from libcloud.dns.types import (
Provider,
RecordType,
ZoneDoesNotExistError,
ZoneAlreadyExistsError,
RecordDoesNotExistError,
RecordAlreadyExistsError,
)
from libcloud.utils.py3 import urlencode
from libcloud.common.vultr import (
DEFAULT_API_VERSION,
VultrResponse,
VultrConnection,
VultrResponseV2,
VultrConnectionV2,
)
__all__ = [
"ZoneRequiredException",
"VultrDNSResponse",
"VultrDNSConnection",
"VultrDNSDriver",
]
[docs]class ZoneRequiredException(Exception):
pass
[docs]class VultrDNSResponse(VultrResponse):
pass
[docs]class VultrDNSConnection(VultrConnection):
responseCls = VultrDNSResponse
class VultrDNSResponseV2(VultrResponseV2):
pass
class VultrDNSConnectionV2(VultrConnectionV2):
responseCls = VultrDNSResponseV2
[docs]class VultrDNSDriver(DNSDriver):
type = Provider.VULTR
name = "Vultr DNS"
website = "https://www.vultr.com"
def __new__(
cls,
key,
secret=None,
secure=True,
host=None,
port=None,
api_version=DEFAULT_API_VERSION,
region=None,
**kwargs,
):
if cls is VultrDNSDriver:
if api_version == "1":
cls = VultrDNSDriverV1
elif api_version == "2":
cls = VultrDNSDriverV2
else:
raise NotImplementedError(
"No Vultr driver found for API version: %s" % (api_version)
)
return super().__new__(cls)
class VultrDNSDriverV1(VultrDNSDriver):
connectionCls = VultrDNSConnection
RECORD_TYPE_MAP = {
RecordType.A: "A",
RecordType.AAAA: "AAAA",
RecordType.TXT: "TXT",
RecordType.CNAME: "CNAME",
RecordType.MX: "MX",
RecordType.NS: "NS",
RecordType.SRV: "SRV",
}
def list_zones(self):
"""
Return a list of records for the provided zone.
:param zone: Zone to list records for.
:type zone: :class:`Zone`
:return: ``list`` of :class:`Record`
"""
action = "/v1/dns/list"
params = {"api_key": self.key}
response = self.connection.request(action=action, params=params)
zones = self._to_zones(response.objects[0])
return zones
def list_records(self, zone):
"""
Returns a list of records for the provided zone.
:param zone: zone to list records for
:type zone: `Zone`
:rtype: list of :class: `Record`
"""
if not isinstance(zone, Zone):
raise ZoneRequiredException("zone should be of type Zone")
zones = self.list_zones()
if not self.ex_zone_exists(zone.domain, zones):
raise ZoneDoesNotExistError(value="", driver=self, zone_id=zone.domain)
action = "/v1/dns/records"
params = {"domain": zone.domain}
response = self.connection.request(action=action, params=params)
records = self._to_records(response.objects[0], zone=zone)
return records
def get_zone(self, zone_id):
"""
Returns a `Zone` instance.
:param zone_id: name of the zone user wants to get.
:type zone_id: ``str``
:rtype: :class:`Zone`
"""
ret_zone = None
action = "/v1/dns/list"
params = {"api_key": self.key}
response = self.connection.request(action=action, params=params)
zones = self._to_zones(response.objects[0])
if not self.ex_zone_exists(zone_id, zones):
raise ZoneDoesNotExistError(value=None, zone_id=zone_id, driver=self)
for zone in zones:
if zone_id == zone.domain:
ret_zone = zone
return ret_zone
def get_record(self, zone_id, record_id):
"""
Returns a Record instance.
:param zone_id: name of the required zone
:type zone_id: ``str``
:param record_id: ID of the required record
:type record_id: ``str``
:rtype: :class: `Record`
"""
ret_record = None
zone = self.get_zone(zone_id=zone_id)
records = self.list_records(zone=zone)
if not self.ex_record_exists(record_id, records):
raise RecordDoesNotExistError(value="", driver=self, record_id=record_id)
for record in records:
if record_id == record.id:
ret_record = record
return ret_record
def create_zone(self, domain, type="master", ttl=None, extra=None):
"""
Returns a `Zone` object.
:param domain: Zone domain name, (e.g. example.com).
:type domain: ``str``
:param type: Zone type (master / slave).
:type type: ``str``
:param ttl: TTL for new records. (optional)
:type ttl: ``int``
:param extra: (optional) Extra attributes (driver specific).
(e.g. {'serverip':'127.0.0.1'})
"""
extra = extra or {}
if extra and extra.get("serverip"):
serverip = extra["serverip"]
params = {"api_key": self.key}
data = urlencode({"domain": domain, "serverip": serverip})
action = "/v1/dns/create_domain"
zones = self.list_zones()
if self.ex_zone_exists(domain, zones):
raise ZoneAlreadyExistsError(value="", driver=self, zone_id=domain)
self.connection.request(params=params, action=action, data=data, method="POST")
zone = Zone(id=domain, domain=domain, type=type, ttl=ttl, driver=self, extra=extra)
return zone
def create_record(self, name, zone, type, data, extra=None):
"""
Create a new record.
:param name: Record name without the domain name (e.g. www).
Note: If you want to create a record for a base domain
name, you should specify empty string ('') for this
argument.
:type name: ``str``
:param zone: Zone where the requested record is created.
:type zone: :class:`Zone`
:param type: DNS record type (A, AAAA, ...).
:type type: :class:`RecordType`
:param data: Data for the record (depends on the record type).
:type data: ``str``
:param extra: Extra attributes (driver specific). (optional)
:type extra: ``dict``
:rtype: :class:`Record`
"""
extra = extra or {}
ret_record = None
old_records_list = self.list_records(zone=zone)
# check if record already exists
# if exists raise RecordAlreadyExistsError
for record in old_records_list:
if record.name == name and record.data == data:
raise RecordAlreadyExistsError(value="", driver=self, record_id=record.id)
MX = self.RECORD_TYPE_MAP.get("MX")
SRV = self.RECORD_TYPE_MAP.get("SRV")
if extra and extra.get("priority"):
priority = int(extra["priority"])
post_data = {
"domain": zone.domain,
"name": name,
"type": self.RECORD_TYPE_MAP.get(type),
"data": data,
}
if type == MX or type == SRV:
post_data["priority"] = priority
encoded_data = urlencode(post_data)
params = {"api_key": self.key}
action = "/v1/dns/create_record"
self.connection.request(action=action, params=params, data=encoded_data, method="POST")
updated_zone_records = zone.list_records()
for record in updated_zone_records:
if record.name == name and record.data == data:
ret_record = record
return ret_record
def delete_zone(self, zone):
"""
Delete a zone.
Note: This will delete all the records belonging to this zone.
:param zone: Zone to delete.
:type zone: :class:`Zone`
:rtype: ``bool``
"""
action = "/v1/dns/delete_domain"
params = {"api_key": self.key}
data = urlencode({"domain": zone.domain})
zones = self.list_zones()
if not self.ex_zone_exists(zone.domain, zones):
raise ZoneDoesNotExistError(value="", driver=self, zone_id=zone.domain)
response = self.connection.request(params=params, action=action, data=data, method="POST")
return response.status == 200
def delete_record(self, record):
"""
Delete a record.
:param record: Record to delete.
:type record: :class:`Record`
:rtype: ``bool``
"""
action = "/v1/dns/delete_record"
params = {"api_key": self.key}
data = urlencode({"RECORDID": record.id, "domain": record.zone.domain})
zone_records = self.list_records(record.zone)
if not self.ex_record_exists(record.id, zone_records):
raise RecordDoesNotExistError(value="", driver=self, record_id=record.id)
response = self.connection.request(action=action, params=params, data=data, method="POST")
return response.status == 200
def ex_zone_exists(self, zone_id, zones_list):
"""
Function to check if a `Zone` object exists.
:param zone_id: Name of the `Zone` object.
:type zone_id: ``str``
:param zones_list: A list containing `Zone` objects
:type zones_list: ``list``
:rtype: Returns `True` or `False`
"""
zone_ids = []
for zone in zones_list:
zone_ids.append(zone.domain)
return zone_id in zone_ids
def ex_record_exists(self, record_id, records_list):
"""
:param record_id: Name of the `Record` object.
:type record_id: ``str``
:param records_list: A list containing `Record` objects
:type records_list: ``list``
:rtype: ``bool``
"""
record_ids = []
for record in records_list:
record_ids.append(record.id)
return record_id in record_ids
def _to_zone(self, item):
"""
Build an object `Zone` from the item dictionary
:param item: item to build the zone from
:type item: `dictionary`
:rtype: :instance: `Zone`
"""
type = "master"
extra = {"date_created": item["date_created"]}
zone = Zone(
id=item["domain"],
domain=item["domain"],
driver=self,
type=type,
ttl=None,
extra=extra,
)
return zone
def _to_zones(self, items):
"""
Returns a list of `Zone` objects.
:param: items: a list that contains dictionary objects to be passed
to the _to_zone function.
:type items: ``list``
"""
zones = []
for item in items:
zones.append(self._to_zone(item))
return zones
def _to_record(self, item, zone):
extra = {}
if item.get("priority"):
extra["priority"] = item["priority"]
type = self._string_to_record_type(item["type"])
record = Record(
id=item["RECORDID"],
name=item["name"],
type=type,
data=item["data"],
zone=zone,
driver=self,
extra=extra,
)
return record
def _to_records(self, items, zone):
records = []
for item in items:
records.append(self._to_record(item, zone=zone))
return records
class VultrDNSDriverV2(VultrDNSDriver):
connectionCls = VultrDNSConnectionV2
RECORD_TYPE_MAP = {
RecordType.A: "A",
RecordType.AAAA: "AAAA",
RecordType.CNAME: "CNAME",
RecordType.NS: "NS",
RecordType.MX: "MX",
RecordType.SRV: "SRV",
RecordType.TXT: "TXT",
RecordType.CAA: "CAA",
RecordType.SSHFP: "SSHFP",
}
def list_zones(self) -> List[Zone]:
"""Return a list of zones.
:return: ``list`` of :class:`Zone`
"""
data = self._paginated_request("/v2/domains", "domains")
return [self._to_zone(item) for item in data]
def get_zone(self, zone_id: str) -> Zone:
"""Return a Zone instance.
:param zone_id: ID of the required zone
:type zone_id: ``str``
:rtype: :class:`Zone`
"""
resp = self.connection.request("/v2/domains/%s" % zone_id)
return self._to_zone(resp.object["domain"])
def create_zone(
self,
domain: str,
type: str = "master",
ttl: Optional[int] = None,
extra: Optional[Dict[str, Any]] = None,
) -> Zone:
"""Create a new zone.
:param domain: Zone domain name (e.g. example.com)
:type domain: ``str``
:param type: Zone type. Only 'master' value is supported.
:type type: ``str``
:param ttl: TTL for new records. (unused)
:type ttl: ``int``
:param extra: Extra attributes 'ip': ``str`` IP for a default A record
'dns_sec': ``bool`` Enable DSNSEC.
:type extra: ``dict``
:rtype: :class:`Zone`
"""
data = {
"domain": domain,
}
extra = extra or {}
if "ip" in extra:
data["ip"] = extra["ip"]
if "dns_sec" in extra:
data["dns_sec"] = "enabled" if extra["dns_sec"] is True else "disabled"
resp = self.connection.request("/v2/domains", data=json.dumps(data), method="POST")
return self._to_zone(resp.object["domain"])
def delete_zone(self, zone: Zone) -> bool:
"""Delete a zone.
Note: This will delete all the records belonging to this zone.
:param zone: Zone to delete.
:type zone: :class:`Zone`
:rtype: ``bool``
"""
resp = self.connection.request("/v2/domains/%s" % zone.domain, method="DELETE")
return resp.success()
def list_records(self, zone: Zone) -> List[Record]:
"""Return a list of records for the provided zone.
:param zone: Zone to list records for.
:type zone: :class:`Zone`
:return: ``list`` of :class:`Record`
"""
data = self._paginated_request("/v2/domains/%s/records" % zone.domain, "records")
return [self._to_record(item, zone) for item in data]
def get_record(self, zone_id: str, record_id: str) -> Record:
"""Return a Record instance.
:param zone_id: ID of the required zone
:type zone_id: ``str``
:param record_id: ID of the required record
:type record_id: ``str``
:rtype: :class:`Record`
"""
resp = self.connection.request("/v2/domains/{}/records/{}".format(zone_id, record_id))
# Avoid making an extra API call, as zone_id is enough for
# standard fields
zone = Zone(id=zone_id, domain=zone_id, type="master", ttl=None, driver=self)
return self._to_record(resp.object["record"], zone)
def create_record(
self,
name: str,
zone: Zone,
type: RecordType,
data: str,
extra: Optional[Dict[str, Any]] = None,
) -> Record:
"""Create a new record.
:param name: Record name without the domain name (e.g. www).
Note: If you want to create a record for a base domain
name, you should specify empty string ('') for this
argument.
:type name: ``str``
:param zone: Zone where the requested record is created.
:type zone: :class:`Zone`
:param type: DNS record type (A, AAAA, ...).
:type type: :class:`RecordType`
:param data: Data for the record (depends on the record type).
:type data: ``str``
:keyword extra: Extra attributes 'ttl': Time to live in seconds
'priority': DNS priority. Only
required for MX and SRV
:type extra: ``dict``
:rtype: :class:`Record`
"""
data = {
"name": name,
"type": self.RECORD_TYPE_MAP[type],
"data": data,
}
extra = extra or {}
if "ttl" in extra:
data["ttl"] = int(extra["ttl"])
if "priority" in extra:
data["priority"] = int(extra["priority"])
resp = self.connection.request(
"/v2/domains/%s/records" % zone.domain, data=json.dumps(data), method="POST"
)
return self._to_record(resp.object["record"], zone)
def update_record(
self,
record: Record,
name: Optional[str] = None,
type: Optional[RecordType] = None,
data: Optional[str] = None,
extra: Optional[Dict[str, Any]] = None,
) -> bool:
"""Update an existing record.
:param record: Record to update.
:type record: :class:`Record`
:keyword name: Record name without the domain name (e.g. www).
Note: If you want to create a record for a base domain
name, you should specify empty string ('') for this
argument.
:type name: ``str``
:keyword type: DNS record type. (Unused)
:type type: :class:`RecordType`
:keyword data: Data for the record (depends on the record type).
:type data: ``str``
:keyword extra: Extra attributes 'ttl': Time to live in seconds
'priority': DNS priority. Only
required for MX and SRV
:type extra: ``dict``
:rtype: ``bool``
"""
body = {}
if name:
body["name"] = name
if data:
body["data"] = data
extra = extra or {}
if "ttl" in extra:
body["ttl"] = int(extra["ttl"])
if "priority" in extra:
body["priority"] = int(extra["priority"])
resp = self.connection.request(
"/v2/domains/{}/records/{}".format(record.zone.domain, record.id),
data=json.dumps(body),
method="PATCH",
)
return resp.success()
def delete_record(self, record: Record) -> bool:
"""Delete a record.
:param record: Record to delete.
:type record: :class:`Record`
:rtype: ``bool``
"""
resp = self.connection.request(
"/v2/domains/{}/records/{}".format(record.zone.domain, record.id),
method="DELETE",
)
return resp.success()
def _to_zone(self, data: Dict[str, Any]) -> Zone:
type_ = "master"
domain = data["domain"]
extra = {
"date_created": data["date_created"],
}
return Zone(id=domain, domain=domain, driver=self, type=type_, ttl=None, extra=extra)
def _to_record(self, data: Dict[str, Any], zone: Zone) -> Record:
id_ = data["id"]
name = data["name"]
type_ = self._string_to_record_type(data["type"])
data_ = data["data"]
ttl = data["ttl"]
extra = {
"priority": data["priority"],
}
return Record(
id=id_,
name=name,
type=type_,
data=data_,
ttl=ttl,
driver=self,
zone=zone,
extra=extra,
)
def _paginated_request(
self,
url: str,
key: str,
params: Optional[Dict[str, Any]] = None,
) -> List[Any]:
"""Perform multiple calls to get the full list of items when
the API responses are paginated.
:param url: API endpoint
:type url: ``str``
:param key: Result object key
:type key: ``str``
:param params: Request parameters
:type params: ``dict``
:return: ``list`` of API response objects
:rtype: ``list``
"""
params = params if params is not None else {}
resp = self.connection.request(url, params=params).object
data = list(resp.get(key, []))
objects = data
while True:
next_page = resp["meta"]["links"]["next"]
if next_page:
params["cursor"] = next_page
resp = self.connection.request(url, params=params).object
data = list(resp.get(key, []))
objects.extend(data)
else:
return objects