[Python-kolab-commits] r5 - in trunk/python-kolab: . kolab kolab/kldap kolab/tests
scm-commit@wald.intevation.org
scm-commit at wald.intevation.org
Thu Feb 22 16:29:11 CET 2007
Author: wrobel
Date: 2007-02-22 16:29:10 +0100 (Thu, 22 Feb 2007)
New Revision: 5
Added:
trunk/python-kolab/ChangeLog
trunk/python-kolab/kolab/exceptions.py
trunk/python-kolab/kolab/globals.py
trunk/python-kolab/kolab/kldap/kolab.py
trunk/python-kolab/kolab/kldap/utils.py
trunk/python-kolab/kolab/tests/
trunk/python-kolab/kolab/tests/dtest.py
Modified:
trunk/python-kolab/
trunk/python-kolab/HACKING
trunk/python-kolab/kolab/kldap/kldap.py
Log:
2007-02-22 Gunnar Wrobel <p at rdus.de>
* kolab/kldap/kolab.py:
Added kolab specific functionality.
* kolab/kldap/kldap.py:
Fixed basic version.
* kolab/tests/dtest.py:
Started doc test aggregator.
* kolab/kldap/utils.py:
Splitted of LDAP utility functions.
* kolab/globals.py:
Provided debugging features.
* kolab/exceptions.py:
Started python-kolab specific exceptions.
* HACKING:
Added instructions for doc testing.
Property changes on: trunk/python-kolab
___________________________________________________________________
Name: svn:externals
+ pardus https://svn.pardus.de/scripts/trunk/pardus/pardus
Added: trunk/python-kolab/ChangeLog
===================================================================
--- trunk/python-kolab/ChangeLog 2007-02-22 13:11:45 UTC (rev 4)
+++ trunk/python-kolab/ChangeLog 2007-02-22 15:29:10 UTC (rev 5)
@@ -0,0 +1,30 @@
+2007-02-22 Gunnar Wrobel <p at rdus.de>
+
+ * kolab/kldap/kolab.py:
+
+ Added kolab specific functionality.
+
+ * kolab/kldap/kldap.py:
+
+ Fixed basic version.
+
+ * kolab/tests/dtest.py:
+
+ Started doc test aggregator.
+
+ * kolab/kldap/utils.py:
+
+ Splitted of LDAP utility functions.
+
+ * kolab/globals.py:
+
+ Provided debugging features.
+
+ * kolab/exceptions.py:
+
+ Started python-kolab specific exceptions.
+
+ * HACKING:
+
+ Added instructions for doc testing.
+
Modified: trunk/python-kolab/HACKING
===================================================================
--- trunk/python-kolab/HACKING 2007-02-22 13:11:45 UTC (rev 4)
+++ trunk/python-kolab/HACKING 2007-02-22 15:29:10 UTC (rev 5)
@@ -1 +1,4 @@
-Development guidelines still missing.
+Run the doc tests with:
+
+cd python-kolab
+PYTHONPATH="." python kolab/tests/dtest.py
Added: trunk/python-kolab/kolab/exceptions.py
===================================================================
--- trunk/python-kolab/kolab/exceptions.py 2007-02-22 13:11:45 UTC (rev 4)
+++ trunk/python-kolab/kolab/exceptions.py 2007-02-22 15:29:10 UTC (rev 5)
@@ -0,0 +1,40 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#################################################################################
+# PYTHON-KOLAB EXCEPTIONS
+#################################################################################
+# File: exceptions.py
+#
+# Provides exception classes.
+#
+# Copyright:
+# (c) 2007 p at rdus <http://www.pardus.de>
+# Distributed under the terms of the GNU General Public License v2
+#
+# Author(s):
+# Gunnar Wrobel <p at rdus.de>
+#
+# $Id$
+'''A set of possible python-kolab exceptions.'''
+
+__version__ = "$Id$"
+
+#================================================================================
+#
+# Exceptions
+#
+#--------------------------------------------------------------------------------
+
+class LdapException(Exception):
+ '''
+ Indicates a problem with the LDAP connection.
+ '''
+
+#================================================================================
+#
+# Testing
+#
+#--------------------------------------------------------------------------------
+
+if __name__ == '__main__':
+ print "python-kolab v" + VERSION
Added: trunk/python-kolab/kolab/globals.py
===================================================================
--- trunk/python-kolab/kolab/globals.py 2007-02-22 13:11:45 UTC (rev 4)
+++ trunk/python-kolab/kolab/globals.py 2007-02-22 15:29:10 UTC (rev 5)
@@ -0,0 +1,36 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#################################################################################
+# PYTHON-KOLAB GLOBALS
+#################################################################################
+# File: globals.py
+#
+# Provides global variables for python-kolab.
+#
+# Copyright:
+# (c) 2007 p at rdus <http://www.pardus.de>
+# Distributed under the terms of the GNU General Public License v2
+#
+# Author(s):
+# Gunnar Wrobel <p at rdus.de>
+#
+# $Id$
+'''Some global variables.'''
+
+__version__ = "$Id$"
+
+#================================================================================
+#
+# Dependencies
+#
+#--------------------------------------------------------------------------------
+
+from pardus.debug import setup_message
+
+#================================================================================
+#
+# GLOBALS
+#
+#--------------------------------------------------------------------------------
+
+OUT = setup_message('python-kolab')
Modified: trunk/python-kolab/kolab/kldap/kldap.py
===================================================================
--- trunk/python-kolab/kolab/kldap/kldap.py 2007-02-22 13:11:45 UTC (rev 4)
+++ trunk/python-kolab/kolab/kldap/kldap.py 2007-02-22 15:29:10 UTC (rev 5)
@@ -27,130 +27,23 @@
import ldap
import ldap.modlist
-import types, re
-#================================================================================
-#
-# Helper functions
-#
-#--------------------------------------------------------------------------------
+from kolab.exceptions import LdapException
+from kolab.kldap.utils import objectclass_filter
-def escape(ldapstr, rep_map):
- '''
- Replace all characters according to the given map.
- '''
- for in_c, outc in rep_map:
- ldapstr = ldapstr.replace(in_c, outc)
- return ldapstr
+from kolab.globals import OUT
-def escape_for_filter(filter_str):
- '''
- From RFC-2254:
-
- If a value should contain any of the following characters
-
- Character ASCII value
- ---------------------------
- * 0x2a
- ( 0x28
- ) 0x29
- \ 0x5c
- NUL 0x00
-
- the character must be encoded as the backslash "\" character (ASCII
- 0x5c) followed by the two hexadecimal digits representing the ASCII
- value of the encoded character. The case of the two hexadecimal
- digits is not significant.
-
- Simple test (the four slashes are necessary for the sake of the DocTest):
-
- >>> escape_for_filter("\\\\*()\\0()*\\\\")
- '\\\\5c\\\\2a\\\\28\\\\29\\\\00\\\\28\\\\29\\\\2a\\\\5c'
- '''
- rep_map = (( '\\', '\\5c'),
- ( '*', '\\2a'),
- ( '(', '\\28'),
- ( ')', '\\29'),
- ( '\0', '\\00'))
- return escape(filter_str, rep_map);
-
-def escape_for_dn(dn_str):
- '''
- DN component escaping as described in RFC-2253
-
- Simple test (the four slashes are necessary for the sake of the DocTest):
-
- >>> escape_for_dn("\\\\,+<><<>;;+")
- '\\\\\\\\\\\\,\\\\+\\\\<\\\\>\\\\<\\\\<\\\\>\\\\;\\\\;\\\\+'
- '''
- rep_map = (( '\\', '\\\\'),
- ( ',', '\\,'),
- ( '+', '\\+'),
- ( '<', '\\<'),
- ( '>', '\\>'),
- ( ';', '\\;'))
- dn_str = escape(dn_str, rep_map)
- if dn_str[0] == '#':
- dn_str = '\\' + dn_str
- head = re.compile('^ *').search(dn_str)
- tail = re.compile(' *$').search(dn_str)
- if head:
- begin = head.span()[1]
- else:
- begin = 0
- if tail:
- end = tail.span()[0]
- else:
- end = len(str)
- return '\\ ' * begin + dn_str[begin:end] + '\\ ' * (len(dn_str) - end)
-
-
-def objectclass_filter(classes, concat = '|'):
- ''' Generate a object class filter string.
-
- >>> objectclass_filter(['a','b'])
- '(|(objectclass=a)(objectclass=b))'
-
- >>> objectclass_filter(['a','b'], ':')
- '(:(objectclass=a)(objectclass=b))'
-
- >>> objectclass_filter(['a','b', 'c'])
- '(|(|(objectclass=a)(objectclass=b))(objectclass=c))'
-
- >>> objectclass_filter('a', ':')
- '(objectclass=a)'
- '''
- if isinstance(classes, types.StringTypes):
- return '(objectclass=' + classes + ')'
- else:
- def class_expand(cls):
- ''' Expand array to a filter string.'''
- if len(cls) == 1:
- return objectclass_filter(cls[0])
- if len(cls) == 2:
- return '(' + concat + objectclass_filter(cls[0]) \
- + objectclass_filter(cls[1]) + ')'
- else:
- return '(' + concat + class_expand(cls[:-1]) \
- + objectclass_filter(cls[-1]) + ')'
-
- return class_expand(classes)
-
-
#================================================================================
#
# LdapStorage
#
#--------------------------------------------------------------------------------
-class LdapStorage(object):
+class Ldap(object):
'''
This class can be used to handle objects stored in an LDAP database.
'''
- implements(IStorage)
- info = 'LDAP'
-
def __init__(self, uri, base_dn, users = {'default': ('', '')}):
self.ldap_uri = uri
self.base_dn = base_dn
@@ -165,26 +58,69 @@
self.set_auth()
def connect(self, user = 'default'):
- ''' This initializes the storage handler. This is not handled within
- the __init__ call since the storage handlers are most
- certainly also services. And the service setup should be
- seperate from the storage setup. '''
+ ''' Connect to Ldap using the specified user. '''
self.set_auth(user)
self.require_connect()
def set_auth(self, user = 'default'):
- ''' Set the authentication to a desired level. The lower the level the
- less rights are provided. The maximum level acceptable can be
- decided by the class actually implementing this interface. '''
+ ''' Change the user to the specified user. '''
self.require_disconnect()
self.current_user = user
self.ldapbinddn = self.users[user][0]
self.ldappass = self.users[user][1]
- def fetch(self, oid, object_classes = ['*']):
+ def require_connect(self):
+ ''' Connect to the LDAP server. '''
+
+ if self.connected:
+ return
+ self.ldap = ldap.initialize(self.ldap_uri)
+ self.ldap.protocol_version = ldap.VERSION3
+ self.ldap.simple_bind_s(self.ldapbinddn, self.ldappass)
+ self.connected = True
+
+ def require_disconnect(self):
+ ''' Disconnect from LDAP server.'''
+
+ if not self.connected:
+ return
+ self.ldap.unbind()
+ self.connected = False
+
+ def add_base(self, dist_name):
+ ''' Complete an object name with the base DN.'''
+
+ if dist_name:
+ return dist_name + ',' + self.base_dn
+ else:
+ return self.base_dn
+
+ def del_base(self, dist_name):
+ ''' Removes the base dn from an object name.'''
+
+ if dist_name[-len(self.base_dn):] == self.base_dn:
+ dist_name = dist_name[:-len(self.base_dn)]
+ if dist_name[-1] == ',':
+ return dist_name[:-1]
+ else:
+ return dist_name
+ else:
+ raise LdapException('Object "' + dist_name +
+ '" does not match server name "'
+ + self.base_dn + '"!')
+
+ def reconnect(self):
+ ''' Reconnect to the LDAP server. '''
+
+ self.require_disconnect()
+ self.require_connect()
+
+ def fetch(self, dist_name, classes = ['*']):
'''
- Fetch an object from storage.
+ Tries to fetch the object for the given dn.
+ -- DOCTEST START
+
>>> class DummyLdap:
... def search_s(self, dn, scope, ofilter):
... if dn != 'cn=test,dc=example,dc=com':
@@ -194,47 +130,50 @@
... 'objectClass': ['top', 'inetOrgPerson',
... 'kolabInetOrgPerson',
... 'hordePerson']})]
- >>> class DummyLdapStorage(LdapStorage):
+ >>> class TestLdap(Ldap):
... def require_connect(self):
... self.connected = True
... self.ldap = DummyLdap()
... def require_disconnect(self):
... self.connected = False
- >>> a = DummyLdapStorage("ldap://127.0.0.1", "dc=example,dc=com")
+ >>> a = TestLdap("ldap://127.0.0.1", "dc=example,dc=com")
>>> a.fetch('cn=test', ['*'])
- {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test'], 'cn': ['test']}
+ ('cn=test', {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'cn': ['test']})
>>> a.fetch('cn=test2', ['*'])
+
+ -- DOCTEST END
'''
+ ofilter = objectclass_filter(classes, '&')
- OUT.debug('Updating storage object', 6)
+ dist_name = self.add_base(dist_name)
- ldap_result = []
+ result = []
- # Has a dn been given
- if oid:
+ try:
+ self.reconnect()
+
+ OUT.debug('Searching for object', 7)
- OUT.debug('Fetching object values', 7)
+ result = self.ldap.search_s(dist_name, ldap.SCOPE_BASE, ofilter)
+ except ldap.NO_SUCH_OBJECT, error:
+ pass
+ except ldap.SERVER_DOWN, error:
+ raise LdapException('Server down (Filter: "' + ofilter +
+ '", Bound as: "' + self.ldapbinddn +
+ '", Error: ' + str(error) + ').')
- # Try to fetch the result
- ldap_result = self.ldap_fetch(oid,
- object_classes)
-
- OUT.debug('Got object values', 7)
-
- if (ldap_result and ldap_result[1] and
- oid == ldap_result[0][0:len(oid)]):
- # Need that for the mapping to object attributes
- ldap_result[1]['dn'] = [ldap_result[0][0:len(oid)]]
-
- return ldap_result[1]
+ if len(result) == 1:
+ # Returns the first object found
+ return (self.del_base(result[0][0]), result[0][1])
else:
- return {}
+ return None
- def store(self, data, oid, input_data, object_classes):
- ''' This storage method will write the output values to ldap. The
- configuration context provided by the corresponding storage
- object needs to provide a valid ldap connection.
+ def store(self, dist_name, store_values):
+ '''
+ Stores the values as a new object in LDAP.
+ -- DOCTEST START
+
>>> class DummyLdap:
... def search_s(self, dn, scope, ofilter):
... if dn != 'cn=test,dc=example,dc=com':
@@ -250,6 +189,8 @@
... pass
... def modify_s(self, dn, modlist):
... pass
+ ... def unbind(self):
+ ... pass
>>> class FailLdap(DummyLdap):
... def add_s(self, dn, modlist):
... raise ldap.SERVER_DOWN('Not found')
@@ -257,267 +198,153 @@
... raise ldap.SERVER_DOWN('Not found')
... def modify_s(self, dn, modlist):
... raise ldap.SERVER_DOWN('Not found')
- >>> class DummyLdapStorage(LdapStorage):
+ >>> class TestLdap(Ldap):
... def require_connect(self):
... self.connected = True
... self.ldap = DummyLdap()
... def require_disconnect(self):
... self.connected = False
- >>> class FailLdapStorage(DummyLdapStorage):
+ >>> class TestLdapFail(Ldap):
... def require_connect(self):
... self.connected = True
... self.ldap = FailLdap()
- >>> a = DummyLdapStorage("ldap://127.0.0.1", "dc=example,dc=com")
+ >>> a = TestLdap("ldap://127.0.0.1", "dc=example,dc=com")
+ >>> b = TestLdapFail("ldap://127.0.0.1", "dc=example,dc=com")
Test adding a new object:
- >>> input = {}
- >>> output = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test2'], 'cn': ['test2']}
+ >>> data = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'cn': ['test2']}
- >>> a.store(output, 'cn=test2', input, ['*'])
+ >>> a.store('cn=test2', data)
True
- Test adding an object that existed in LDAP (but is missing now):
-
- >>> input = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test2'], 'cn': ['test2']}
- >>> output = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test2'], 'cn': ['test2']}
-
- >>> a.store(output, 'cn=test2', input, ['*'])
- True
-
Test for failure:
- >>> b = FailLdapStorage("ldap://127.0.0.1", "dc=example,dc=com")
- >>> b.store(output, 'cn=test2', input, ['*'])
+ >>> b.store('cn=test2', data)
Traceback (most recent call last):
...
- StorageException: Failed to create object "cn=test2".
+ LdapException: Failed to create object "cn=test2,dc=example,dc=com".
Error was: Not found
- Renaming an object:
-
- >>> output = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test2'], 'cn': ['test2']}
- >>> input = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test'], 'cn': ['test']}
-
- >>> a.store(output, 'cn=test2', input, ['*'])
- True
-
- Test for failure:
-
- >>> input = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test'], 'cn': ['test']}
- >>> b = FailLdapStorage("ldap://127.0.0.1", "dc=example,dc=com")
- >>> b.store(output, 'cn=test2', input, ['*'])
- Traceback (most recent call last):
- ...
- StorageException: Failed to rename object "cn=test2".
- Error was: Not found
-
Object update:
- >>> output = {'test': ['another test2'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test'], 'cn': ['test']}
- >>> input = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test'], 'cn': ['test']}
+ >>> data = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'cn': ['test']}
- >>> a.store(output, 'cn=test', input, ['*'])
+ >>> a.store('cn=test', data)
True
Test for failure:
- >>> input = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'dn': ['cn=test'], 'cn': ['test']}
- >>> b = FailLdapStorage("ldap://127.0.0.1", "dc=example,dc=com")
- >>> b.store(output, 'cn=test', input, ['*'])
+ >>> b.store('cn=test', data)
Traceback (most recent call last):
...
- StorageException: Failed to update object "cn=test".
- Error was: Not found
+ LdapException: Failed to update object "cn=test,dc=example,dc=com".
+ Error was: Problem modifying the LDAP object (DN: "cn=test,dc=example,dc=com,dc=example,dc=com", Bound as: "", Error: Not found).
+
+ -- DOCTEST END
'''
OUT.debug('Creating/updating object in LDAP', 6)
- dist_name = ''
+ old_object = self.fetch(dist_name, ['*'])
- # The dn entry may not be part of the dict in case we update
- # an object
- if 'dn' in input_data.keys():
+ dist_name = self.add_base(dist_name)
- dist_name = input_data['dn'][0]
+ # Does the object already exist?
+ if not old_object:
- del input_data['dn']
+ OUT.debug('Creating new object in LDAP', 7)
- # Hm, we apparently got the object from ldap
- # but it does not exist anymore. This means
- # we add a new object
- if not self.ldap_fetch(dist_name, object_classes):
- dist_name = ''
+ try:
- # The object did not originate from LDAP and does not
- # yet exist in LDAP
- if not dist_name and not self.ldap_fetch(oid, object_classes):
+ OUT.debug('Generating modlist', 7)
- OUT.debug('Creating new object in LDAP', 7)
+ modlist = ldap.modlist.addModlist(store_values)
- try:
- self.ldap_store(oid, data)
+ self.reconnect()
+
+ OUT.debug('Adding object', 7)
+
+ self.ldap.add_s(dist_name, modlist)
except Exception, error:
- raise StorageException('Failed to create object "' + oid
- + '".\nError was: ' + str(error))
+ raise LdapException('Failed to create object "' + dist_name
+ + '".\nError was: ' + str(error))
return True
# Object comes from LDAP or already exists
else:
- OUT.debug('Object modification', 7)
-
- if dist_name != oid:
-
- OUT.debug('Object needs renaming', 7)
-
- try:
- self.ldap_rename(dist_name, oid)
- except Exception, e:
- raise StorageException('Failed to rename object "' + oid
- + '".\nError was: ' + str(e))
-
OUT.debug('Object update', 7)
try:
- self.ldap_update(oid, input_data, data)
+ self.reconnect()
+
+ self.update(dist_name, old_object[1], store_values)
except Exception, e:
- raise StorageException('Failed to update object "' + oid
+ raise LdapException('Failed to update object "' + dist_name
+ '".\nError was: ' + str(e))
OUT.debug('Object update successful', 7)
return True
- def delete(self, oid, object_classes):
- ''' Delete the object from LDAP.'''
+ def update(self, dist_name, old_values, new_values):
+ ''' Update an object in LDAP. '''
- OUT.debug('Deleting object', 7)
+ dist_name = self.add_base(dist_name)
- if not self.ldap_fetch(oid, object_classes):
- raise StorageException('Cannot delete object "' + oid + '". Did '
- 'not find it in storage.')
+ modlist = ldap.modlist.modifyModlist(old_values, new_values)
- ## Delete the storage object from ldap
- try:
- self.ldap_delete(oid)
- except Exception, error:
- StorageException('Failed to delete object "' + oid
- + '".\nError was: ' + str(error))
-
- return True
-
- def ldap_fetch(self, dist_name, classes = ['*']):
- '''
- Tries to fetch the object for the given dn.
- '''
- ofilter = objectclass_filter(classes, '&')
-
- dist_name = self.complete_dn(dist_name)
-
- result = []
-
self.reconnect()
try:
-
- OUT.debug('Searching for object', 7)
-
- result = self.ldap.search_s(dist_name, ldap.SCOPE_BASE, ofilter)
- except ldap.NO_SUCH_OBJECT, error:
- pass
+ self.ldap.modify_s(dist_name, modlist)
except ldap.SERVER_DOWN, error:
- raise StorageException('Server down (Filter: "' + ofilter +
- '", Bound as: "' + self.ldapbinddn +
- '", Error: ' + str(error) + ').')
+ raise LdapException('Problem modifying the LDAP object (DN: "'
+ + dist_name + '", Bound as: "' + self.ldapbinddn
+ + '", Error: ' + str(error) + ').')
- if len(result) == 1:
- ## Returns the first object found
- return result[0]
- else:
- return None
-
- def ldap_store(self, dist_name, store_values):
- ''' Stores the values as a new object in LDAP. '''
-
- OUT.debug('Creating new object in LDAP', 6)
-
- dist_name = self.complete_dn(dist_name)
-
- OUT.debug('Generating modlist', 7)
-
- modlist = ldap.modlist.addModlist(store_values)
-
- self.reconnect()
-
- OUT.debug('Adding object', 7)
-
- self.ldap.add_s(dist_name, modlist)
-
- def ldap_update(self, dist_name, oldvalues, newvalues):
- ''' Update an object in LDAP. '''
-
- dist_name = self.complete_dn(dist_name)
-
- modlist = ldap.modlist.modifyModlist(oldvalues, newvalues)
-
- self.reconnect()
- self.ldap.modify_s(dist_name, modlist)
-
- def ldap_rename(self, olddn, newdn):
+ def rename(self, old_dist_name, new_dist_name):
''' Rename an object in LDAP. '''
- olddn = self.complete_dn(olddn)
- newdn = self.complete_dn(newdn)
+ old_dist_name = self.add_base(old_dist_name)
+ new_dist_name = self.add_base(new_dist_name)
- newdn = ldap.explode_dn(newdn)
- if len(newdn) < 2:
- raise StorageException('Did not expect a single rdn as new dn!')
+ new_dist_name = ldap.explode_dn(new_dist_name)
+ if len(new_dist_name) < 2:
+ raise LdapException('Did not expect a single rdn as new dn!')
- self.reconnect()
- self.ldap.rename_s(olddn, newdn[0], ','.join(newdn[1:]), 1)
+ # Delete the storage object from ldap
+ try:
+ self.reconnect()
+ self.ldap.rename_s(old_dist_name,
+ new_dist_name[0],
+ ','.join(new_dist_name[1:]), 1)
+ except Exception, error:
+ LdapException('Failed to rename object "' + old_dist_name
+ + '" to "' + new_dist_name
+ + '".\nError was: ' + str(error))
- def ldap_delete(self, dist_name):
- ''' Delete an object from LDAP. '''
+ return True
- dist_name = self.complete_dn(dist_name)
+ def delete(self, dist_name, object_classes = ['*']):
+ ''' Delete the object from LDAP.'''
- self.reconnect()
- self.ldap.delete_s(dist_name)
+ OUT.debug('Deleting object', 7)
- def require_connect(self):
- ''' Connect to the LDAP server. '''
+ dist_name = self.add_base(dist_name)
- if self.connected:
- return
- self.ldap = ldap.initialize(self.ldap_uri)
- self.ldap.protocol_version = ldap.VERSION3
- self.ldap.simple_bind_s(self.ldapbinddn, self.ldappass)
- self.connected = True
+ # Delete the storage object from ldap
+ try:
+ self.reconnect()
+ self.ldap.delete_s(dist_name)
+ except Exception, error:
+ LdapException('Failed to delete object "' + dist_name
+ + '".\nError was: ' + str(error))
- def require_disconnect(self):
- ''' Disconnect from LDAP server. '''
+ return True
- if not self.connected:
- return
- self.ldap.unbind()
- self.connected = False
-
- def reconnect(self):
- ''' Reconnect to the LDAP server. '''
-
- self.require_disconnect()
- self.require_connect()
-
- def complete_dn(self, dist_name):
- ''' Complete an object name with the base DN.. '''
-
- if dist_name:
- return dist_name + ',' + self.base_dn
- else:
- return self.base_dn
-
#================================================================================
#
# Testing
Added: trunk/python-kolab/kolab/kldap/kolab.py
===================================================================
--- trunk/python-kolab/kolab/kldap/kolab.py 2007-02-22 13:11:45 UTC (rev 4)
+++ trunk/python-kolab/kolab/kldap/kolab.py 2007-02-22 15:29:10 UTC (rev 5)
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#################################################################################
+# PYTHON-KOLAB KOLAB-LDAP HANDLER
+#################################################################################
+# File: kolab.py
+#
+# Kolab specific LDAP management.
+#
+# Copyright:
+# (c) 2005 - 2007 Pardus
+# Distributed under the terms of the GNU General Public License v2
+#
+# Author(s):
+# Gunnar Wrobel <p at rdus.de>
+#
+# $Id$
+'''Kolab specific LDAP functions.'''
+
+__version__ = '$Id$'
+
+#===============================================================================
+#
+# Dependencies
+#
+#-------------------------------------------------------------------------------
+
+from kolab.kldap.kldap import Ldap
+from kolab.kldap.utils import objectclass_filter
+
+#================================================================================
+#
+# KolabLdap class
+#
+#--------------------------------------------------------------------------------
+
+class KolabLdap(Ldap):
+ ''' This class handles configuration data stored in a ldap
+ directory. The values are being read using the ldap modules. It
+ extends the LdapStorage definition and adds a few kolab specific
+ features.
+ '''
+
+ def enable_nobody(self):
+ '''Temporarily activates the "nobody" user.'''
+
+ if self.current_user != 'nobody':
+ self.old_user = self.current_user
+ self.set_auth('nobody')
+
+ def disable_nobody(self):
+ '''Deactivates the "nobody" user.'''
+
+ if self.current_user == 'nobody' and self.old_user:
+ self.old_user = ''
+ self.set_auth(self.old_user)
+
+ def dn_for_object(self, value, attr, classes = ['*']):
+ '''Returns a list of dn's that match the attr=value combination.'''
+
+ ofilter = objectclass_filter(classes)
+
+ self.require_connect()
+
+ result = self.ldap.search_s(self.base_dn, ldap.SCOPE_SUBTREE, '(&' +
+ ofilter + '(' + attr + '=' +
+ escape_for_filter(value) + '))',
+ attrsonly = 1)
+ if result:
+ # Returns the first half of the tuple which is the dn
+ # The base dn is removed from the entries
+ return [i[0][:-(len(self.base_dn) + 1)] for i in result]
+ else:
+ raise StorageException('No object found!')
+
+ def dn_for_mail(self, mail, classes = ['*']):
+ '''Returns a list of dn's that own the given e-mail address. This
+ should usually result in a maximum of one entry.'''
+
+ self.require_connect()
+
+ dist_name = []
+ for i in ['uid', 'mail', 'alias']:
+ try:
+ dist_name.append((self.dn_for_object(mail, i, classes), i))
+ break
+ except StorageException, e:
+ pass
+
+ return dist_name
+
+#================================================================================
+#
+# Testing
+#
+#--------------------------------------------------------------------------------
+
+if __name__ == '__main__':
+ import doctest, sys
+ doctest.testmod(sys.modules[__name__])
Added: trunk/python-kolab/kolab/kldap/utils.py
===================================================================
--- trunk/python-kolab/kolab/kldap/utils.py 2007-02-22 13:11:45 UTC (rev 4)
+++ trunk/python-kolab/kolab/kldap/utils.py 2007-02-22 15:29:10 UTC (rev 5)
@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#################################################################################
+# PYTHON-KOLAB LDAP UTILITIES
+#################################################################################
+# File: utils.py
+#
+# Some utilities for LDAP management.
+#
+# Copyright:
+# (c) 2005 - 2007 Pardus
+# Distributed under the terms of the GNU General Public License v2
+#
+# Author(s):
+# Gunnar Wrobel <p at rdus.de>
+#
+# $Id$
+'''Utility functions for LDAP.'''
+
+__version__ = "$Id$"
+
+#================================================================================
+#
+# Dependencies
+#
+#--------------------------------------------------------------------------------
+
+import types, re
+
+#================================================================================
+#
+# Helper functions
+#
+#--------------------------------------------------------------------------------
+
+def escape(ldapstr, rep_map):
+ '''
+ Replace all characters according to the given map.
+ '''
+ for in_c, outc in rep_map:
+ ldapstr = ldapstr.replace(in_c, outc)
+ return ldapstr
+
+def escape_for_filter(filter_str):
+ '''
+ From RFC-2254:
+
+ If a value should contain any of the following characters
+
+ Character ASCII value
+ ---------------------------
+ * 0x2a
+ ( 0x28
+ ) 0x29
+ \ 0x5c
+ NUL 0x00
+
+ the character must be encoded as the backslash "\" character (ASCII
+ 0x5c) followed by the two hexadecimal digits representing the ASCII
+ value of the encoded character. The case of the two hexadecimal
+ digits is not significant.
+
+ Simple test (the four slashes are necessary for the sake of the DocTest):
+
+ >>> escape_for_filter("\\\\*()\\0()*\\\\")
+ '\\\\5c\\\\2a\\\\28\\\\29\\\\00\\\\28\\\\29\\\\2a\\\\5c'
+ '''
+ rep_map = (( '\\', '\\5c'),
+ ( '*', '\\2a'),
+ ( '(', '\\28'),
+ ( ')', '\\29'),
+ ( '\0', '\\00'))
+ return escape(filter_str, rep_map);
+
+def escape_for_dn(dn_str):
+ '''
+ DN component escaping as described in RFC-2253
+
+ Simple test (the four slashes are necessary for the sake of the DocTest):
+
+ >>> escape_for_dn("\\\\,+<><<>;;+")
+ '\\\\\\\\\\\\,\\\\+\\\\<\\\\>\\\\<\\\\<\\\\>\\\\;\\\\;\\\\+'
+ '''
+ rep_map = (( '\\', '\\\\'),
+ ( ',', '\\,'),
+ ( '+', '\\+'),
+ ( '<', '\\<'),
+ ( '>', '\\>'),
+ ( ';', '\\;'))
+ dn_str = escape(dn_str, rep_map)
+ if dn_str[0] == '#':
+ dn_str = '\\' + dn_str
+ head = re.compile('^ *').search(dn_str)
+ tail = re.compile(' *$').search(dn_str)
+ if head:
+ begin = head.span()[1]
+ else:
+ begin = 0
+ if tail:
+ end = tail.span()[0]
+ else:
+ end = len(str)
+ return '\\ ' * begin + dn_str[begin:end] + '\\ ' * (len(dn_str) - end)
+
+
+def objectclass_filter(classes, concat = '|'):
+ ''' Generate a object class filter string.
+
+ >>> objectclass_filter(['a','b'])
+ '(|(objectclass=a)(objectclass=b))'
+
+ >>> objectclass_filter(['a','b'], ':')
+ '(:(objectclass=a)(objectclass=b))'
+
+ >>> objectclass_filter(['a','b', 'c'])
+ '(|(|(objectclass=a)(objectclass=b))(objectclass=c))'
+
+ >>> objectclass_filter('a', ':')
+ '(objectclass=a)'
+ '''
+ if isinstance(classes, types.StringTypes):
+ return '(objectclass=' + classes + ')'
+ else:
+ def class_expand(cls):
+ ''' Expand array to a filter string.'''
+ if len(cls) == 1:
+ return objectclass_filter(cls[0])
+ if len(cls) == 2:
+ return '(' + concat + objectclass_filter(cls[0]) \
+ + objectclass_filter(cls[1]) + ')'
+ else:
+ return '(' + concat + class_expand(cls[:-1]) \
+ + objectclass_filter(cls[-1]) + ')'
+
+ return class_expand(classes)
+
+#================================================================================
+#
+# Testing
+#
+#--------------------------------------------------------------------------------
+
+if __name__ == '__main__':
+ import doctest, sys
+ doctest.testmod(sys.modules[__name__])
Added: trunk/python-kolab/kolab/tests/dtest.py
===================================================================
--- trunk/python-kolab/kolab/tests/dtest.py 2007-02-22 13:11:45 UTC (rev 4)
+++ trunk/python-kolab/kolab/tests/dtest.py 2007-02-22 15:29:10 UTC (rev 5)
@@ -0,0 +1,76 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#################################################################################
+# PYTHON-KOLAB DOCTEST AGGREGATOR
+#################################################################################
+# File: dtest.py
+#
+# Combines the doctests that are available for the different modules
+#
+# Copyright:
+# (c) 2005 - 2007 Pardus
+# Distributed under the terms of the GNU General Public License v2
+#
+# Author(s):
+# Gunnar Wrobel <p at rdus.de>
+#
+# $Id$
+'''Aggregates doctests from all modules that provide such tests.'''
+
+__version__ = '$Id$'
+
+#================================================================================
+#
+# Dependencies
+#
+#--------------------------------------------------------------------------------
+
+import unittest, doctest, sys
+
+# On module creation:
+
+# 1.) Check header section (copyright notice)
+# 2.) Add module doc string
+# 3.) Add version string
+# 4.) Add testing handler at bottom of module
+# 5.) Add module into tests/dtest.py. Check that tests run through
+# 6.) Run pylint over the code. Fix any reasonable complaints.
+# 7.) Whitespace clean the buffer.
+# 8.) Add svn:keywords "Id" to file.
+
+# On module change:
+
+# 1.) Check header section (copyright notice)
+# 5.) Check that tests run through
+# 6.) Run pylint over the code. Fix any reasonable complaints.
+# 7.) Whitespace clean the buffer.
+
+# clean modules : CT
+# not yet clean : UT
+# clean but no testing : CN
+# unclean but no testing: UN
+
+import kolab.kldap.kldap #UN
+import kolab.kldap.utils #UN
+#import kolab.version #CN
+
+#================================================================================
+#
+# Test Suite
+#
+#--------------------------------------------------------------------------------
+
+def test_suite():
+ return unittest.TestSuite((
+ doctest.DocTestSuite(kolab.kldap.kldap),
+ doctest.DocTestSuite(kolab.kldap.utils),
+ ))
+
+#================================================================================
+#
+# Run Testing
+#
+#--------------------------------------------------------------------------------
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
More information about the Python-kolab-commits
mailing list