assert False
+def forward_lookup(fqdn, ip_family):
+ """Returns the IP address of the fqdn or None if none is found.
+
+ :param fqdn: Fully qualified domain name, without the trailing dot.
+ :param ip_family: 'A' or 'AAAA'"""
+ filename = '/var/cache/bind/dyn.colgarra.priv.at'
+ zonename = os.path.basename(filename)
+ zone = dns.zone.from_file(filename, zonename, relativize=False)
+ for name, ttl, rdata in zone.iterate_rdatas(ip_family):
+ if str(name)[:-1] == fqdn: # [:-1] removes trailing dot
+ return ipaddr.IPAddress(rdata.address)
+
+
def reverse_lookup(ip):
"""Returns an iterator of fqdns for the given IP address.
"""
:param ip: ipv4 address
:raises a BlockipError in case of errors."""
- command = ['iptables', '-I', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
- check_call(command)
+ if ipfamily_by_ip(ip) == 'A':
+ command = ['iptables', '-I', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
+ check_call(command)
def blockip_whitelist_delete(ip):
"""
:param ip: ipv4 address
:raises a BlockipError in case of errors."""
- command = ['iptables', '-D', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
- check_call(command)
+ if ipfamily_by_ip(ip) == 'A':
+ command = ['iptables', '-D', 'blockip', '-s', str(ip), '-j', 'ACCEPT']
+ check_call(command)
def main(args):
nsupdate_delete(args.fqdn, 'A')
nsupdate_delete(args.fqdn, 'AAAA')
else:
+ ipfamily = ipfamily_by_ip(args.ip)
+ sync_dynamic_zones()
+ old_ip = forward_lookup(args.fqdn, ipfamily)
nsupdate_delete(args.fqdn, ipfamily_by_ip(args.ip))
- if ipfamily_by_ip(args.ip) == 'A':
- sync_dynamic_zones()
- if len(list(reverse_lookup(args.ip))) == 0:
- blockip_whitelist_delete(args.ip)
+ if old_ip is not None:
+ blockip_whitelist_delete(old_ip)
else:
- nsupdate_delete(args.fqdn, ipfamily_by_ip(args.ip))
+ ipfamily = ipfamily_by_ip(args.ip)
+ sync_dynamic_zones()
+ old_ip = forward_lookup(args.fqdn, ipfamily)
+ nsupdate_delete(args.fqdn, ipfamily)
nsupdate_add(args.fqdn, args.ttl, args.ip)
- if ipfamily_by_ip(args.ip) == 'A':
+ if old_ip != args.ip:
+ if old_ip is not None:
+ blockip_whitelist_delete(old_ip)
blockip_whitelist_add(args.ip)
except CalledProcessError as e:
sys.exit(e.returncode)