import os
import ipaddress
import socket
+import getpass
+import time
# Third party modules
import requests
from .cfg_app import PpCfgAppError, PpConfigApplication
from .pdns_zone import PdnsApiZone
+from .pdns_record import PdnsApiRecord, PdnsSoaData, PdnsApiRrset
-__version__ = '0.5.1'
+__version__ = '0.6.1'
LOG = logging.getLogger(__name__)
_LIBRARY_NAME = "pp-pdns-api-client"
return url
# -------------------------------------------------------------------------
- def perform_request(self, path, method='GET', data=None, headers=None):
+ def perform_request(self, path, method='GET', data=None, headers=None, may_simulate=False):
"""Performing the underlying API request."""
if headers is None:
url = self._build_url(path)
if self.verbose > 1:
LOG.debug("Request method: {!r}".format(method))
- if data and self.verbose > 1:
+ if data and self.verbose > 2:
data_out = "{!r}".format(data)
try:
data_out = json.loads(data)
pass
else:
data_out = pp(data_out)
- LOG.debug("Data:\n%s", data_out)
+ LOG.debug("Data:\n{}".format(data_out))
+ LOG.debug("RAW data:\n{}".format(data))
headers.update({'User-Agent': self.user_agent})
headers.update({'Content-Type': 'application/json'})
if self.verbose > 1:
LOG.debug("Headers:\n%s", pp(headers))
+ if may_simulate and self.simulate:
+ LOG.debug("Simulation mode, Request will not be sent.")
+ return ''
+
session = requests.Session()
response = session.request(method, url, data=data, headers=headers, timeout=self.timeout)
except ValueError:
raise PpPDNSAppError('Failed to parse the response', response.text)
+ if self.verbose > 3:
+ LOG.debug("RAW response: {!r}.".format(response.text))
+ if not response.text:
+ return ''
+
json_response = response.json()
if 'location' in response.headers:
return zone
# -------------------------------------------------------------------------
- def is_local(self, domain):
+ def patch_zone(self, zone, payload):
if self.verbose > 1:
- LOG.debug("Checking, whether {!r} is a not public zone.".format(domain))
+ LOG.debug("Patching zone {!r} ...".format(zone.name))
- tld = domain.split('.')[-1]
- if tld in ('intern', 'internal', 'local', 'localdomain', 'lokal'):
- LOG.debug("Zone {!r} has a local TLD {!r}.".format(domain, tld))
- return True
+ path = "/servers/{}/zones/{}".format(self.api_servername, zone.name)
+ return self.perform_request(path, 'PATCH', json.dumps(payload), may_simulate=True)
- zone_base = domain.split('.')[0]
- if zone_base in ('intern', 'internal', 'local', 'localdomain', 'lokal'):
- LOG.debug("Zone {!r} has a local base {!r}.".format(domain, tld))
- return True
+ # -------------------------------------------------------------------------
+ def update_soa(self, zone, new_soa, comment=None, ttl=None):
- if tld != 'arpa':
- if self.verbose > 2:
- LOG.debug("Zone {!r} has a public TLD {!r}.".format(domain, tld))
- return False
+ if not isinstance(new_soa, PdnsSoaData):
+ msg = "New SOA must by of type PdnsSoaData, given {t}: {s!r}".format(
+ t=new_soa.__class__.__name__, s=new_soa)
+ raise TypeError(msg)
- if domain.endswith('.in-addr.arpa'):
- tupels = []
- for tupel in reversed(domain.replace('.in-addr.arpa', '').split('.')):
- tupels.append(tupel)
- if self.verbose > 2:
- LOG.debug("Got IPv4 tupels from zone {!r}: {}".format(domain, pp(tupels)))
- bitmask = None
- if len(tupels) == 1:
- bitmask = 8
- tupels.append('0')
- tupels.append('0')
- tupels.append('0')
- elif len(tupels) == 2:
- tupels.append('0')
- tupels.append('0')
- bitmask = 16
- elif len(tupels) == 3:
- bitmask = 24
- tupels.append('0')
- else:
- LOG.warn("Could not interprete reverse IPv4 zone {!r}.".format(domain))
- return False
- net_address = '.'.join(tupels) + '/{}'.format(bitmask)
- if self.verbose > 2:
- LOG.debug("Got IPv4 network address of zone {!r}: {!r}.".format(domain, net_address))
- network = ipaddress.ip_network(net_address)
- if network.is_global:
- if self.verbose > 1:
- LOG.debug("The network {!r} of zone {!r} is allocated for public networks.".format(
- net_address, domain))
- return False
- LOG.debug("The network {!r} of zone {!r} is allocated for local networks.".format(
- net_address, domain))
- return True
+ if ttl:
+ ttl = int(ttl)
+ else:
+ cur_soa_rrset = zone.get_soa_rrset()
+ ttl = cur_soa_rrset.ttl
+
+ if comment is not None:
+ comment = str(comment).strip()
+ if comment == '':
+ comment = None
+
+ rrset = {
+ 'name': zone.name,
+ 'type': 'SOA',
+ 'ttl': ttl,
+ 'changetype': 'REPLACE',
+ 'records': [],
+ }
+
+ if comment:
+ comment_rec = {
+ 'content': comment,
+ 'account': getpass.getuser(),
+ 'modified_at': time.time(),
+ }
+ rrset['comments'] = [comment_rec]
+
+ record = {
+ 'content': new_soa.data,
+ 'disabled': False,
+ 'name': zone.name,
+ 'set-ptr': False,
+ 'type': 'SOA',
+ }
+ rrset['records'].append(record)
+ payload = {"rrsets": [rrset]}
- if self.verbose > 2:
- LOG.debug("Zone {!r} seems to be a reverse zone for a public network.".format(domain))
- return False
+ if self.verbose > 1:
+ LOG.debug("Setting new SOA {s!r} for zone {z!r}, TTL {t} ...".format(
+ s=new_soa.data, z=zone.name, t=ttl))
+
+ self.patch_zone(zone, payload)
# =============================================================================