assert False
+def forward_lookup(fqdn, ip_family):
+    """Return the IP address recorded for fqdn, or None if there is none.
+
+    The dynamic zone file is parsed and the first record of the requested
+    type whose owner name matches fqdn is returned.
+
+    :param fqdn: Fully qualified domain name, with or without trailing dot.
+    :param ip_family: Record type to search for, 'A' or 'AAAA'.
+    :returns: the address of the matching record as built by
+        ipaddr.IPAddress, or None when no record matches."""
+    filename = '/var/cache/bind/dyn.colgarra.priv.at'
+    # The basename of the zone file doubles as the zone origin.
+    zonename = os.path.basename(filename)
+    zone = dns.zone.from_file(filename, zonename, relativize=False)
+    # With relativize=False the owner names are absolute and print with a
+    # trailing dot. DNS names compare case-insensitively, so normalise both
+    # sides rather than slicing one character off and comparing verbatim.
+    wanted = fqdn.rstrip('.').lower()
+    for name, ttl, rdata in zone.iterate_rdatas(ip_family):
+        if str(name).rstrip('.').lower() == wanted:
+            return ipaddr.IPAddress(rdata.address)
+    return None
+
+
def reverse_lookup(ip):
"""Returns an iterator of fqdns for the given IP address.
"""
:param fqdn: Fully qualified domain name
:param ip_family: A or AAAA
- :raises an NsupdateError in case of errors."""
+ :raises a CalledProcessError in case of errors."""
command = "update add {fqdn} {ttl} IN {ip_family} {ip}\n\n".format(fqdn=fqdn, ttl=ttl, ip_family=ipfamily_by_ip(ip), ip=ip)
p = Popen(['nsupdate', '-l'], stdin=PIPE)
p.communicate(command)
"""
:param fqdn: Fully qualified domain name
:param ip_family: A or AAAA
- :raises an NsupdateError in case of errors."""
+ :raises a CalledProcessError in case of errors."""
command = "update delete {fqdn} {ip_family}\n\n".format(fqdn=fqdn, ip_family=ip_family)
p = Popen(['nsupdate', '-l'], stdin=PIPE)
p.communicate(command)
def blockip_whitelist_add(ip):
"""
:param ip: ipv4 address
- :raises a BlockipError in case of errors."""
- command = ['iptables', '-I', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
- check_call(command)
+ """
+ if ipfamily_by_ip(ip) == 'A':
+ command = ['iptables', '-I', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
+ p = Popen(command, stderr=PIPE)
+ stdout, stderr = p.communicate()
def blockip_whitelist_delete(ip):
"""
:param ip: ipv4 address
- :raises a BlockipError in case of errors."""
- command = ['iptables', '-D', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
- check_call(command)
+ """
+ if ipfamily_by_ip(ip) == 'A':
+ command = ['iptables', '-D', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
+ p = Popen(command, stderr=PIPE)
+ stdout, stderr = p.communicate()
def main(args):
nsupdate_delete(args.fqdn, 'A')
nsupdate_delete(args.fqdn, 'AAAA')
else:
+ ipfamily = ipfamily_by_ip(args.ip)
+ sync_dynamic_zones()
+ old_ip = forward_lookup(args.fqdn, ipfamily)
nsupdate_delete(args.fqdn, ipfamily_by_ip(args.ip))
- if ipfamily_by_ip(args.ip) == 'A':
- sync_dynamic_zones()
- if len(list(reverse_lookup(args.ip))) == 0:
- blockip_whitelist_delete(args.ip)
+ if old_ip is not None:
+ blockip_whitelist_delete(old_ip)
else:
- nsupdate_delete(args.fqdn, ipfamily_by_ip(args.ip))
+ ipfamily = ipfamily_by_ip(args.ip)
+ sync_dynamic_zones()
+ old_ip = forward_lookup(args.fqdn, ipfamily)
+ nsupdate_delete(args.fqdn, ipfamily)
nsupdate_add(args.fqdn, args.ttl, args.ip)
- if ipfamily_by_ip(args.ip) == 'A':
+ if old_ip != args.ip:
+ if old_ip is not None:
+ blockip_whitelist_delete(old_ip)
blockip_whitelist_add(args.ip)
except CalledProcessError as e:
sys.exit(e.returncode)