Attachment 'zone.py'
Download 1 #!/usr/bin/env python
2 from sets import Set
3
4 import dnslib
5 from dnslib import QTYPE, SOA, NS, A, AAAA, MX, CNAME
6 from dnslib import DNSRecord, DNSHeader, RR, RCODE
7
class Zone:
    """In-memory DNS zone built from zone-file text, with a simple responder.

    Attributes:
        name: owner name of the first SOA record read; '' until one is seen.
        data: dict mapping owner name -> node, where a node is a dict
            mapping rtype -> set of RR objects.  Empty nodes stand for
            names that exist only as ancestors of other names.
        cuts: dict mapping delegation-point name -> a copy of that node,
            extended by Check_cuts() with every name found below the cut
            (name -> node).
        glue: dict keyed by the rdata object of each delegation NS record;
            values are empty dicts (nothing in this class fills them).
        soa: the apex SOA RR once Check_zone() has found it, else None.
    """

    def __init__(self):
        self.name = ''   # zone name
        self.data = {}   # owner name -> {rtype: RRset}
        self.cuts = {}   # delegation point name -> node copy + names below
        self.glue = {}
        self.soa = None  # set by Check_zone()

    def _say(self, *parts):
        """Print *parts* separated by single spaces.

        A single pre-joined string is passed to print so the output is the
        same whether print is a statement (Python 2) or a function.
        """
        print(' '.join(str(part) for part in parts))

    def _add_ancestors(self, name):
        """Create an empty node for every ancestor of *name* not yet known."""
        parent = name.partition('.')[2]
        # Stop at the first existing ancestor, or when the labels run out.
        while parent and parent not in self.data:
            self.data[parent] = {}
            parent = parent.partition('.')[2]

    def _add_soa(self, reply):
        """Put the apex SOA in the authority section, if one was loaded."""
        if self.soa is not None:
            reply.add_auth(self.soa)

    def ReadRR(self, zfile):
        """Parse zone-file text and merge its records into ``self.data``.

        Args:
            zfile: zone-file contents as a string (handed to RR.fromZone).

        Owner names are lower-cased because dns_response() lower-cases the
        query name before looking it up.  The first SOA owner becomes
        ``self.name``.  CNAME conflicts and additional SOA records are
        reported on stdout; the records are stored regardless.
        """
        for rr in RR.fromZone(zfile):
            name = str(rr.rname).lower()
            node = self.data.get(name)
            if node is None:  # new name
                node = self.data[name] = {}
                self._add_ancestors(name)
            elif rr.rtype == QTYPE.CNAME and node:
                # A CNAME is arriving at a name that already holds records.
                # Empty placeholder nodes are not a conflict.
                self._say('CNAME error #1', node, rr)

            if QTYPE.CNAME in node:  # CNAME exists
                self._say('CNAME error #2', node, rr)
            node.setdefault(rr.rtype, set()).add(rr)

            if rr.rtype == QTYPE.SOA:
                if self.name == '':
                    self.name = name
                    self._say('SOA == ', rr)
                else:
                    self._say('SOA duplicate ', self, name)

    def delegation(self, nsname):
        """Register *nsname* as a delegation point (zone cut).

        Args:
            nsname: owner name present in ``self.data`` that holds NS records.

        ``self.cuts[nsname]`` receives a shallow copy of the node so that
        names Check_cuts() later files under the cut do not end up as keys
        of the rtype-keyed node in ``self.data``.  Every NS rdata is noted
        in ``self.glue``; any non-NS rtype at the cut is reported.
        """
        node = self.data[nsname]
        self.cuts[nsname] = dict(node)
        self._say('cut found: ', nsname)
        for rrtype in node:
            if rrtype == QTYPE.NS:
                nsset = node[QTYPE.NS]
                self._say('Cut NS', nsset)
                for ns in nsset:
                    self.glue[ns.rdata] = {}  # need glue
                    self._say('need glue for', ns.rdata)
            else:
                self._say('delegation and record error', nsname)

    def Check_cuts(self):
        """Move every name below a delegation point out of ``self.data``.

        Each moved node is stored as ``self.cuts[cut][name]``.
        """
        for ns, below in self.cuts.items():
            suffix = '.' + ns
            # Snapshot the keys: entries are removed while scanning.
            for name in list(self.data):
                if name.endswith(suffix):
                    node = self.data.pop(name)
                    below[name] = node
                    self._say(ns, below)
                    self._say(name, node)  # glue ?

    def Check_zone(self):
        """Validate loaded data against the zone name and process cuts.

        Picks the apex SOA, registers every in-zone name holding NS records
        as a delegation, drops names that are not at or below the apex, and
        finally calls Check_cuts().  Does nothing but report when no SOA
        has been read.
        """
        if not self.name:
            self._say('No zone found')
            return
        zsuffix = '.' + self.name
        # Snapshot the keys: out-of-zone names are removed while scanning.
        for name in list(self.data):
            if name == self.name:  # zone apex
                rrs = self.data[name]
                if QTYPE.SOA in rrs:
                    self.soa = next(iter(rrs[QTYPE.SOA]))
            elif name.endswith(zsuffix):  # cuts found
                if QTYPE.NS in self.data[name]:
                    self.delegation(name)
            else:  # Out of zone name
                node = self.data.pop(name)
                # Empty nodes are ancestor placeholders made by ReadRR,
                # not records from the file, so they are dropped silently.
                if node:
                    self._say('Out of zone record', name, node)

        self.Check_cuts()

    def print_apex(self):
        """Print the apex node, the SOA and every delegation to stdout."""
        self._say('---- zone apex record ----')
        self._say('zone name ', self.name)
        # .get(): the apex node is absent when no SOA was ever read.
        self._say(self.data.get(self.name))
        self._say('-----')
        self._say('SOA = ', self.soa)
        self._say('cuts =', list(self.cuts))
        for nn in self.cuts:
            self._say('---', self.cuts[nn])

    def dns_response(self, query):
        """Build a reply to *query* from the loaded zone.

        Args:
            query: a DNSRecord (e.g. the result of DNSRecord.parse(packet)).

        Returns:
            A DNSRecord reply, or None when the query name is neither a
            known name nor below the zone name.

        A known name yields its RRset for the query type, else its CNAME,
        else an empty answer with the SOA in the authority section.  An
        unknown name below a delegation yields that delegation's NS
        records in the authority section; any other unknown in-zone name
        yields NXDOMAIN with the SOA.
        """
        reply = DNSRecord(DNSHeader(id=query.header.id, qr=1, aa=1), q=query.q)
        qname = query.q.qname
        qtype = query.q.qtype
        qn = str(qname).lower()

        node = self.data.get(qn)
        if node is not None:
            self._say(node)
            if qtype in node:
                for rr in node[qtype]:  # rrset
                    self._say('found RRSet', rr)
                    reply.add_answer(rr)  # answer RRSet
            elif QTYPE.CNAME in node:
                self._say('CNAME found. ---', node[QTYPE.CNAME])
                for rr in node[QTYPE.CNAME]:
                    # Reuse the stored record's TTL for the answer.
                    reply.add_answer(
                        RR(qname, QTYPE.CNAME, ttl=rr.ttl, rdata=rr.rdata))
            else:  # NoData response here
                self._add_soa(reply)
            return reply

        if not qn.endswith('.' + self.name):
            self._say("Out of zone query")
            return None

        # cuts test: with nested cuts, refer to the one closest to the apex.
        matches = [nn for nn in self.cuts if qn.endswith('.' + nn)]
        if matches:
            nn = min(matches, key=len)
            # Read NS from the cut's own copy; the node in self.data may
            # have been moved under a parent cut by Check_cuts().
            nsset = self.cuts[nn].get(QTYPE.NS, ())
            self._say('delegation found', nn, qn)
            self._say(' ', nsset)
            for rr in nsset:
                reply.add_auth(rr)
            # may need glue in additional section
            # NOTE(review): the referral is still sent with aa=1 - confirm
            # whether the authoritative flag should be cleared here.
            return reply

        # NXDOMAIN
        self._add_soa(reply)
        reply.header.rcode = RCODE.NXDOMAIN
        return reply
138
if __name__ == '__main__':
    # Self-test: load ./zone.txt, show the processed zone, then run three
    # sample A queries through the responder and print query and reply.
    print('------ start ------')
    zone = Zone()

    # Read through a context manager so the file handle is always closed.
    with open('zone.txt') as zone_file:
        zone.ReadRR(zone_file.read())
    zone.Check_zone()
    zone.print_apex()
    print('-------- Zone processed ---------')

    print('-------- query test ---------')

    # The mixed-case name exercises the case-insensitive lookup.
    for qname in ("x.c.d.qMaiL.jp", "d.c.d.qmail.jp"):
        d = DNSRecord.question(qname, "A")
        print(d)
        print(zone.dns_response(d))

    print('---------------------------------')
    d = DNSRecord.question("x.d.qmail.jp", "A")
    print(d)
    print(zone.dns_response(d))
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily. You are not allowed to attach a file to this page.