Package ldaptor :: Package protocols :: Module pureber
[hide private]
[frames | no frames]

Source Code for Module ldaptor.protocols.pureber

  1  # Copyright (C) 2001 Tommi Virtanen 
  2  # 
  3  # This library is free software; you can redistribute it and/or 
  4  # modify it under the terms of version 2.1 of the GNU Lesser General Public 
  5  # License as published by the Free Software Foundation. 
  6  # 
  7  # This library is distributed in the hope that it will be useful, 
  8  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
  9  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 10  # Lesser General Public License for more details. 
 11  # 
 12  # You should have received a copy of the GNU Lesser General Public 
 13  # License along with this library; if not, write to the Free Software 
 14  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 15   
 16  """Pure, simple, BER encoding and decoding""" 
 17   
 18  # This BER library is currently aimed at supporting LDAP, thus 
 19  # the following restrictions from RFC2251 apply: 
 20  # 
 21  # (1) Only the definite form of length encoding will be used. 
 22  # 
 23  # (2) OCTET STRING values will be encoded in the primitive form 
 24  #     only. 
 25  # 
 26  # (3) If the value of a BOOLEAN type is true, the encoding MUST have 
 27  #     its contents octets set to hex "FF". 
 28  # 
 29  # (4) If a value of a type is its default value, it MUST be absent. 
 30  #     Only some BOOLEAN and INTEGER types have default values in 
 31  #     this protocol definition. 
 32   
 33   
 34  import string 
 35   
 36  # xxxxxxxx 
 37  # |/|\.../ 
 38  # | | | 
 39  # | | tag 
 40  # | | 
 41  # | primitive (0) or structured (1) 
 42  # | 
 43  # class 
 44   
 45  CLASS_MASK              = 0xc0 
 46  CLASS_UNIVERSAL = 0x00 
 47  CLASS_APPLICATION       = 0x40 
 48  CLASS_CONTEXT           = 0x80 
 49  CLASS_PRIVATE           = 0xc0 
 50   
 51  STRUCTURED_MASK         = 0x20 
 52  STRUCTURED              = 0x20 
 53  NOT_STRUCTURED          = 0x00 
 54   
 55  TAG_MASK                = 0x1f 
 56   
 57  # LENGTH 
 58  # 0xxxxxxx = 0..127 
 59  # 1xxxxxxx = len is stored in the next 0xxxxxxx octets 
 60  # indefinite form not supported 
 61   
class UnknownBERTag(Exception):
    """Describes a tag that a decoder context has no class for.

    tag is the numeric identifier that could not be resolved and
    context is the decoder context that was searched; both are kept
    as attributes and used to build the message.
    """

    def __init__(self, tag, context):
        Exception.__init__(self)
        self.tag = tag
        self.context = context

    def __str__(self):
        return "BERDecoderContext has no tag 0x%02x: %s" % (
            self.tag, self.context)
71 72 import UserList 73
def berDecodeLength(m, offset=0):
    """
    Decode the BER length field that starts at m[offset].

    Return a tuple of (length, lengthLength), where length is the
    decoded content length and lengthLength is the number of octets
    occupied by the length field itself.

    m must be at least one byte long past offset.  When the long form
    announces more length octets than m contains, need() raises
    BERExceptionInsufficientData.
    """
    # The first octet is a bit field, not a number, so read it unsigned.
    first = ber2int(m[offset], signed=False)
    if not first & 0x80:
        # Short form: the octet itself is the length (0..127).
        return (first, 1)

    # Long form: the low seven bits count the length octets that follow.
    # NOTE: the indefinite form (0x80, zero following octets) is not
    # supported; it ends up in ber2int() with an empty string, which
    # raises BERExceptionInsufficientData.
    lenlen = 1 + (first & 0x7F)
    need(m, offset + lenlen)
    length = ber2int(m[offset + 1:offset + lenlen], signed=False)
    return (length, lenlen)
86
def int2berlen(i):
    """Encode the non-negative integer i as a definite-form BER length.

    Values up to 127 are returned as a single octet.  Larger values are
    returned as one octet holding 0x80 | count followed by count octets
    containing the unsigned big-endian value.
    """
    assert i >= 0
    encoded = int2ber(i, signed=False)
    if i <= 127:
        return encoded
    count = len(encoded)
    # The count has to fit in the low seven bits of the first octet.
    assert count > 0
    assert count <= 127
    return chr(0x80 | count) + encoded
97
def int2ber(i, signed=True):
    """Encode the integer i as a big-endian string of octets.

    With signed true (the default) the result is a two's-complement
    encoding: octets are emitted until the remaining value fits in
    -128..127.  With signed false, octets are emitted until the
    remaining value fits in 0..255.

    Returns a string with one character per octet.
    """
    # Collect octets least-significant first, then reverse once; this
    # avoids rebuilding the string on every iteration.
    octets = []
    if signed:
        while i > 127 or i < -128:
            octets.append(chr(i % 256))
            i = i >> 8
    else:
        while i > 255:
            octets.append(chr(i % 256))
            i = i >> 8
    octets.append(chr(i % 256))
    octets.reverse()
    return ''.join(octets)
106
def ber2int(e, signed=True):
    """Decode the big-endian octet string e into an integer.

    With signed true (the default) the first octet is interpreted as
    two's complement, so a set high bit yields a negative result.
    With signed false the value is read as unsigned.

    e must contain at least one octet; otherwise need() raises
    BERExceptionInsufficientData.
    """
    need(e, 1)
    v = ord(e[0])
    if signed and v & 0x80:
        v = v - 256
    for octet in e[1:]:
        v = (v << 8) | ord(octet)
    return v
115
class BERBase:
    """Common base for BER objects.

    Subclasses set the class attribute tag and implement __str__ to
    return the encoded form; length and comparisons here are all
    defined in terms of that encoded string.
    """

    # Default tag; subclasses override it, instances may override it
    # again through __init__.
    tag = None

    def identification(self):
        """Return the value used as the identifier octet when encoding."""
        return self.tag

    def __init__(self, tag=None):
        # Only shadow the class-level tag when one is given explicitly.
        if tag is not None:
            self.tag = tag

    def __len__(self):
        """Return the length of the encoded form."""
        return len(str(self))

    def __cmp__(self, other):
        # Only consulted by Python 2; orders objects by encoded form.
        if isinstance(other, BERBase):
            return cmp(str(self), str(other))
        else:
            return -1

    def __eq__(self, other):
        if isinstance(other, BERBase):
            return str(self) == str(other)
        else:
            return False

    def __ne__(self, other):
        if isinstance(other, BERBase):
            return str(self) != str(other)
        else:
            # Must be the opposite of __eq__, which reports non-BERBase
            # operands as not equal.
            return True
146
class BERStructured(BERBase):
    """Base for BER objects that use the structured encoding."""

    def identification(self):
        """Return the tag with the STRUCTURED bit set."""
        return STRUCTURED | self.tag
150
class BERException(Exception):
    """Generic BER error."""
152
class BERExceptionInsufficientData(Exception):
    """Raised by need() when a buffer is shorter than required.

    The single argument is the number of missing items.
    """


def need(buf, n):
    """Check that buf holds at least n items.

    Raises BERExceptionInsufficientData, carrying the number of missing
    items, when len(buf) is smaller than n.  Returns None otherwise.
    """
    d = n - len(buf)
    if d > 0:
        raise BERExceptionInsufficientData(d)
159
class BERInteger(BERBase):
    """BER object with tag 0x02 holding an integer value."""

    tag = 0x02
    value = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Build an instance from the content octets of a decoded object.

        content must be non-empty; it is decoded as a signed integer.
        berdecoder is accepted for interface parity and not used.
        """
        assert len(content) > 0
        value = ber2int(content)
        return klass(value=value, tag=tag)

    def __init__(self, value=None, tag=None):
        """Create a new BERInteger object.

        value is an integer; it is required even though it defaults to
        None.  tag, when given, overrides the class tag.
        """
        BERBase.__init__(self, tag)
        assert value is not None
        self.value = value

    def __str__(self):
        """Return the encoding: identifier, length, then content."""
        encoded = int2ber(self.value)
        return (chr(self.identification())
                + int2berlen(len(encoded))
                + encoded)

    def __repr__(self):
        name = self.__class__.__name__
        # Only mention the tag when it differs from the class default.
        if self.tag == self.__class__.tag:
            return "%s(value=%r)" % (name, self.value)
        return "%s(value=%r, tag=%d)" % (name, self.value, self.tag)
191
class BEROctetString(BERBase):
    """BER object with tag 0x04 holding a string value."""

    tag = 0x04

    value = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Build an instance whose value is the content octets as given.

        berdecoder is accepted for interface parity and not used.
        """
        return klass(value=content, tag=tag)

    def __init__(self, value=None, tag=None):
        """Create a new BEROctetString object.

        value is required even though it defaults to None.  tag, when
        given, overrides the class tag.
        """
        BERBase.__init__(self, tag)
        assert value is not None
        self.value = value

    def __str__(self):
        """Return the encoding: identifier, length, then str(value)."""
        value = str(self.value)
        return (chr(self.identification())
                + int2berlen(len(value))
                + value)

    def __repr__(self):
        name = self.__class__.__name__
        # Only mention the tag when it differs from the class default.
        if self.tag == self.__class__.tag:
            return "%s(value=%s)" % (name, repr(self.value))
        return "%s(value=%s, tag=%d)" % (name, repr(self.value), self.tag)
222
class BERNull(BERBase):
    """BER object with tag 0x05 and no content."""

    tag = 0x05

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Build an instance; content must be empty.

        berdecoder is accepted for interface parity and not used.
        """
        assert len(content) == 0
        return klass(tag=tag)

    def __init__(self, tag=None):
        BERBase.__init__(self, tag)

    def __str__(self):
        """Return the encoding: identifier followed by a zero length."""
        return chr(self.identification()) + chr(0)

    def __repr__(self):
        name = self.__class__.__name__
        # Only mention the tag when it differs from the class default.
        if self.tag == self.__class__.tag:
            return name + "()"
        return name + "(tag=%d)" % self.tag
243
class BERBoolean(BERBase):
    """BER object with tag 0x01 holding a truth value.

    Any true value is stored, and therefore encoded, as 0xFF.
    """

    tag = 0x01

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Build an instance from the content octets of a decoded object.

        content must be non-empty.  berdecoder is accepted for
        interface parity and not used.
        """
        assert len(content) > 0
        value = ber2int(content)
        return klass(value=value, tag=tag)

    def __init__(self, value=None, tag=None):
        """Create a new BERBoolean object.

        value is required even though it defaults to None; any true
        value is normalized to 0xFF.  tag, when given, overrides the
        class tag.
        """
        BERBase.__init__(self, tag)
        assert value is not None
        if value:
            value = 0xFF
        self.value = value

    def __str__(self):
        """Return the encoding: identifier, length 1, then one octet."""
        assert self.value == 0 or self.value == 0xFF
        return (chr(self.identification())
                + int2berlen(1)
                + chr(self.value))

    def __repr__(self):
        name = self.__class__.__name__
        # Only mention the tag when it differs from the class default.
        if self.tag == self.__class__.tag:
            return "%s(value=%d)" % (name, self.value)
        return "%s(value=%d, tag=%d)" % (name, self.value, self.tag)
276 277
class BEREnumerated(BERInteger):
    """Same encoding as BERInteger, but with tag 0x0a."""

    tag = 0x0a
280
class BERSequence(BERStructured, UserList.UserList):
    """Structured BER object with tag 0x10 that behaves like a list.

    The elements are the contained BER objects; the encoding is the
    concatenation of their encoded forms.
    """

    # TODO __getslice__ calls __init__ with no args.
    tag = 0x10

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Build an instance by decoding every object in content.

        berdecoder is the decoder context used for the elements.
        """
        l = berDecodeMultiple(content, berdecoder)
        return klass(l, tag=tag)

    def __init__(self, value=None, tag=None):
        """Create a new sequence.

        value is the iterable of elements; it is required even though
        it defaults to None.  tag, when given, overrides the class tag.
        """
        BERStructured.__init__(self, tag)
        UserList.UserList.__init__(self)
        assert value is not None
        self[:] = value

    def __str__(self):
        """Return the encoding: identifier, length, encoded elements."""
        r = ''.join(map(str, self.data))
        return chr(self.identification()) + int2berlen(len(r)) + r

    def __repr__(self):
        name = self.__class__.__name__
        # Only mention the tag when it differs from the class default.
        if self.tag == self.__class__.tag:
            return "%s(value=%s)" % (name, repr(self.data))
        return "%s(value=%s, tag=%d)" % (name, repr(self.data), self.tag)
307 308
class BERSequenceOf(BERSequence):
    """Identical to BERSequence; adds no behavior of its own."""
311
class BERSet(BERSequence):
    """Same behavior as BERSequence, but with tag 0x11."""

    tag = 0x11
315 316 317
class BERDecoderContext:
    """Maps identifier values to the classes used to decode them.

    fallback is another context consulted when an identifier is not in
    this context's Identities.  inherit is the context returned by
    inherit(); when it is not given, the context itself is returned.
    """

    # Identifier value -> class whose fromBER() decodes the content.
    Identities = {
        BERInteger.tag: BERInteger,
        BEROctetString.tag: BEROctetString,
        BERNull.tag: BERNull,
        BERBoolean.tag: BERBoolean,
        BEREnumerated.tag: BEREnumerated,
        BERSequence.tag: BERSequence,
        BERSet.tag: BERSet,
    }

    def __init__(self, fallback=None, inherit=None):
        self.fallback = fallback
        self.inherit_context = inherit

    def lookup_id(self, id):
        """Return the class registered for id, or None.

        The fallback context, if any, is searched when this context
        has no entry for id.
        """
        try:
            return self.Identities[id]
        except KeyError:
            if self.fallback:
                return self.fallback.lookup_id(id)
            return None

    def inherit(self):
        """Return the inherit context if one was given, else self."""
        return self.inherit_context or self

    def __repr__(self):
        identities = ', '.join(
            ['0x%02x: %s' % (tag, class_.__name__)
             for tag, class_ in self.Identities.items()])
        return ("<" + self.__class__.__name__
                + " identities={%s}" % identities
                + " fallback=" + repr(self.fallback)
                + " inherit=" + repr(self.inherit_context)
                + ">")
354
def berDecodeObject(context, m):
    """berDecodeObject(context, string) -> (berobject, bytesUsed)

    Decode the first BER object in m, using context to map its
    identifier to a class.

    berobject may be None: either m is empty (bytesUsed is then 0), or
    context has no class for the identifier, in which case a message
    is printed and bytesUsed still covers the skipped object.

    need() raises BERExceptionInsufficientData when m is shorter than
    the object it announces.
    """
    if not m:
        return (None, 0)

    need(m, 2)
    # The structured bit is masked out; only class and tag form the
    # lookup key.
    i = ber2int(m[0], signed=False) & (CLASS_MASK | TAG_MASK)

    length, lenlen = berDecodeLength(m, offset=1)
    used = 1 + lenlen + length
    need(m, used)
    content = m[1 + lenlen:used]

    berclass = context.lookup_id(i)
    if not berclass:
        # TODO: raise UnknownBERTag(i, context) instead of printing.
        print(str(UnknownBERTag(i, context)))
        return (None, used)

    inh = context.inherit()
    assert inh
    r = berclass.fromBER(tag=i, content=content, berdecoder=inh)
    return (r, used)
380
def berDecodeMultiple(content, berdecoder):
    """berDecodeMultiple(content, berdecoder) -> [objects]

    Decodes everything in content and returns a list of decoded
    objects.

    All of content will be decoded, and content must contain complete
    BER objects.  Objects that berDecodeObject() reports as None are
    left out of the result.
    """
    objects = []
    while content:
        obj, used = berDecodeObject(berdecoder, content)
        if obj is not None:
            objects.append(obj)
        assert used <= len(content)
        content = content[used:]
    return objects
#TODO unimplemented classes are below:

#class BERObjectIdentifier(BERBase):
#    tag = 0x06
#    pass

#class BERIA5String(BERBase):
#    tag = 0x16
#    pass

#class BERPrintableString(BERBase):
#    tag = 0x13
#    pass

#class BERT61String(BERBase):
#    tag = 0x14
#    pass

#class BERUTCTime(BERBase):
#    tag = 0x17
#    pass

#class BERBitString(BERBase):
#    tag = 0x03
#    pass