[Python-kolab-commits] r6 - trunk/python-kolab/kolab/kldap

scm-commit@wald.intevation.org scm-commit at wald.intevation.org
Thu Feb 22 18:26:42 CET 2007


Author: wrobel
Date: 2007-02-22 18:26:41 +0100 (Thu, 22 Feb 2007)
New Revision: 6

Added:
   trunk/python-kolab/kolab/kldap/base.py
Removed:
   trunk/python-kolab/kolab/kldap/kldap.py
Log:
May not have a kolab module in here.

Copied: trunk/python-kolab/kolab/kldap/base.py (from rev 5, trunk/python-kolab/kolab/kldap/kldap.py)

Deleted: trunk/python-kolab/kolab/kldap/kldap.py
===================================================================
--- trunk/python-kolab/kolab/kldap/kldap.py	2007-02-22 15:29:10 UTC (rev 5)
+++ trunk/python-kolab/kolab/kldap/kldap.py	2007-02-22 17:26:41 UTC (rev 6)
@@ -1,356 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#################################################################################
-# PYTHON-KOLAB LDAP HANDLER
-#################################################################################
-# File:       kldap.py
-#
-#             A class for LDAP management.
-#
-# Copyright:
-#             (c) 2005 - 2007 Pardus
-#             Distributed under the terms of the GNU General Public License v2
-#
-# Author(s):
-#             Gunnar Wrobel <p at rdus.de>
-#
-# $Id$
-'''A class that provides access to values stored in a LDAP database.'''
-
-__version__ = "$Id$"
-
-#================================================================================
-#
-# Dependencies
-#
-#--------------------------------------------------------------------------------
-
-import ldap
-import ldap.modlist
-
-from   kolab.exceptions     import LdapException
-from   kolab.kldap.utils    import objectclass_filter
-
-from   kolab.globals        import OUT
-
-#================================================================================
-#
-# LdapStorage
-#
-#--------------------------------------------------------------------------------
-
-class Ldap(object):
-    '''
-    This class can be used to handle objects stored in an LDAP database.
-    '''
-
-    def __init__(self, uri, base_dn, users = {'default': ('', '')}):
-        self.ldap_uri  = uri
-        self.base_dn   = base_dn
-        self.users     = users
-
-        if not 'default' in self.users.keys():
-            self.users['default'] = ('', '')
-
-        self.connected = False
-        self.ldap        = None
-
-        self.set_auth()
-
-    def connect(self, user = 'default'):
-        ''' Connect to Ldap using the specified user. '''
-        self.set_auth(user)
-        self.require_connect()
-
-    def set_auth(self, user = 'default'):
-        ''' Change the user to the specified user. '''
-        self.require_disconnect()
-        self.current_user = user
-        self.ldapbinddn  = self.users[user][0]
-        self.ldappass    = self.users[user][1]
-
-    def require_connect(self):
-        ''' Connect to the LDAP server.  '''
-
-        if self.connected:
-            return
-        self.ldap = ldap.initialize(self.ldap_uri)
-        self.ldap.protocol_version = ldap.VERSION3
-        self.ldap.simple_bind_s(self.ldapbinddn, self.ldappass)
-        self.connected = True
-
-    def require_disconnect(self):
-        ''' Disconnect from LDAP server.'''
-
-        if not self.connected:
-            return
-        self.ldap.unbind()
-        self.connected = False
-
-    def add_base(self, dist_name):
-        ''' Complete an object name with the base DN.'''
-
-        if dist_name:
-            return dist_name + ',' + self.base_dn
-        else:
-            return self.base_dn
-
-    def del_base(self, dist_name):
-        ''' Removes the base dn from an object name.'''
-
-        if dist_name[-len(self.base_dn):] == self.base_dn:
-            dist_name = dist_name[:-len(self.base_dn)]
-            if dist_name[-1] == ',':
-                return dist_name[:-1]
-            else:
-                return dist_name
-        else:
-            raise LdapException('Object "' + dist_name + 
-                                '" does not match server name "'
-                                + self.base_dn + '"!')
-
-    def reconnect(self):
-        ''' Reconnect to the LDAP server.  '''
-
-        self.require_disconnect()
-        self.require_connect()
-
-    def fetch(self, dist_name, classes = ['*']):
-        '''
-        Tries to fetch the object for the given dn.
-
-        -- DOCTEST START
-
-        >>> class DummyLdap:
-        ...     def search_s(self, dn, scope, ofilter):
-        ...         if dn != 'cn=test,dc=example,dc=com':
-        ...             raise ldap.NO_SUCH_OBJECT('Not found')
-        ...         return [('cn=test,dc=example,dc=com',
-        ...                 {'cn': ['test'], 'test': ['another test'],
-        ...                  'objectClass': ['top', 'inetOrgPerson',
-        ...                                  'kolabInetOrgPerson',
-        ...                                  'hordePerson']})]
-        >>> class TestLdap(Ldap):
-        ...     def require_connect(self):
-        ...         self.connected = True
-        ...         self.ldap = DummyLdap()
-        ...     def require_disconnect(self):
-        ...         self.connected = False
-        >>> a = TestLdap("ldap://127.0.0.1", "dc=example,dc=com")
-        >>> a.fetch('cn=test', ['*'])
-        ('cn=test', {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'cn': ['test']})
-        >>> a.fetch('cn=test2', ['*'])
-
-        -- DOCTEST END
-        '''
-        ofilter = objectclass_filter(classes, '&')
-
-        dist_name = self.add_base(dist_name)
-
-        result = []
-
-        try:
-            self.reconnect()
-            
-            OUT.debug('Searching for object', 7)
-
-            result = self.ldap.search_s(dist_name, ldap.SCOPE_BASE, ofilter)
-        except ldap.NO_SUCH_OBJECT, error:
-            pass
-        except ldap.SERVER_DOWN, error:
-            raise LdapException('Server down (Filter: "' + ofilter +
-                                   '", Bound as: "' + self.ldapbinddn +
-                                   '", Error: ' + str(error) + ').')
-
-        if len(result) == 1:
-            # Returns the first object found
-            return (self.del_base(result[0][0]), result[0][1])
-        else:
-            return None
-
-    def store(self, dist_name, store_values):
-        ''' 
-        Stores the values as a new object in LDAP.  
-
-        -- DOCTEST START
-
-        >>> class DummyLdap:
-        ...     def search_s(self, dn, scope, ofilter):
-        ...         if dn != 'cn=test,dc=example,dc=com':
-        ...             raise ldap.NO_SUCH_OBJECT('Not found')
-        ...         return [('cn=test,dc=example,dc=com',
-        ...                 {'cn': ['test'], 'test': ['another test'],
-        ...                  'objectClass': ['top', 'inetOrgPerson',
-        ...                                  'kolabInetOrgPerson',
-        ...                                  'hordePerson']})]
-        ...     def add_s(self, dn, modlist):
-        ...         pass
-        ...     def rename_s(self, olddn, newdn, a, b):
-        ...         pass
-        ...     def modify_s(self, dn, modlist):
-        ...         pass
-        ...     def unbind(self):
-        ...         pass
-        >>> class FailLdap(DummyLdap):
-        ...     def add_s(self, dn, modlist):
-        ...         raise ldap.SERVER_DOWN('Not found')
-        ...     def rename_s(self, olddn, newdn, a, b):
-        ...         raise ldap.SERVER_DOWN('Not found')
-        ...     def modify_s(self, dn, modlist):
-        ...         raise ldap.SERVER_DOWN('Not found')
-        >>> class TestLdap(Ldap):
-        ...     def require_connect(self):
-        ...         self.connected = True
-        ...         self.ldap = DummyLdap()
-        ...     def require_disconnect(self):
-        ...         self.connected = False
-        >>> class TestLdapFail(Ldap):
-        ...     def require_connect(self):
-        ...         self.connected = True
-        ...         self.ldap = FailLdap()
-
-        >>> a = TestLdap("ldap://127.0.0.1", "dc=example,dc=com")
-        >>> b = TestLdapFail("ldap://127.0.0.1", "dc=example,dc=com")
-
-        Test adding a new object:
-
-        >>> data = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'cn': ['test2']}
-
-        >>> a.store('cn=test2', data)
-        True
-
-        Test for failure:
-
-        >>> b.store('cn=test2', data)
-        Traceback (most recent call last):
-        ...
-        LdapException: Failed to create object "cn=test2,dc=example,dc=com".
-        Error was: Not found
-
-        Object update:
-
-        >>> data = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'cn': ['test']}
-
-        >>> a.store('cn=test', data)
-        True
-
-        Test for failure:
-
-        >>> b.store('cn=test', data)
-        Traceback (most recent call last):
-        ...
-        LdapException: Failed to update object "cn=test,dc=example,dc=com".
-        Error was: Problem modifying the LDAP object (DN: "cn=test,dc=example,dc=com,dc=example,dc=com", Bound as: "", Error: Not found).
-
-        -- DOCTEST END
-        '''
-
-        OUT.debug('Creating/updating object in LDAP', 6)
-
-        old_object = self.fetch(dist_name, ['*'])
-
-        dist_name = self.add_base(dist_name)
-
-        # Does the object already exist?
-        if not old_object:
-
-            OUT.debug('Creating new object in LDAP', 7)
-
-            try:
-
-                OUT.debug('Generating modlist', 7)
-
-                modlist = ldap.modlist.addModlist(store_values)
-
-                self.reconnect()
-
-                OUT.debug('Adding object', 7)
-
-                self.ldap.add_s(dist_name, modlist)
-            except Exception, error:
-                raise LdapException('Failed to create object "' + dist_name
-                                    + '".\nError was: ' + str(error))
-
-            return True
-
-        # Object comes from LDAP or already exists
-        else:
-
-            OUT.debug('Object update', 7)
-
-            try:
-                self.reconnect()
-
-                self.update(dist_name, old_object[1], store_values)
-            except Exception, e:
-                raise LdapException('Failed to update object "' + dist_name
-                                       + '".\nError was: ' + str(e))
-
-            OUT.debug('Object update successful', 7)
-
-            return True
-
-    def update(self, dist_name, old_values, new_values):
-        ''' Update an object in LDAP.  '''
-
-        dist_name = self.add_base(dist_name)
-
-        modlist = ldap.modlist.modifyModlist(old_values, new_values)
-
-        self.reconnect()
-        try:
-            self.ldap.modify_s(dist_name, modlist)
-        except ldap.SERVER_DOWN, error:
-            raise LdapException('Problem modifying the LDAP object (DN: "' 
-                                + dist_name + '", Bound as: "' + self.ldapbinddn 
-                                + '", Error: ' + str(error) + ').')
-
-    def rename(self, old_dist_name, new_dist_name):
-        ''' Rename an object in LDAP.  '''
-
-        old_dist_name = self.add_base(old_dist_name)
-        new_dist_name = self.add_base(new_dist_name)
-
-        new_dist_name = ldap.explode_dn(new_dist_name)
-        if len(new_dist_name) < 2:
-            raise LdapException('Did not expect a single rdn as new dn!')
-
-        # Delete the storage object from ldap
-        try:
-            self.reconnect()
-            self.ldap.rename_s(old_dist_name, 
-                               new_dist_name[0], 
-                               ','.join(new_dist_name[1:]), 1)
-        except Exception, error:
-            LdapException('Failed to rename object "' + old_dist_name 
-                          + '" to "' + new_dist_name 
-                          + '".\nError was: ' + str(error))
-
-        return True
-
-    def delete(self, dist_name, object_classes = ['*']):
-        ''' Delete the object from LDAP.'''
-
-        OUT.debug('Deleting object', 7)
-
-        dist_name = self.add_base(dist_name)
-
-        # Delete the storage object from ldap
-        try:
-            self.reconnect()
-            self.ldap.delete_s(dist_name)
-        except Exception, error:
-            LdapException('Failed to delete object "' + dist_name
-                          + '".\nError was: ' + str(error))
-
-        return True
-
-#================================================================================
-#
-# Testing
-#
-#--------------------------------------------------------------------------------
-
-if __name__ == '__main__':
-    import doctest, sys
-    doctest.testmod(sys.modules[__name__])



More information about the Python-kolab-commits mailing list