Commit 89264930 authored by Sam Rushing's avatar Sam Rushing

split _ldap.pyx into coro.asn1.ber and coro.ldap.query

parent 32522fb1
# -*- Mode: Python -*-
# even empty, this file is needed so cython will see the .pxd
# -*- Mode: Cython -*-
# flags for BER tags
cdef enum FLAGS:
FLAGS_UNIVERSAL = 0x00
FLAGS_STRUCTURED = 0x20
FLAGS_APPLICATION = 0x40
FLAGS_CONTEXT = 0x80
# NULL is a pyrex keyword
# universal BER tags
cdef enum TAGS:
TAGS_BOOLEAN = 0x01
TAGS_INTEGER = 0x02
TAGS_BITSTRING = 0x03
TAGS_OCTET_STRING = 0x04
TAGS_NULL = 0x05
TAGS_OBJID = 0x06
TAGS_OBJDESCRIPTOR = 0x07
TAGS_EXTERNAL = 0x08
TAGS_REAL = 0x09
TAGS_ENUMERATED = 0x0a
TAGS_EMBEDDED_PDV = 0x0b
TAGS_UTF8STRING = 0x0c
TAGS_SEQUENCE = 0x10 | 0x20 # Equivalent to FLAGS_STRUCTURED
TAGS_SET = 0x11 | 0x20 # Equivalent to FLAGS_STRUCTURED
cdef int length_of_length (int n)
cdef void encode_length (int l, int n, char * buffer)
cdef object _encode_integer (int n)
cdef object _encode_long_integer (n)
cdef object _TLV1 (int tag, bytes data)
cdef object _TLV (int tag, object data)
cdef object _CHOICE (int n, bint structured)
cdef object _APPLICATION (int n)
cdef object _ENUMERATED (int n)
cdef object _INTEGER (int n)
cdef object _BOOLEAN (int n)
cdef object _SEQUENCE (object elems)
cdef object _SET (object elems)
cdef object _OCTET_STRING (bytes s)
cdef object _OBJID (list l)
cdef object decode_string (unsigned char * s, int * pos, int length)
cdef object decode_raw (unsigned char * s, int * pos, int length)
cdef object decode_bitstring (unsigned char * s, int * pos, int length)
cdef object decode_integer (unsigned char * s, int * pos, int length)
cdef object decode_long_integer (unsigned char * s, int * pos, int length)
cdef object decode_structured (unsigned char * s, int * pos, int length)
cdef object decode_objid (unsigned char * s, int * pos, int length)
cdef object decode_boolean (unsigned char * s, int * pos, int length)
cdef int _decode_length (unsigned char * s, int * pos, int lol)
cdef object _decode (unsigned char * s, int * pos, int eos, bint just_tlv)
# -*- Mode: Cython -*-
# Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# [this code originally from _ldap.pyx]
# XXX I'm not happy with 'just_tlv' and the code that uses it - [see
# x509.py:der_extract()]. I think a better solution would be to
# change the decoder to include 'location' information in its
# output. This would probably break existing users of that
# facility (ldap, anyone else?). The problem is that the decoders
# were originally written for LDAP, which has no need for access
# to raw encoded data.
#
# A really nice thing to do would be to make the whole thing act
# more like a *codec* - it'd be great if you could take the output
# of the decoder and feed it back to the encoder and get the same
# DER out of it. The current decoder is 'halfway' like this - it
# doesn't bother with tag info for SEQUENCE, SET, INTEGER,
# etc... If we had something like this we could describe ASN1 data
# structures with something close to the ASN1 syntax, and be able
# to automatically decode and encode those structures via nice
# class wrappers.
from cpython cimport PyBytes_FromStringAndSize, PyNumber_Long, PyLong_Check
from libc.string cimport memcpy
import sys
W = sys.stderr.write
# ================================================================================
# BER encoders
# ================================================================================
# based on the table in dumpasn1.c
TAG_TABLE = {
0x01 : 'BOOLEAN', # 1: Boolean
0x02 : 'INTEGER', # 2: Integer
0x03 : 'BITSTRING', # 2: Bit string
0x04 : 'OCTETSTRING', # 4: Byte string
0x05 : 'NULLTAG', # 5: NULL
0x06 : 'OID', # 6: Object Identifier
0x07 : 'OBJDESCRIPTOR', # 7: Object Descriptor
0x08 : 'EXTERNAL', # 8: External
0x09 : 'REAL', # 9: Real
0x0A : 'ENUMERATED', # 10: Enumerated
0x0B : 'EMBEDDED_PDV', # 11: Embedded Presentation Data Value
0x0C : 'UTF8STRING', # 12: UTF8 string
0x10 : 'SEQUENCE', # 16: Sequence/sequence of
0x11 : 'SET', # 17: Set/set of
0x12 : 'NUMERIC_STRING', # 18: Numeric string
0x13 : 'PRINTABLE_STRING', # 19: Printable string (ASCII subset)
0x14 : 'T61_STRING', # 20: T61/Teletex string
0x15 : 'VIDEOTEX_STRING', # 21: Videotex string
0x16 : 'IA5_STRING', # 22: IA5/ASCII string
0x17 : 'UTC_TIME', # 23: UTC time
0x18 : 'GENERALIZED_TIME', # 24: Generalized time
0x19 : 'GRAPHIC_STRING', # 25: Graphic string
0x1A : 'VISIBLE_STRING', # 26: Visible string (ASCII subset)
0x1B : 'GENERAL_STRING', # 27: General string
0x1C : 'UNIVERSAL_STRING', # 28: Universal string
0x1E : 'BMP_STRING', # 30: Basic Multilingual Plane/Unicode string
}
cdef int length_of_length (int n):
cdef int r
# how long will the BER-encoded length <n> be?
if n < 0x80:
return 1
else:
r = 1
while n:
n = n >> 8
r = r + 1
return r
cdef void encode_length (int l, int n, char * buffer):
# caller must ensure room. see length_of_length above.
cdef int i
if l < 0x80:
buffer[0] = <char> l
else:
buffer[0] = <char> (0x80 | ((n-1) & 0x7f))
for i from 1 <= i < n:
buffer[n-i] = <char> (l & 0xff)
l = l >> 8
# encode an integer, ASN1 style.
# two's complement with the minimum number of bytes.
cdef object _encode_integer (int n):
cdef int n0, byte, i
# 16 bytes is more than enough for int == int64_t
cdef char result[16]
i = 0
n0 = n
byte = 0x80 # for n==0
while 1:
n = n >> 8
if n0 == n:
if n == -1 and ((not byte & 0x80) or (i==0)):
# negative, but high bit clear
result[15-i] = <char> 0xff
i = i + 1
elif n == 0 and (byte & 0x80):
# positive, but high bit set
result[15-i] = <char> 0x00
i = i + 1
break
else:
byte = n0 & 0xff
result[15-i] = <char> byte
i = i + 1
n0 = n
return PyBytes_FromStringAndSize (&result[16-i], i)
# encode an integer, ASN1 style.
# two's complement with the minimum number of bytes.
cdef object _encode_long_integer (n):
cdef int byte, i, rlen
cdef char * rbuf
# 1) how many bytes?
n0 = n
n1 = n
rlen = 0
while 1:
n1 = n1 >> 8
if n1 == n0:
break
else:
rlen = rlen + 1
n0 = n1
if rlen == 0:
rlen = 1
# 2) create result string
result = PyBytes_FromStringAndSize (NULL, rlen)
rbuf = result
# 3) render result string
i = 0
n0 = n
byte = 0x80 # for n==0
while 1:
n = n >> 8
if n0 == n:
if n == -1 and ((not byte & 0x80) or (i==0)):
# negative, but high bit clear
rbuf[(rlen-1)-i] = <char> 0xff
i = i + 1
elif n == 0 and byte & 0x80:
# positive, but high bit set
rbuf[(rlen-1)-i] = <char> 0x00
i = i + 1
break
else:
byte = n0 & 0xff
rbuf[(rlen-1)-i] = <char> byte
i = i + 1
n0 = n
return result
def encode_long_integer (n):
return _encode_long_integer (n)
# this function is at the heart of all ASN output.
# it returns a <tag, length, value> string.
# _TLV1 (tag, data)
# <tag> is an ASN1 tag
# <data> is a single string
cdef object _TLV1 (int tag, bytes data):
# compute length of concatenated data
cdef int rlen, i, lol
cdef bytes s
rlen = len (data)
# compute length of length
lol = length_of_length (rlen)
# create result string
result = PyBytes_FromStringAndSize (NULL, 1 + lol + rlen)
cdef char * rbuf
rbuf = result
# render tag
rbuf[0] = <char> tag
rbuf = rbuf + 1
# render length
encode_length (rlen, lol, rbuf)
rbuf = rbuf + lol
# render data
memcpy (rbuf, <char *> data, rlen)
# return result
return result
# _TLV (tag, *data)
# <data> is a sequence of strings
# <tag> is an ASN1 tag
cdef object _TLV (int tag, object data):
# compute length of concatenated data
cdef int rlen, i, ilen, lol
cdef bytes s
rlen = 0
for s in data:
rlen += len(s)
# compute length of length
lol = length_of_length (rlen)
# create result string
result = PyBytes_FromStringAndSize (NULL, 1 + lol + rlen)
cdef char * rbuf
rbuf = result
# render tag
rbuf[0] = <char> tag
rbuf = rbuf + 1
# render length
encode_length (rlen, lol, rbuf)
rbuf = rbuf + lol
# render data
for s in data:
ilen = len(s)
memcpy (rbuf, <char *>s, ilen)
rbuf = rbuf + ilen
# return result
return result
cdef object _CHOICE (int n, bint structured):
if structured:
n = n | <int>FLAGS_STRUCTURED
n = n | <int>FLAGS_CONTEXT
return n
cdef object _APPLICATION (int n):
return n | <int>FLAGS_APPLICATION | <int>FLAGS_STRUCTURED
cdef object _ENUMERATED (int n):
return _TLV1 (TAGS_ENUMERATED, _encode_integer (n))
cdef object _INTEGER (int n):
return _TLV1 (TAGS_INTEGER, _encode_integer (n))
cdef object _BOOLEAN (int n):
if n:
n = 0xff
else:
n = 0x00
return _TLV1 (TAGS_BOOLEAN, _encode_integer (n))
cdef object _SEQUENCE (object elems):
return _TLV (TAGS_SEQUENCE, elems)
cdef object _SET (object elems):
return _TLV (TAGS_SET, elems)
cdef object _OCTET_STRING (bytes s):
return _TLV1 (TAGS_OCTET_STRING, s)
cdef object _OBJID (list l):
cdef unsigned int i, list_len, one_num, temp_buf_off, temp_buf_len, done
cdef unsigned int buf_len, first_two_as_int
cdef char temp_buf[5], buf[32]
if len(l) < 2:
raise ValueError, "OBJID arg too short"
if l[0] < 2:
if l[1] >= 40:
raise ValueError, "OBJID arg out of range"
elif l[0] == 2:
if l[1] > 175:
raise ValueError, "OBJID arg out of range"
else:
raise ValueError, "OBJID arg out of range"
first_two_as_int = (l[0] * 40) + l[1]
# buf grows forwards. temp_buf grows backwards and is periodically
# emptied (forwards) into buf.
buf[0] = first_two_as_int
buf_len = 1
list_len = len (l)
for i from 2 <= i < list_len:
one_num = l[i]
temp_buf_off = 5
temp_buf_len = 0
done = 0
while not done:
temp_buf_off = temp_buf_off - 1
temp_buf_len = temp_buf_len + 1
temp_buf[temp_buf_off] = (one_num & 0x7f) | 0x80
one_num = one_num >> 7
if one_num == 0:
done = 1
temp_buf[4] = temp_buf[4] & 0x7f
if (buf_len + temp_buf_len) > 32:
raise ValueError, "OBJID arg too long"
memcpy (&buf[buf_len], &temp_buf[temp_buf_off], temp_buf_len)
buf_len = buf_len + temp_buf_len
result = PyBytes_FromStringAndSize (buf, buf_len)
return _TLV1 (TAGS_OBJID, result)
# ================================================================================
# externally visible python interfaces
# ================================================================================
def TLV (int tag, *data):
return _TLV (tag, data)
def CHOICE (int n, bint structured):
return _CHOICE (n, structured)
def APPLICATION (int n):
return _APPLICATION (n)
def ENUMERATED (int n):
return _ENUMERATED (n)
def INTEGER (n):
if PyLong_Check (n):
return _TLV (TAGS_INTEGER, _encode_long_integer (n))
else:
return _INTEGER (n)
def BOOLEAN (int n):
return _BOOLEAN (n)
def SEQUENCE (*elems):
return _SEQUENCE (elems)
def SET (*elems):
return _SET (elems)
def OCTET_STRING (s):
return _OCTET_STRING (s)
def OBJID (l):
return _OBJID (l)
# ================================================================================
# BER decoders
# ================================================================================
class DecodeError (Exception):
"""An ASN.1 decoding error occurred"""
def __str__(self):
return 'ASN.1 decoding error'
class InsufficientData (DecodeError):
"""ASN.1 encoding specifies more data than is available"""
def __str__(self):
return 'unexpected end of data'
class LengthTooLong (DecodeError):
"""We do not support ASN.1 data length > 32 bits"""
def __str__(self):
return 'length too long'
# Note: this codec was originally written for LDAP, but is now used outside of
# that context. We should consider implementing indefinite lengths.
class IndefiniteLength (DecodeError):
"""Quoth RFC2251 5.1: 'only the definite form of length encoding will be used' """
def __str__(self):
return 'indefinite length'
class MultiByteTag (DecodeError):
"""multi-byte tags not supported"""
def __str__(self):
return 'multi-byte tags not supported'
kind_unknown = 'unknown'
kind_application = 'application'
kind_context = 'context'
kind_oid = 'oid'
kind_bitstring = 'bitstring'
# SAFETY NOTE: it's important for each decoder to correctly handle length == zero.
cdef object decode_string (unsigned char * s, int * pos, int length):
# caller guarantees sufficient data in <s>
result = PyBytes_FromStringAndSize (<char *> (s+(pos[0])), length)
pos[0] = pos[0] + length
return result
cdef object decode_raw (unsigned char * s, int * pos, int length):
# caller guarantees sufficient data in <s>
result = PyBytes_FromStringAndSize (<char *> (s+(pos[0])), length)
pos[0] = pos[0] + length
return result
cdef object decode_bitstring (unsigned char * s, int * pos, int length):
# caller guarantees sufficient data in <s>
unused = <int>s[pos[0]]
result = PyBytes_FromStringAndSize (<char *> (s+(pos[0]+1)), length-1)
pos[0] = pos[0] + length
return unused, result
cdef object decode_integer (unsigned char * s, int * pos, int length):
cdef int n
if length == 0:
return 0
else:
n = s[pos[0]]
if n & 0x80:
# negative
n = n - 0x100
length = length - 1
while length:
pos[0] = pos[0] + 1
n = (n << 8) | s[pos[0]]
length = length - 1
# advance past the last byte
pos[0] = pos[0] + 1
# this will do the typecast
# XXX ensure this handles the full 32-bit signed range
return n
# almost identical, but note the cast to long, this generates very different code
cdef object decode_long_integer (unsigned char * s, int * pos, int length):
if length == 0:
return 0
else:
n = s[pos[0]]
if n & 0x80:
# negative
n = n - 0x100
# cast to long
n = PyNumber_Long (n)
length = length - 1
while length:
pos[0] = pos[0] + 1
n = (n << 8) | s[pos[0]]
length = length - 1
# advance past the last byte
pos[0] = pos[0] + 1
return n
cdef object decode_structured (unsigned char * s, int * pos, int length):
cdef int start, end
cdef list result = []
start = pos[0]
end = start + length
if length:
while pos[0] < end:
#print 'structured: pos=%d end=%d remain=%d result=%r' % (pos[0], end, end - pos[0], result)
item = _decode (s, pos, end, 0)
result.append (item)
return result
cdef object decode_objid (unsigned char * s, int * pos, int length):
cdef int i, m, n, hi, lo
cdef list r
m = s[pos[0]]
# first * 40 + second
r = [m // 40, m % 40]
n = 0
pos[0] = pos[0] + 1
for i from 1 <= i < length:
m = s[pos[0]]
hi = m & 0x80
lo = m & 0x7f
n = (n << 7) | lo
if not hi:
r.append (n)
n = 0
pos[0] = pos[0] + 1
return r
cdef object decode_boolean (unsigned char * s, int * pos, int length):
pos[0] = pos[0] + 1
if s[pos[0]-1] == 0xff:
return True
else:
return False
cdef int _decode_length (unsigned char * s, int * pos, int lol):
# actually supports only up to 32-bit lengths
cdef unsigned int i, n
n = 0
for i from 0 <= i < lol:
n = (n << 8) | s[pos[0]]
pos[0] = pos[0] + 1
return n
cdef object _decode (unsigned char * s, int * pos, int eos, bint just_tlv):
cdef int tag, lol
cdef unsigned int length
# 1) get tag
tag = <int> s[pos[0]]
if tag & 0x1f == 0x1f:
raise MultiByteTag, pos[0]
else:
pos[0] = pos[0] + 1
# 2) get length
if (pos[0]) > eos:
# assure at least one byte [valid for length == 0]
raise InsufficientData, pos[0]
elif s[pos[0]] < 0x80:
# one-byte length
length = s[pos[0]]
pos[0] = pos[0] + 1
elif s[pos[0]] == 0x80:
raise IndefiniteLength, pos[0]
else:
# long definite length form, lower 7 bits
# give us the number of bytes of length
lol = s[pos[0]] & 0x7f
pos[0] = pos[0] + 1
if lol > 4:
# we don't support lengths > 32 bits
raise LengthTooLong, pos[0]
elif pos[0] + lol > eos:
raise InsufficientData, pos[0]
else:
length = _decode_length (s, pos, lol)
#print '_decode(), pos=%d length=%d eos=%d' % (pos[0], length, eos)
# 3) get value
# assure at least <length> bytes
if (<int> length) < 0:
# length > 2GB... hmmm... thuggery...
raise InsufficientData, pos[0]
elif (pos[0] + length) > eos:
raise InsufficientData, pos[0]
elif just_tlv:
return (tag & 0x1f, tag & 0xe0, length)
elif tag == TAGS_OCTET_STRING:
return decode_string (s, pos, length)
elif tag == TAGS_INTEGER:
if length > 4:
return decode_long_integer (s, pos, length)
else:
return decode_integer (s, pos, length)
elif tag == TAGS_BOOLEAN:
return decode_boolean (s, pos, length)
elif tag == TAGS_SEQUENCE:
return decode_structured (s, pos, length)
elif tag == TAGS_SET:
return decode_structured (s, pos, length)
elif tag == TAGS_ENUMERATED:
return decode_integer (s, pos, length)
elif tag == TAGS_OBJID:
return (kind_oid, decode_objid (s, pos, length))
elif tag == TAGS_BITSTRING:
return (kind_bitstring, decode_bitstring (s, pos, length))
elif tag == TAGS_NULL:
return None
else:
if tag & <int>FLAGS_CONTEXT:
kind = kind_context
elif tag & <int>FLAGS_APPLICATION:
kind = kind_application
elif TAG_TABLE.has_key (tag & 0x1f):
kind = TAG_TABLE[tag & 0x1f]
else:
kind = kind_unknown
if tag & <int>FLAGS_STRUCTURED:
return (kind, tag & 0x1f, decode_structured (s, pos, length))
else:
return (kind, tag & 0x1f, decode_raw (s, pos, length))
def decode (bytes s, int pos=0, just_tlv=0):
return _decode (
<unsigned char *> s,
&pos,
len (s),
just_tlv
), pos
# -*- Mode: Python -*-
from coro.asn1.ber import *
import unittest
# These are mostly positive test cases, need some negative ones as well.
# Though - this code *has* been through the protos c06-ldapv3-enc-r1 test suite,
# but it's a rather large suite (89MB). Consider automating a download of
# the suite here?
class ber_test_case (unittest.TestCase):
pass
class simple_test (ber_test_case):
def runTest (self):
x = SEQUENCE (
SET (INTEGER(34), INTEGER(19), OCTET_STRING('fishing line')),
OBJID ([2,3,4,5,6,88]),
OCTET_STRING ("spaghetti"),
)
self.assertEqual (x, '0(1\x14\x02\x01"\x02\x01\x13\x04\x0cfishing line\x06\x05S\x04\x05\x06X\x04\tspaghetti')
self.assertEqual (decode (x), ([[34, 19, 'fishing line'], ('oid', [2, 3, 4, 5, 6, 88]), 'spaghetti'], 42))
# www.google.com cert
google_cert = """-----BEGIN CERTIFICATE-----
MIIDITCCAoqgAwIBAgIQT52W2WawmStUwpV8tBV9TTANBgkqhkiG9w0BAQUFADBM
MQswCQYDVQQGEwJaQTElMCMGA1UEChMcVGhhd3RlIENvbnN1bHRpbmcgKFB0eSkg
THRkLjEWMBQGA1UEAxMNVGhhd3RlIFNHQyBDQTAeFw0xMTEwMjYwMDAwMDBaFw0x
MzA5MzAyMzU5NTlaMGgxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlh
MRYwFAYDVQQHFA1Nb3VudGFpbiBWaWV3MRMwEQYDVQQKFApHb29nbGUgSW5jMRcw
FQYDVQQDFA53d3cuZ29vZ2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkC
gYEA3rcmQ6aZhc04pxUJuc8PycNVjIjujI0oJyRLKl6g2Bb6YRhLz21ggNM1QDJy
wI8S2OVOj7my9tkVXlqGMaO6hqpryNlxjMzNJxMenUJdOPanrO/6YvMYgdQkRn8B
d3zGKokUmbuYOR2oGfs5AER9G5RqeC1prcB6LPrQ2iASmNMCAwEAAaOB5zCB5DAM
BgNVHRMBAf8EAjAAMDYGA1UdHwQvMC0wK6ApoCeGJWh0dHA6Ly9jcmwudGhhd3Rl
LmNvbS9UaGF3dGVTR0NDQS5jcmwwKAYDVR0lBCEwHwYIKwYBBQUHAwEGCCsGAQUF
BwMCBglghkgBhvhCBAEwcgYIKwYBBQUHAQEEZjBkMCIGCCsGAQUFBzABhhZodHRw
Oi8vb2NzcC50aGF3dGUuY29tMD4GCCsGAQUFBzAChjJodHRwOi8vd3d3LnRoYXd0
ZS5jb20vcmVwb3NpdG9yeS9UaGF3dGVfU0dDX0NBLmNydDANBgkqhkiG9w0BAQUF
AAOBgQAhrNWuyjSJWsKrUtKyNGadeqvu5nzVfsJcKLt0AMkQH0IT/GmKHiSgAgDp
ulvKGQSy068Bsn5fFNum21K5mvMSf3yinDtvmX3qUA12IxL/92ZzKbeVCq3Yi7Le
IOkKcGQRCMha8X2e7GmlpdWC1ycenlbN0nbVeSv3JUMcafC4+Q==
-----END CERTIFICATE-----"""
class x509_test (ber_test_case):
def runTest (self):
import base64
lines = google_cert.split ('\n')
enc = base64.decodestring (''.join (lines[1:-1]))
self.assertEqual (
decode (enc),
([[('context', 0, [2]),
105827261859531100510423749949966875981L,
[('oid', [1, 2, 840, 113549, 1, 1, 5]), None],
[[[('oid', [2, 5, 4, 6]), ('PRINTABLE_STRING', 19, 'ZA')]],
[[('oid', [2, 5, 4, 10]),
('PRINTABLE_STRING', 19, 'Thawte Consulting (Pty) Ltd.')]],
[[('oid', [2, 5, 4, 3]), ('PRINTABLE_STRING', 19, 'Thawte SGC CA')]]],
[('UTC_TIME', 23, '111026000000Z'), ('UTC_TIME', 23, '130930235959Z')],
[[[('oid', [2, 5, 4, 6]), ('PRINTABLE_STRING', 19, 'US')]],
[[('oid', [2, 5, 4, 8]), ('PRINTABLE_STRING', 19, 'California')]],
[[('oid', [2, 5, 4, 7]), ('T61_STRING', 20, 'Mountain View')]],
[[('oid', [2, 5, 4, 10]), ('T61_STRING', 20, 'Google Inc')]],
[[('oid', [2, 5, 4, 3]), ('T61_STRING', 20, 'www.google.com')]]],
[[('oid', [1, 2, 840, 113549, 1, 1, 1]), None],
('bitstring',
(0,
"0\x81\x89\x02\x81\x81\x00\xde\xb7&C\xa6\x99\x85\xcd8\xa7\x15\t\xb9\xcf\x0f"
"\xc9\xc3U\x8c\x88\xee\x8c\x8d('$K*^\xa0\xd8\x16\xfaa\x18K\xcfm`\x80\xd35@2r"
"\xc0\x8f\x12\xd8\xe5N\x8f\xb9\xb2\xf6\xd9\x15^Z\x861\xa3\xba\x86\xaak\xc8\xd9"
"q\x8c\xcc\xcd'\x13\x1e\x9dB]8\xf6\xa7\xac\xef\xfab\xf3\x18\x81\xd4$F\x7f\x01w|"
"\xc6*\x89\x14\x99\xbb\x989\x1d\xa8\x19\xfb9\x00D}\x1b\x94jx-i\xad\xc0z,\xfa\xd0"
"\xda \x12\x98\xd3\x02\x03\x01\x00\x01"))],
('context',
3,
[[[('oid', [2, 5, 29, 19]), True, '0\x00'],
[('oid', [2, 5, 29, 31]),
"0-0+\xa0)\xa0'\x86%http://crl.thawte.com/ThawteSGCCA.crl"],
[('oid', [2, 5, 29, 37]),
'0\x1f\x06\x08+\x06\x01\x05\x05\x07\x03\x01\x06\x08+\x06\x01\x05\x05\x07\x03'
'\x02\x06\t`\x86H\x01\x86\xf8B\x04\x01'],
[('oid', [1, 3, 6, 1, 5, 5, 7, 1, 1]),
'0d0"\x06\x08+\x06\x01\x05\x05\x070\x01\x86\x16http://ocsp.thawte.com0>\x06'
'\x08+\x06\x01\x05\x05\x070\x02\x862http://www.thawte.com/repository/Thawte_SGC_CA.crt']]])],
[('oid', [1, 2, 840, 113549, 1, 1, 5]), None],
('bitstring',
(0,
"!\xac\xd5\xae\xca4\x89Z\xc2\xabR\xd2\xb24f\x9dz\xab\xee\xe6|\xd5~\xc2\\("
"\xbbt\x00\xc9\x10\x1fB\x13\xfci\x8a\x1e$\xa0\x02\x00\xe9\xba[\xca\x19\x04"
"\xb2\xd3\xaf\x01\xb2~_\x14\xdb\xa6\xdbR\xb9\x9a\xf3\x12\x7f|\xa2\x9c;o\x99"
"}\xeaP\rv#\x12\xff\xf7fs)\xb7\x95\n\xad\xd8\x8b\xb2\xde \xe9\npd\x11\x08"
"\xc8Z\xf1}\x9e\xeci\xa5\xa5\xd5\x82\xd7'\x1e\x9eV\xcd\xd2v\xd5y+\xf7%C\x1c"
"i\xf0\xb8\xf9"))],
805)
)
dec, length = decode (enc)
public_key = dec[0][6][1][1][1]
self.assertEqual (
decode (public_key),
([156396091895984667473837837332877995558144703880815901117439532534031286131520903863087599986938779606924811933611903716377206837300122262900786662124968110191717844999183338594373129421417536020806373385428322642107305024162536996222164292639147591878860587271770855626780464602884552232097424473091745159379L, 65537], 140)
)
class bignum_test (ber_test_case):
def runTest (self):
self.assertEquals (
decode ('\x02\x82\x04\xe3\x01' + '\x00' * 1250),
(1<<10000, 1255)
)
self.assertEquals (
INTEGER (1<<10000),
'\x02\x82\x04\xe3\x01' + '\x00' * 1250,
)
class bignum_test_2 (ber_test_case):
def runTest (self):
for i in range (5):
n = 1 << (10 ** i)
self.assertEquals (
decode (INTEGER (n))[0],
n
)
class bignum_test_3 (ber_test_case):
def runTest (self):
import random
n = 1
for x in range (10000):
n = n * 10 + random.randint (0, 10)
print n
self.assertEquals (decode (INTEGER (n))[0], n)
def suite():
suite = unittest.TestSuite()
suite.addTest (simple_test())
suite.addTest (x509_test())
suite.addTest (bignum_test())
suite.addTest (bignum_test_2())
suite.addTest (bignum_test_3())
return suite
if __name__ == '__main__':
unittest.main (defaultTest='suite')
# -*- Mode: Python -*-
# Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# pull in visible bits of the low-level pyrex module
import coro
from coro.asn1.ber import *
from coro.ldap.query import *
import re
W = coro.write_stderr
re_dn = re.compile(r'\s*([,=])\s*')
re_dn_attr = re.compile(r'^([^,]+)(=[^,]+)(,.*)?$')
class ProtocolError (Exception):
"""An LDAP Protocol Error occurred"""
pass
class Exit_Recv_Thread (Exception):
"oob signal the ldap client recv thread to exit"
pass
class LDAP:
BindRequest = 0
BindResponse = 1
UnbindRequest = 2
SearchRequest = 3
SearchResultEntry = 4
SearchResultDone = 5
SearchResultReference = 19 # <--- NOT IN SEQUENCE
ModifyRequest = 6
ModifyResponse = 7
AddRequest = 8
AddResponse = 9
DelRequest = 10
DelResponse = 11
ModifyDNRequest = 12
ModifyDNResponse = 13
CompareRequest = 14
CompareResponse = 15
AbandonRequest = 16
ExtendedRequest = 23 # <--- NOT IN SEQUENCE
ExtendedResponse = 24
class SCOPE:
BASE = 0
ONELEVEL = 1
SUBTREE = 2
class DEREF:
NEVER = 0
SEARCHING = 1
FINDING = 2
ALWAYS = 3
def encode_search_request (
base_object,
scope,
deref_aliases,
size_limit,
time_limit,
types_only,
filter,
which_attrs=None,
compatibility={}
):
if scope is None:
scope = compatibility.get('scope', SCOPE.SUBTREE)
if which_attrs is None:
which_attrs = SEQUENCE()
elif len(which_attrs) == 0:
# Per section 4.5.1 of rfc 2251, if you really mean the empty
# list, you can't pass the empty list because the empty list means
# something else. You need to pass a list consisting of the OID 1.1,
# which really (see sections 4.1.2, 4.1.4, and 4.1.5) isn't an OID
# at all. Except some servers (Exchange 5.5) require something
# different here, hence the lookup in the compatibility dict.
which_attrs = SEQUENCE (
OCTET_STRING (compatibility.get ('no_attr_attr', '1.1'))
)
else:
which_attrs = SEQUENCE (*[OCTET_STRING (x) for x in which_attrs])
return TLV (
APPLICATION (LDAP.SearchRequest),
OCTET_STRING (base_object),
ENUMERATED (scope),
ENUMERATED (deref_aliases),
INTEGER (size_limit),
INTEGER (time_limit),
BOOLEAN (types_only),
parse_query (filter),
which_attrs,
)
class AUTH:
# 1 and 2 are reserved
simple = 0x00
sasl = 0x03
class RESULT:
success = 0
operationsError = 1
protocolError = 2
timeLimitExceeded = 3
sizeLimitExceeded = 4
compareFalse = 5
compareTrue = 6
authMethodNotSupported = 7
strongAuthRequired = 8
referral = 10
adminLimitExceeded = 11
unavailableCriticalExtension = 12
confidentialityRequired = 13
saslBindInProgress = 14
noSuchAttribute = 16
undefinedAttributeType = 17
inappropriateMatching = 18
constraintViolation = 19
attributeOrValueExists = 20
invalidAttributeSyntax = 21
noSuchObject = 32
aliasProblem = 33
invalidDNSyntax = 34
aliasDereferencingProblem = 36
inappropriateAuthentication = 48
invalidCredentials = 49
insufficientAccessRights = 50
busy = 51
unavailable = 52
unwillingToPerform = 53
loopDetect = 54
namingViolation = 64
objectClassViolation = 65
notAllowedOnNonLeaf = 66
notAllowedOnRDN = 67
entryAlreadyExists = 68
objectClassModsProhibited = 69
affectsMultipleDSAs = 71
other = 80
class Error (Exception):
def __init__ (self, answer):
Exception.__init__ (self)
self.code = answer[0]
self.answer = answer
self.error_string = result_string (answer[0])
def __str__ (self):
if len(self.answer) == 3:
# We know how to parse it if it's length 3. Second element is
# the "got DN", and third element is the error message. See
# section 4 of RFC 1777.
if self.answer[2]:
parenthesize_got_dn = 1
err_msg = " %r" % (self.answer[2],)
else:
parenthesize_got_dn = 0
err_msg = ""
if self.answer[1]:
err_msg += " "
if parenthesize_got_dn:
err_msg += "("
err_msg += "Failed after successfully matching partial DN: %r" \
% (self.answer[1],)
if parenthesize_got_dn:
err_msg += ")"
else:
err_msg = " %r" % (self.answer,)
return '<LDAP Error "%s" [0x%x]%s>' % (self.error_string, self.code,
err_msg)
__repr__ = __str__
RESULT._reverse_map = r = {}
for attr in dir(RESULT):
value = getattr (RESULT, attr)
if (type(value) == type(0)):
r[value] = attr
def result_string (result):
try:
return RESULT._reverse_map[result]
except KeyError:
return "unknown error %r" % (result,)
def encode_bind_request (version, name, auth_data):
assert (1 <= version <= 127)
return TLV (
APPLICATION (LDAP.BindRequest),
INTEGER (version),
OCTET_STRING (name),
auth_data
)
def encode_simple_bind (version, name, login):
return encode_bind_request (
version,
name,
TLV (
CHOICE (AUTH.simple, 0),
login
)
)
def encode_sasl_bind (version, name, mechanism, credentials=''):
if credentials:
cred = OCTET_STRING (credentials)
else:
cred = ''
return encode_bind_request (
version,
name,
TLV (
CHOICE (AUTH.sasl),
OCTET_STRING (mechanism),
cred
)
)
def encode_starttls ():
# encode STARTTLS request: RFC 2830, 2.1
return TLV (
APPLICATION (LDAP.ExtendedRequest),
TLV (CHOICE (0, 0), '1.3.6.1.4.1.1466.20037')
)
class client:
# Note: default port is 389
def __init__ (self, addr):
self.msgid = 1
self.addr = addr
if isinstance (addr, tuple):
self.sock = coro.tcp_sock()
else:
self.sock = coro.unix_sock()
self.sock.connect (addr)
self.pending = {}
self.recv_thread_ob = coro.spawn (self.recv_thread)
def recv_exact (self, size):
try:
return self.sock.recv_exact (size)
except AttributeError:
# tlslite has no recv_exact
left = size
r = []
while left:
block = self.sock.recv (left)
if not block:
break
else:
r.append (block)
left -= len (block)
return ''.join (r)
# XXX the ironport code had a simple buffering layer here, might want
# to reinstate that...
def _recv_packet (self):
# All received packets must be BER SEQUENCE. We can tell from
# the header how much data we need to complete the packet.
# ensure we have the sequence header - I'm inlining the (type,
# length) detection here to get good buffering behavior
tl = self.recv_exact (2)
if not tl:
return [None, None]
tag = tl[0]
if tag != '0': # SEQUENCE | STRUCTURED
raise ProtocolError ('bad tag byte: %r' % (tag,))
l = ord (tl[1])
p = [tl]
if l & 0x80:
# <l> tells us how many bytes of actual length
ll = l & 0x7f
len_bytes = self.recv_exact (ll)
p.append (len_bytes)
# fetch length
n = 0
for i in xrange (ll):
n = (n << 8) | ord(len_bytes[i])
if (n < 0) or (n > 1000000):
# let's be reasonable, folks
raise ProtocolError ('invalid packet length: %d' % (n,))
need = n
else:
# <l> is the length of the sequence
need = l
# fetch the rest of the packet...
p.append (self.recv_exact (need))
packet = ''.join (p)
reply, plen = decode (packet)
return reply
def recv_thread (self):
while not self.exit_recv_thread:
[msgid, reply] = self._recv_packet()
if msgid is None:
break
else:
probe = self.pending.get (msgid, None)
if probe is None:
raise ProtocolError ('unknown message id in reply: %d' % (msgid,))
else:
probe.schedule (reply)
default_timeout = 10
def send_message (self, msg):
msgid = self.msgid
self.msgid += 1
self.sock.send (SEQUENCE (INTEGER (msgid), msg))
try:
self.pending[msgid] = me = coro.current()
reply = coro.with_timeout (self.default_timeout, me._yield)
return reply
finally:
del self.pending[msgid]
# server replies NO:
#starttls decoded=[1, ('application', 24, [2, '', 'unsupported extended operation'])]
# server replies YES:
#starttls decoded=[1, ('application', 24, [0, '', ''])]
exit_recv_thread = False
def starttls (self, *future_cert_params):
import tlslite
self.exit_recv_thread = True
reply = self.send_message (encode_starttls())
if reply[2] == 0:
conn = tlslite.TLSConnection (self.sock)
# does ldap allow client-cert authentication?
conn.handshakeClientCert()
self.osock = self.sock
self.sock = conn
# restart recv thread (maybe) with TLS socket wrapper
self.exit_recv_thread = False
self.recv_thread_ob = coro.spawn (self.recv_thread)
return reply
ldap_protocol_version = 3
def simple_bind (self, name, login):
return self.send_message (encode_simple_bind (self.ldap_protocol_version, name, login))
def sasl_bind (self, name, mechanism, credentials):
return self.send_message (encode_sasl_bind (self.ldap_protocol_version, name, mechanism, credentials))
def t0():
sample = encode_message (
3141,
encode_search_request (
'dc=nightmare,dc=com',
SCOPE.SUBTREE,
DEREF.NEVER,
0,
0,
0,
'(&(objectclass=inetorgperson)(userid=srushing))',
#'(&(objectclass=inetorgperson)(userid=newton))',
# ask for these specific attributes only
['mailAlternateAddress', 'rfc822ForwardingMailbox']
)
)
import pprint
import socket
s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
s.connect (('127.0.0.1', 389))
s.send (sample)
pprint.pprint (decode (s.recv (8192)))
def t1():
c = client (('127.0.0.1', 389))
c.bind_simple (3, 'cn=manager,dc=nightmare,dc=com', 'fnord')
return c
if __name__ == '__main__':
import coro.backdoor
coro.spawn (coro.backdoor.serve, unix_path='/tmp/ldap.bd')
coro.event_loop()
# -*- Mode: Cython -*-
from cpython cimport PyBytes_FromStringAndSize
from coro.asn1.ber cimport *
# ================================================================================
# ldap search filter language parser
# ================================================================================
# this is not yet complete. see rfc2254
class QuerySyntaxError (Exception):
"""Error parsing rfc2254 query filter"""
def __str__(self):
if (len(self.args) == 2) \
and isinstance(self.args[0], str) \
and isinstance(self.args[1], int) \
and (self.args[1] >= 0) \
and (self.args[1] < len(self.args[0])):
return 'LDAP Query Syntax Error: Invalid character \'%c\' at ' \
'position %d of query "%s"' \
% (self.args[0][self.args[1]], self.args[1], self.args[0])
else:
return 'LDAP Query Syntax Error: %s' % Exception.__str__(self)
cdef enum:
SCOPE_BASE = 0
SCOPE_ONELEVEL = 1
SCOPE_SUBTREE = 2
cdef enum:
DEREF_NEVER = 0
DEREF_SEARCHING = 1
DEREF_FINDING = 2
DEREF_ALWAYS = 3
cdef enum:
FILTER_AND = 0
FILTER_OR = 1
FILTER_NOT = 2
FILTER_EQUALITY_MATCH = 3
FILTER_SUBSTRINGS = 4
FILTER_GREATER_OR_EQUAL = 5
FILTER_LESS_OR_EQUAL = 6
FILTER_PRESENT = 7
FILTER_APPROX_MATCH = 8
FILTER_EXTENSIBLE_MATCH = 9
cdef enum:
SUBSTRING_INITIAL = 0
SUBSTRING_ANY = 1
SUBSTRING_FINAL = 2
def parse_query (s, pos=0):
expression, pos = parse_expression (s, pos, 0)
return expression
cdef parse_expression (bytes x, int pos, int depth):
cdef char * s = x
cdef char kind
cdef list expressions
cdef bytes value
cdef bint is_substring
if s[pos] != c'(':
raise QuerySyntaxError, (x, pos)
elif depth > 50:
raise QuerySyntaxError, "expression too complex"
else:
# skip the open-paren
pos = pos + 1
# is this a logical expression or a comparison?
if s[pos] == c'|' or s[pos] == c'&' or s[pos] == c'!':
# logical
kind = s[pos]
expressions = []
pos = pos + 1
while s[pos] != c')':
expression, pos = parse_expression (x, pos, depth+1)
expressions.append (expression)
if kind == c'|':
return _TLV (_CHOICE (FILTER_OR, 1), expressions), pos + 1
elif kind == c'&':
return _TLV (_CHOICE (FILTER_AND, 1), expressions), pos + 1
elif kind == c'!':
return _TLV (_CHOICE (FILTER_NOT, 1), expressions[:1]), pos + 1
else:
# comparison
attr, is_substring, pos = parse_name (x, pos)
operator, pos = parse_operator (x, pos)
value, is_substring, pos = parse_value (x, pos)
attr = unescape (attr)
# we don't unescape <value> yet, because we might need
# some escaped splat chars to make it through parse_substring()
# [where the pieces will be unescaped individually]
if is_substring:
if value == '*' and operator == FILTER_EQUALITY_MATCH:
# (tag=*)
return _TLV (
_CHOICE (FILTER_PRESENT, 0), # unstructured
(attr,) # tag implied by CHOICE
), pos + 1
elif operator == FILTER_EQUALITY_MATCH:
# (tag=sub*strin*g*)
return _TLV (
_CHOICE (FILTER_SUBSTRINGS, 1), (
_OCTET_STRING (attr),
_SEQUENCE (parse_substring (value, 0, len (value)))
)
), pos + 1
else:
raise QuerySyntaxError, "invalid wildcard syntax"
else:
return _TLV (
_CHOICE (operator, 1), (
_OCTET_STRING (attr),
_OCTET_STRING (unescape (value)),
)
), pos + 1
cdef parse_operator (bytes x, int pos):
cdef char * s = x
cdef int slen = len (x)
if (pos + 2) >= slen:
raise QuerySyntaxError, (s, pos)
elif s[pos] == c'=':
return FILTER_EQUALITY_MATCH, pos + 1
elif s[pos] == c'~' and s[pos+1] == c'=':
return FILTER_APPROX_MATCH, pos + 2
elif s[pos] == c'<' and s[pos+1] == c'=':
return FILTER_LESS_OR_EQUAL, pos + 2
elif s[pos] == c'>' and s[pos+1] == c'=':
return FILTER_GREATER_OR_EQUAL, pos + 2
else:
raise QuerySyntaxError, (x, pos)
# [initial]*any*any*any*[final]
cdef object parse_substring (char * s, int pos, int slen):
# assumes the presence of at least one splat
cdef int i, start
cdef list result = []
start = 0
i = 0
while 1:
if i == slen:
if start != i:
# final
result.append (
_TLV (_CHOICE (SUBSTRING_FINAL, 0), (unescape (s[start:]),))
)
return result
elif s[i] == c'*':
if start == 0:
if i > 0:
# initial
result.append (
_TLV (_CHOICE (SUBSTRING_INITIAL, 0), (unescape (s[0:i]),))
)
else:
# any
result.append (
_TLV (_CHOICE (SUBSTRING_ANY, 0), (unescape (s[start:i]),))
)
# next bit will start *after* the splat
start = i + 1
i = i + 1
else:
i = i + 1
def ue (s):
return unescape (s)
# # another possibility would be to access the 'characters'
# # array in stringobject.c directly. [it's static, though]
# cdef bytes char (int ch):
# if (ch < 0) or (ch >= 256):
# raise ValueError, "chr() arg not in range (256)"
# else:
# return <char>ch
cdef int name_punc_table[256]
cdef int i
for i from 0 <= i < 256:
if chr (i) in '()=<>~':
name_punc_table[i] = 1
else:
name_punc_table[i] = 0
cdef object parse_name (bytes x, int pos):
cdef int slen, is_substring, rpos, start
cdef unsigned char * s
s = <unsigned char *>x
slen = len (x)
rpos = 0
start = pos
if name_punc_table[s[pos]]:
raise QuerySyntaxError, (x, pos)
else:
is_substring = 0
# we expect names to be delimited by an operator or a close-paren
while pos < slen:
if not name_punc_table[s[pos]]:
if s[pos] == c'*':
is_substring = 1
rpos = rpos + 1
if rpos == 4096:
raise QuerySyntaxError, (x, pos)
pos = pos + 1
else:
return PyBytes_FromStringAndSize (<char *>(s + start), rpos), is_substring, pos
else:
raise QuerySyntaxError, (x, pos)
cdef object parse_value (bytes x, int pos):
cdef int slen, is_substring, rpos, start
cdef unsigned char * s
s = <unsigned char *>x
slen = len (x)
rpos = 0
start = pos
is_substring = 0
# we expect values to be delimited by a close-paren
while pos < slen:
if s[pos] != c')':
if s[pos] == c'*':
is_substring = 1
rpos = rpos + 1
if rpos == 4096:
raise QuerySyntaxError, (x, pos)
pos = pos + 1
else:
return PyBytes_FromStringAndSize (<char *>(s + start), rpos), is_substring, pos
else:
raise QuerySyntaxError, (x, pos)
cdef object unescape (bytes x):
cdef int rpos, flag, pos
cdef char * s = x
cdef int slen = len (x)
cdef char buffer[4096]
cdef char ch
pos = 0
rpos = 0
flag = 0
while pos < slen:
if s[pos] == c'\\':
flag = 1
pos = pos + 1
ch, pos = parse_hex_escape (s, pos, slen)
else:
ch = s[pos]
pos = pos + 1
buffer[rpos] = ch
rpos = rpos + 1
if rpos == 4096:
raise QuerySyntaxError, (x, pos)
if flag:
# return a new, unescaped string
return PyBytes_FromStringAndSize (buffer, rpos)
else:
# return the original string
return x
cdef int parse_hex_digit (int ch):
if (ch >= 48 and ch <= 57):
return (ch - 48)
elif (ch >= 97 and ch <= 102):
return (ch - 97) + 10
elif (ch >= 65 and ch <= 70):
return (ch - 65) + 10
else:
return -1
cdef object parse_hex_escape (char * s, int pos, int len):
cdef char ch, result
if pos + 2 > len:
raise QuerySyntaxError, (s, pos)
else:
ch = parse_hex_digit (s[pos])
if ch == -1:
raise QuerySyntaxError, (s, pos)
else:
result = ch << 4
pos = pos + 1
ch = parse_hex_digit (s[pos])
if ch == -1:
raise QuerySyntaxError, (s, pos)
else:
result = result | ch
pos = pos + 1
return result, pos
cdef int escape_table[256]
for i from 0 <= i < 256:
if chr (i) in '\\()=<>~*':
escape_table[i] = 1
else:
escape_table[i] = 0
cdef bytes hex_digits = b"0123456789abcdef"
# 525486/sec
def query_escape (bytes s):
cdef int slen, rlen, i, j
cdef unsigned char ch
cdef char * sbuf, * rbuf
sbuf = s
slen = len (s)
rlen = slen
# compute length of result
for i from 0 <= i < slen:
if escape_table[<unsigned char>sbuf[i]]:
rlen = rlen + 2
# create result string
r = PyBytes_FromStringAndSize (NULL, rlen)
rbuf = r
# fill result string
j = 0
for i from 0 <= i < slen:
ch = sbuf[i]
if escape_table[ch]:
rbuf[j+0] = <char> 92
rbuf[j+1] = <char> hex_digits[ch >> 4]
rbuf[j+2] = <char> hex_digits[ch & 0xf]
j = j + 3
else:
rbuf[j] = ch
j = j + 1
return r
# -*- Mode: Python -*-
import unittest
import sys
from coro.asn1.ber import *
from coro.ldap.query import *
C = 'context'
pq_tests = [
# simple equality
('(xxx=yyy)',
((C, 3, ['xxx', 'yyy']),
12)),
# simple expression, plus 'present'
('(|(xx=y)(zz=*))',
((C, 1, [(C, 3, ['xx', 'y']), (C, 7, 'zz')]),
15)),
# nary expressions
('(|(a=b)(b=c)(c=d)(e=f)(f=g)(h=i))',
((C, 1, [(C, 3, ['a', 'b']), (C, 3, ['b', 'c']), (C, 3, ['c', 'd']), (C, 3, ['e', 'f']), (C, 3, ['f', 'g']), (C, 3, ['h', 'i'])]),
50)),
('(|(!(a=*))(&(b=c)(d=e))(x<=y))',
((C, 1, [(C, 2, [(C, 7, 'a')]), (C, 0, [(C, 3, ['b', 'c']), (C, 3, ['d', 'e'])]), (C, 6, ['x', 'y'])]),
33)),
# approximate match
('(zz~=yy)', ((C, 8, ['zz', 'yy']), 10)),
# substring
('(a=ins*tiga*tor)', ((C, 4, ['a', [(C, 0, 'ins'), (C, 1, 'tiga'), (C, 2, 'tor')]]), 23)),
('(a=*y)', ((C, 4, ['a', [(C, 2, 'y')]]), 10)),
('(a=y*)', ((C, 4, ['a', [(C, 0, 'y')]]), 10)),
('(a=*y*)', ((C, 4, ['a', [(C, 1, 'y')]]), 10)),
('(a=*x*y)', ((C, 4, ['a', [(C, 1, 'x'), (C, 2, 'y')]]), 13)),
('(a=*x*y*)', ((C, 4, ['a', [(C, 1, 'x'), (C, 1, 'y')]]), 13)),
('(a=*x*y*z)', ((C, 4, ['a', [(C, 1, 'x'), (C, 1, 'y'), (C, 2, 'z')]]), 16)),
# syntax errors
('(a=', QuerySyntaxError),
('(a<b)', QuerySyntaxError),
# good hex escape
('(a=some\\AAthing)',((C, 3, ['a', 'some\252thing']), 17)),
# bad hex escape
('(a=some\\AZthing)', QuerySyntaxError),
# upper/lower case hex escape
('(a=xy\\Aaz)', ((C, 3, ['a', 'xy\252z']), 11)),
# escaped splat
('(a=x*y\\2az)', ((C, 4, ['a', [(C, 0, 'x'), (C, 2, 'y*z')]]), 15)),
# illegal splat
('(a~=sam*son)', QuerySyntaxError),
# junk/illegal
('junk', QuerySyntaxError),
# lots of parens
(('('*100), QuerySyntaxError),
# expression too complex
(('(!' * 55) + '(x=y)' + (')' * 55), QuerySyntaxError),
# expression not too complex
(('(!' * 10) + '(x=y)' + (')' * 10),
((C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 3, ['x', 'y'])])])])])])])])])])]),
28)),
]
class parse_query_test (unittest.TestCase):
def runTest (self):
for q, e in pq_tests:
try:
self.assertEqual (decode (parse_query (q)), e)
except AssertionError:
raise
except:
self.assertEqual (sys.exc_info()[0], e)
def suite():
suite = unittest.TestSuite()
suite.addTest (parse_query_test())
return suite
if __name__ == '__main__':
unittest.main (defaultTest='suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment