#!/usr/bin/python
+import os
import sys
import re
import argparse
-from subprocess import Popen, PIPE, call
+from subprocess import Popen, PIPE, check_call, CalledProcessError
import ipaddr
+import dns.zone # http://www.dnspython.org/
-class ExternalProgramError(RuntimeError):
- pass
-
-
-class NsupdateError(ExternalProgramError):
- def __init__(self, returncode):
- self.returncode = returncode
-
-
-class BlockipError(ExternalProgramError):
- def __init__(self, returncode):
- self.returncode = returncode
+def sync_dynamic_zones():
+ check_call(['rndc', 'sync'])
def ipfamily_by_ip(ip):
assert False
+def forward_lookup(fqdn, ip_family):
+ """Returns the ip address of the fqdn or None if none is found..
+
+ :param fqdn: Fully qualified domain name.
+ :param ip_family: 'A' or 'AAAA'"""
+ filename = '/var/cache/bind/dyn.colgarra.priv.at'
+ zonename = os.path.basename(filename)
+ zone = dns.zone.from_file(filename, zonename, relativize=False)
+ for name, ttl, rdata in zone.iterate_rdatas(ip_family):
+ if str(name)[:-1] == fqdn: # [:-1] removes trailing dot
+ return ipaddr.IPAddress(rdata.address)
+
+
+def reverse_lookup(ip):
+ """Returns an iterator of fqdns for the given IP address.
+
+ :param ip: Instance of ipaddr.IPv4Address or ipaddr.IPv6Address"""
+ filename = '/var/cache/bind/dyn.colgarra.priv.at'
+ zonename = os.path.basename(filename)
+ zone = dns.zone.from_file(filename, zonename, relativize=False)
+ for name, ttl, rdata in zone.iterate_rdatas(ipfamily_by_ip(ip)):
+ if ipaddr.IPAddress(rdata.address) == ip:
+ yield str(name)
+
+
def nsupdate_add(fqdn, ttl, ip):
"""
:param fqdn: Fully qualified domain name
p = Popen(['nsupdate', '-l'], stdin=PIPE)
p.communicate(command)
if p.returncode != 0:
- raise NsupdateError(p.returncode)
+ raise CalledProcessError(p.returncode, 'nsupdate -l')
def nsupdate_delete(fqdn, ip_family):
"""
p = Popen(['nsupdate', '-l'], stdin=PIPE)
p.communicate(command)
if p.returncode != 0:
- raise NsupdateError(p.returncode)
+ raise CalledProcessError(p.returncode, 'nsupdate -l')
def blockip_whitelist_add(ip):
"""
:param ip: ipv4 address
:raises a BlockipError in case of errors."""
- command = ['iptables', '-I', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
- p = call(command)
- if p != 0:
- raise BlockipError(p)
+ if ipfamily_by_ip(ip) == 'A':
+ command = ['iptables', '-I', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
+ check_call(command)
def blockip_whitelist_delete(ip):
"""
:param ip: ipv4 address
:raises a BlockipError in case of errors."""
- command = ['iptables', '-D', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
- p = call(command)
- if p != 0:
- raise BlockipError(p)
+ if ipfamily_by_ip(ip) == 'A':
+ command = ['iptables', '-D', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
+ check_call(command)
def main(args):
nsupdate_delete(args.fqdn, 'A')
nsupdate_delete(args.fqdn, 'AAAA')
else:
+ ipfamily = ipfamily_by_ip(args.ip)
+ sync_dynamic_zones()
+ old_ip = forward_lookup(args.fqdn, ipfamily)
nsupdate_delete(args.fqdn, ipfamily_by_ip(args.ip))
- if ipfamily_by_ip(args.ip) == 'A':
- blockip_whitelist_delete(args.ip)
+ if old_ip is not None:
+ blockip_whitelist_delete(old_ip)
else:
- nsupdate_delete(args.fqdn, ipfamily_by_ip(args.ip))
+ ipfamily = ipfamily_by_ip(args.ip)
+ sync_dynamic_zones()
+ old_ip = forward_lookup(args.fqdn, ipfamily)
+ nsupdate_delete(args.fqdn, ipfamily)
nsupdate_add(args.fqdn, args.ttl, args.ip)
- if ipfamily_by_ip(args.ip) == 'A':
+ if old_ip != args.ip:
+ if old_ip is not None:
+ blockip_whitelist_delete(old_ip)
blockip_whitelist_add(args.ip)
- except ExternalProgramError as e:
+ except CalledProcessError as e:
sys.exit(e.returncode)