[Python-kolab-commits] r6 - trunk/python-kolab/kolab/kldap
scm-commit@wald.intevation.org
scm-commit at wald.intevation.org
Thu Feb 22 18:26:42 CET 2007
Author: wrobel
Date: 2007-02-22 18:26:41 +0100 (Thu, 22 Feb 2007)
New Revision: 6
Added:
trunk/python-kolab/kolab/kldap/base.py
Removed:
trunk/python-kolab/kolab/kldap/kldap.py
Log:
May not have a kolab module in here.
Copied: trunk/python-kolab/kolab/kldap/base.py (from rev 5, trunk/python-kolab/kolab/kldap/kldap.py)
Deleted: trunk/python-kolab/kolab/kldap/kldap.py
===================================================================
--- trunk/python-kolab/kolab/kldap/kldap.py 2007-02-22 15:29:10 UTC (rev 5)
+++ trunk/python-kolab/kolab/kldap/kldap.py 2007-02-22 17:26:41 UTC (rev 6)
@@ -1,356 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#################################################################################
-# PYTHON-KOLAB LDAP HANDLER
-#################################################################################
-# File: kldap.py
-#
-# A class for LDAP management.
-#
-# Copyright:
-# (c) 2005 - 2007 Pardus
-# Distributed under the terms of the GNU General Public License v2
-#
-# Author(s):
-# Gunnar Wrobel <p at rdus.de>
-#
-# $Id$
-'''A class that provides access to values stored in a LDAP database.'''
-
-__version__ = "$Id$"
-
-#================================================================================
-#
-# Dependencies
-#
-#--------------------------------------------------------------------------------
-
-import ldap
-import ldap.modlist
-
-from kolab.exceptions import LdapException
-from kolab.kldap.utils import objectclass_filter
-
-from kolab.globals import OUT
-
-#================================================================================
-#
-# LdapStorage
-#
-#--------------------------------------------------------------------------------
-
-class Ldap(object):
- '''
- This class can be used to handle objects stored in an LDAP database.
- '''
-
- def __init__(self, uri, base_dn, users = {'default': ('', '')}):
- self.ldap_uri = uri
- self.base_dn = base_dn
- self.users = users
-
- if not 'default' in self.users.keys():
- self.users['default'] = ('', '')
-
- self.connected = False
- self.ldap = None
-
- self.set_auth()
-
- def connect(self, user = 'default'):
- ''' Connect to Ldap using the specified user. '''
- self.set_auth(user)
- self.require_connect()
-
- def set_auth(self, user = 'default'):
- ''' Change the user to the specified user. '''
- self.require_disconnect()
- self.current_user = user
- self.ldapbinddn = self.users[user][0]
- self.ldappass = self.users[user][1]
-
- def require_connect(self):
- ''' Connect to the LDAP server. '''
-
- if self.connected:
- return
- self.ldap = ldap.initialize(self.ldap_uri)
- self.ldap.protocol_version = ldap.VERSION3
- self.ldap.simple_bind_s(self.ldapbinddn, self.ldappass)
- self.connected = True
-
- def require_disconnect(self):
- ''' Disconnect from LDAP server.'''
-
- if not self.connected:
- return
- self.ldap.unbind()
- self.connected = False
-
- def add_base(self, dist_name):
- ''' Complete an object name with the base DN.'''
-
- if dist_name:
- return dist_name + ',' + self.base_dn
- else:
- return self.base_dn
-
- def del_base(self, dist_name):
- ''' Removes the base dn from an object name.'''
-
- if dist_name[-len(self.base_dn):] == self.base_dn:
- dist_name = dist_name[:-len(self.base_dn)]
- if dist_name[-1] == ',':
- return dist_name[:-1]
- else:
- return dist_name
- else:
- raise LdapException('Object "' + dist_name +
- '" does not match server name "'
- + self.base_dn + '"!')
-
- def reconnect(self):
- ''' Reconnect to the LDAP server. '''
-
- self.require_disconnect()
- self.require_connect()
-
- def fetch(self, dist_name, classes = ['*']):
- '''
- Tries to fetch the object for the given dn.
-
- -- DOCTEST START
-
- >>> class DummyLdap:
- ... def search_s(self, dn, scope, ofilter):
- ... if dn != 'cn=test,dc=example,dc=com':
- ... raise ldap.NO_SUCH_OBJECT('Not found')
- ... return [('cn=test,dc=example,dc=com',
- ... {'cn': ['test'], 'test': ['another test'],
- ... 'objectClass': ['top', 'inetOrgPerson',
- ... 'kolabInetOrgPerson',
- ... 'hordePerson']})]
- >>> class TestLdap(Ldap):
- ... def require_connect(self):
- ... self.connected = True
- ... self.ldap = DummyLdap()
- ... def require_disconnect(self):
- ... self.connected = False
- >>> a = TestLdap("ldap://127.0.0.1", "dc=example,dc=com")
- >>> a.fetch('cn=test', ['*'])
- ('cn=test', {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'cn': ['test']})
- >>> a.fetch('cn=test2', ['*'])
-
- -- DOCTEST END
- '''
- ofilter = objectclass_filter(classes, '&')
-
- dist_name = self.add_base(dist_name)
-
- result = []
-
- try:
- self.reconnect()
-
- OUT.debug('Searching for object', 7)
-
- result = self.ldap.search_s(dist_name, ldap.SCOPE_BASE, ofilter)
- except ldap.NO_SUCH_OBJECT, error:
- pass
- except ldap.SERVER_DOWN, error:
- raise LdapException('Server down (Filter: "' + ofilter +
- '", Bound as: "' + self.ldapbinddn +
- '", Error: ' + str(error) + ').')
-
- if len(result) == 1:
- # Returns the first object found
- return (self.del_base(result[0][0]), result[0][1])
- else:
- return None
-
- def store(self, dist_name, store_values):
- '''
- Stores the values as a new object in LDAP.
-
- -- DOCTEST START
-
- >>> class DummyLdap:
- ... def search_s(self, dn, scope, ofilter):
- ... if dn != 'cn=test,dc=example,dc=com':
- ... raise ldap.NO_SUCH_OBJECT('Not found')
- ... return [('cn=test,dc=example,dc=com',
- ... {'cn': ['test'], 'test': ['another test'],
- ... 'objectClass': ['top', 'inetOrgPerson',
- ... 'kolabInetOrgPerson',
- ... 'hordePerson']})]
- ... def add_s(self, dn, modlist):
- ... pass
- ... def rename_s(self, olddn, newdn, a, b):
- ... pass
- ... def modify_s(self, dn, modlist):
- ... pass
- ... def unbind(self):
- ... pass
- >>> class FailLdap(DummyLdap):
- ... def add_s(self, dn, modlist):
- ... raise ldap.SERVER_DOWN('Not found')
- ... def rename_s(self, olddn, newdn, a, b):
- ... raise ldap.SERVER_DOWN('Not found')
- ... def modify_s(self, dn, modlist):
- ... raise ldap.SERVER_DOWN('Not found')
- >>> class TestLdap(Ldap):
- ... def require_connect(self):
- ... self.connected = True
- ... self.ldap = DummyLdap()
- ... def require_disconnect(self):
- ... self.connected = False
- >>> class TestLdapFail(Ldap):
- ... def require_connect(self):
- ... self.connected = True
- ... self.ldap = FailLdap()
-
- >>> a = TestLdap("ldap://127.0.0.1", "dc=example,dc=com")
- >>> b = TestLdapFail("ldap://127.0.0.1", "dc=example,dc=com")
-
- Test adding a new object:
-
- >>> data = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'cn': ['test2']}
-
- >>> a.store('cn=test2', data)
- True
-
- Test for failure:
-
- >>> b.store('cn=test2', data)
- Traceback (most recent call last):
- ...
- LdapException: Failed to create object "cn=test2,dc=example,dc=com".
- Error was: Not found
-
- Object update:
-
- >>> data = {'test': ['another test'], 'objectClass': ['top', 'inetOrgPerson', 'kolabInetOrgPerson', 'hordePerson'], 'cn': ['test']}
-
- >>> a.store('cn=test', data)
- True
-
- Test for failure:
-
- >>> b.store('cn=test', data)
- Traceback (most recent call last):
- ...
- LdapException: Failed to update object "cn=test,dc=example,dc=com".
- Error was: Problem modifying the LDAP object (DN: "cn=test,dc=example,dc=com,dc=example,dc=com", Bound as: "", Error: Not found).
-
- -- DOCTEST END
- '''
-
- OUT.debug('Creating/updating object in LDAP', 6)
-
- old_object = self.fetch(dist_name, ['*'])
-
- dist_name = self.add_base(dist_name)
-
- # Does the object already exist?
- if not old_object:
-
- OUT.debug('Creating new object in LDAP', 7)
-
- try:
-
- OUT.debug('Generating modlist', 7)
-
- modlist = ldap.modlist.addModlist(store_values)
-
- self.reconnect()
-
- OUT.debug('Adding object', 7)
-
- self.ldap.add_s(dist_name, modlist)
- except Exception, error:
- raise LdapException('Failed to create object "' + dist_name
- + '".\nError was: ' + str(error))
-
- return True
-
- # Object comes from LDAP or already exists
- else:
-
- OUT.debug('Object update', 7)
-
- try:
- self.reconnect()
-
- self.update(dist_name, old_object[1], store_values)
- except Exception, e:
- raise LdapException('Failed to update object "' + dist_name
- + '".\nError was: ' + str(e))
-
- OUT.debug('Object update successful', 7)
-
- return True
-
- def update(self, dist_name, old_values, new_values):
- ''' Update an object in LDAP. '''
-
- dist_name = self.add_base(dist_name)
-
- modlist = ldap.modlist.modifyModlist(old_values, new_values)
-
- self.reconnect()
- try:
- self.ldap.modify_s(dist_name, modlist)
- except ldap.SERVER_DOWN, error:
- raise LdapException('Problem modifying the LDAP object (DN: "'
- + dist_name + '", Bound as: "' + self.ldapbinddn
- + '", Error: ' + str(error) + ').')
-
- def rename(self, old_dist_name, new_dist_name):
- ''' Rename an object in LDAP. '''
-
- old_dist_name = self.add_base(old_dist_name)
- new_dist_name = self.add_base(new_dist_name)
-
- new_dist_name = ldap.explode_dn(new_dist_name)
- if len(new_dist_name) < 2:
- raise LdapException('Did not expect a single rdn as new dn!')
-
- # Delete the storage object from ldap
- try:
- self.reconnect()
- self.ldap.rename_s(old_dist_name,
- new_dist_name[0],
- ','.join(new_dist_name[1:]), 1)
- except Exception, error:
- LdapException('Failed to rename object "' + old_dist_name
- + '" to "' + new_dist_name
- + '".\nError was: ' + str(error))
-
- return True
-
- def delete(self, dist_name, object_classes = ['*']):
- ''' Delete the object from LDAP.'''
-
- OUT.debug('Deleting object', 7)
-
- dist_name = self.add_base(dist_name)
-
- # Delete the storage object from ldap
- try:
- self.reconnect()
- self.ldap.delete_s(dist_name)
- except Exception, error:
- LdapException('Failed to delete object "' + dist_name
- + '".\nError was: ' + str(error))
-
- return True
-
-#================================================================================
-#
-# Testing
-#
-#--------------------------------------------------------------------------------
-
-if __name__ == '__main__':
- import doctest, sys
- doctest.testmod(sys.modules[__name__])
More information about the Python-kolab-commits
mailing list