dirsrvtests/suites/acl/acl_test.py | 953 +++++++++++++++-
dirsrvtests/tickets/ticket47553_ger.py | 553 ---------
dirsrvtests/tickets/ticket47553_rdn_write_test.py | 135 --
dirsrvtests/tickets/ticket47553_single_aci_test.py | 1224 ---------------------
4 files changed, 929 insertions(+), 1936 deletions(-)
New commits:
commit 8b097d1c40facaa3e4d93293bffb3a5e322227a2
Author: Simon Pichugin <spichugi(a)redhat.com>
Date: Wed Sep 2 11:27:36 2015 +0200
Ticket 48264 - Ticket 47553 tests refactoring
Description: Refactor these tests according to the pytest and PEP8 way:
- ticket47553_ger.py;
- ticket47553_rdn_write_test.py;
- ticket47553_single_aci_test.py.
Move them to the dirsrvtests/suites/acl/acl_test.py test suite.
https://fedorahosted.org/389/ticket/48264
Reviewed by: mreynolds(a)redhat.com (Thanks!)
(cherry picked from commit edaa28b1266312ef8dce737af46e09e130c7135a)
diff --git a/dirsrvtests/suites/acl/acl_test.py b/dirsrvtests/suites/acl/acl_test.py
index b85ee22..fd1092b 100644
--- a/dirsrvtests/suites/acl/acl_test.py
+++ b/dirsrvtests/suites/acl/acl_test.py
@@ -10,48 +10,167 @@ from lib389._constants import *
from lib389.properties import *
from lib389.tasks import *
from lib389.utils import *
+from ldap.controls.simple import GetEffectiveRightsControl
logging.getLogger(__name__).setLevel(logging.DEBUG)
log = logging.getLogger(__name__)
+#
+# important part. We can deploy Master1 and Master2 on different versions
+#
installation1_prefix = None
+installation2_prefix = None
+TEST_REPL_DN = "cn=test_repl, %s" % SUFFIX
-class TopologyStandalone(object):
- def __init__(self, standalone):
- standalone.open()
- self.standalone = standalone
+STAGING_CN = "staged user"
+PRODUCTION_CN = "accounts"
+EXCEPT_CN = "excepts"
+
+STAGING_DN = "cn=%s,%s" % (STAGING_CN, SUFFIX)
+PRODUCTION_DN = "cn=%s,%s" % (PRODUCTION_CN, SUFFIX)
+PROD_EXCEPT_DN = "cn=%s,%s" % (EXCEPT_CN, PRODUCTION_DN)
+
+STAGING_PATTERN = "cn=%s*,%s" % (STAGING_CN[:2], SUFFIX)
+PRODUCTION_PATTERN = "cn=%s*,%s" % (PRODUCTION_CN[:2], SUFFIX)
+BAD_STAGING_PATTERN = "cn=bad*,%s" % (SUFFIX)
+BAD_PRODUCTION_PATTERN = "cn=bad*,%s" % (SUFFIX)
+
+BIND_CN = "bind_entry"
+BIND_DN = "cn=%s,%s" % (BIND_CN, SUFFIX)
+BIND_PW = "password"
+
+NEW_ACCOUNT = "new_account"
+MAX_ACCOUNTS = 20
+
+CONFIG_MODDN_ACI_ATTR = "nsslapd-moddn-aci"
+
+SRC_ENTRY_CN = "tuser"
+EXT_RDN = "01"
+DST_ENTRY_CN = SRC_ENTRY_CN + EXT_RDN
+
+SRC_ENTRY_DN = "cn=%s,%s" % (SRC_ENTRY_CN, SUFFIX)
+DST_ENTRY_DN = "cn=%s,%s" % (DST_ENTRY_CN, SUFFIX)
+
+
+class TopologyMaster1Master2(object):
+ def __init__(self, master1, master2):
+ master1.open()
+ self.master1 = master1
+
+ master2.open()
+ self.master2 = master2
@pytest.fixture(scope="module")
def topology(request):
+ """This fixture is used to create a replicated topology for the
'module'.
+ The replicated topology is MASTER1 <-> Master2.
+ """
+
global installation1_prefix
+ global installation2_prefix
+
+ # allocate master1 on a given deployement
+ master1 = DirSrv(verbose=False)
if installation1_prefix:
args_instance[SER_DEPLOYED_DIR] = installation1_prefix
- # Creating standalone instance ...
- standalone = DirSrv(verbose=False)
- args_instance[SER_HOST] = HOST_STANDALONE
- args_instance[SER_PORT] = PORT_STANDALONE
- args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
- args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
- args_standalone = args_instance.copy()
- standalone.allocate(args_standalone)
- instance_standalone = standalone.exists()
- if instance_standalone:
- standalone.delete()
- standalone.create()
- standalone.open()
-
- # Delete each instance in the end
+ # Args for the master1 instance
+ args_instance[SER_HOST] = HOST_MASTER_1
+ args_instance[SER_PORT] = PORT_MASTER_1
+ args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_1
+ args_master = args_instance.copy()
+ master1.allocate(args_master)
+
+ # allocate master1 on a given deployement
+ master2 = DirSrv(verbose=False)
+ if installation2_prefix:
+ args_instance[SER_DEPLOYED_DIR] = installation2_prefix
+
+ # Args for the consumer instance
+ args_instance[SER_HOST] = HOST_MASTER_2
+ args_instance[SER_PORT] = PORT_MASTER_2
+ args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_2
+ args_master = args_instance.copy()
+ master2.allocate(args_master)
+
+ # Get the status of the instance and restart it if it exists
+ instance_master1 = master1.exists()
+ instance_master2 = master2.exists()
+
+ # Remove all the instances
+ if instance_master1:
+ master1.delete()
+ if instance_master2:
+ master2.delete()
+
+ # Create the instances
+ master1.create()
+ master1.open()
+ master2.create()
+ master2.open()
+
+ #
+ # Now prepare the Master-Consumer topology
+ #
+ # First Enable replication
+ master1.replica.enableReplication(suffix=SUFFIX,
+ role=REPLICAROLE_MASTER,
+ replicaId=REPLICAID_MASTER_1)
+ master2.replica.enableReplication(suffix=SUFFIX,
+ role=REPLICAROLE_MASTER,
+ replicaId=REPLICAID_MASTER_2)
+
+ # Initialize the supplier->consumer
+
+ properties = {RA_NAME: r'meTo_$host:$port',
+ RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
+ RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
+ RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
+ RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
+ repl_agreement = master1.agreement.create(suffix=SUFFIX,
+ host=master2.host,
+ port=master2.port,
+ properties=properties)
+
+ if not repl_agreement:
+ log.fatal("Fail to create a replica agreement")
+ sys.exit(1)
+
+ log.debug("%s created" % repl_agreement)
+
+ properties = {RA_NAME: r'meTo_$host:$port',
+ RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
+ RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
+ RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
+ RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
+ master2.agreement.create(suffix=SUFFIX,
+ host=master1.host,
+ port=master1.port,
+ properties=properties)
+
+ master1.agreement.init(SUFFIX, HOST_MASTER_2, PORT_MASTER_2)
+ master1.waitForReplInit(repl_agreement)
+
+ # Check replication is working fine
+ if master1.testReplication(DEFAULT_SUFFIX, master2):
+ log.info('Replication is working.')
+ else:
+ log.fatal('Replication is not working.')
+ assert False
+
def fin():
- standalone.delete()
+ master1.delete()
+ master2.delete()
request.addfinalizer(fin)
- # Clear out the tmp dir
- standalone.clearTmpDir(__file__)
+ # clear the tmp directory
+ master1.clearTmpDir(__file__)
- return TopologyStandalone(standalone)
+ # Here we have two instances master and consumer
+ # with replication working.
+ return TopologyMaster1Master2(master1, master2)
def add_attr(topology, attr_name):
@@ -127,7 +246,7 @@ def test_aci_attr_subtype_targetattr(topology, aci_with_attr_subtype):
log.info(" Search for the added attribute")
try:
- entries = topology.standalone.search_s(DEFAULT_SUFFIX,
+ entries = topology.master1.search_s(DEFAULT_SUFFIX,
ldap.SCOPE_BASE,
'(objectclass=*)',
['aci'])
entry = str(entries[0])
@@ -139,6 +258,792 @@ def test_aci_attr_subtype_targetattr(topology, aci_with_attr_subtype):
assert False
+def _bind_manager(topology):
+ topology.master1.log.info("Bind as %s " % DN_DM)
+ topology.master1.simple_bind_s(DN_DM, PASSWORD)
+
+
+def _bind_normal(topology):
+ # bind as bind_entry
+ topology.master1.log.info("Bind as %s" % BIND_DN)
+ topology.master1.simple_bind_s(BIND_DN, BIND_PW)
+
+
+def _moddn_aci_deny_tree(topology, mod_type=None,
+ target_from=STAGING_DN, target_to=PROD_EXCEPT_DN):
+ """It denies the access moddn_to in
cn=except,cn=accounts,SUFFIX"""
+
+ assert mod_type is not None
+
+ ACI_TARGET_FROM = ""
+ ACI_TARGET_TO = ""
+ if target_from:
+ ACI_TARGET_FROM = "(target_from = \"ldap:///%s\")" % (target_from)
+ if target_to:
+ ACI_TARGET_TO = "(target_to = \"ldap:///%s\")" % (target_to)
+
+ ACI_ALLOW = "(version 3.0; acl \"Deny MODDN to prod_except\"; deny (moddn)"
+ ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
+ ACI_BODY = ACI_TARGET_TO + ACI_TARGET_FROM + ACI_ALLOW + ACI_SUBJECT
+ mod = [(mod_type, 'aci', ACI_BODY)]
+ #topology.master1.modify_s(SUFFIX, mod)
+ topology.master1.log.info("Add a DENY aci under %s " % PROD_EXCEPT_DN)
+ topology.master1.modify_s(PROD_EXCEPT_DN, mod)
+
+
+def _write_aci_staging(topology, mod_type=None):
+ assert mod_type is not None
+
+ ACI_TARGET = "(targetattr=
\"cn\")(target=\"ldap:///cn=*,%s\")" % STAGING_DN
+ ACI_ALLOW = "(version 3.0; acl \"write staging entries\"; allow (write)"
+ ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
+ ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
+ mod = [(mod_type, 'aci', ACI_BODY)]
+ topology.master1.modify_s(SUFFIX, mod)
+
+
+def _write_aci_production(topology, mod_type=None):
+ assert mod_type is not None
+
+ ACI_TARGET = "(targetattr=
\"cn\")(target=\"ldap:///cn=*,%s\")" % PRODUCTION_DN
+ ACI_ALLOW = "(version 3.0; acl \"write production entries\"; allow (write)"
+ ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
+ ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
+ mod = [(mod_type, 'aci', ACI_BODY)]
+ topology.master1.modify_s(SUFFIX, mod)
+
+
+def _moddn_aci_staging_to_production(topology, mod_type=None,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN):
+ assert mod_type is not None
+
+
+ ACI_TARGET_FROM = ""
+ ACI_TARGET_TO = ""
+ if target_from:
+ ACI_TARGET_FROM = "(target_from = \"ldap:///%s\")" % (target_from)
+ if target_to:
+ ACI_TARGET_TO = "(target_to = \"ldap:///%s\")" % (target_to)
+
+ ACI_ALLOW = "(version 3.0; acl \"MODDN from staging to production\"; allow (moddn)"
+ ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
+ ACI_BODY = ACI_TARGET_FROM + ACI_TARGET_TO + ACI_ALLOW + ACI_SUBJECT
+ mod = [(mod_type, 'aci', ACI_BODY)]
+ topology.master1.modify_s(SUFFIX, mod)
+
+ _write_aci_staging(topology, mod_type=mod_type)
+
+
+def _moddn_aci_from_production_to_staging(topology, mod_type=None):
+ assert mod_type is not None
+
+ ACI_TARGET = "(target_from = \"ldap:///%s\") (target_to =
\"ldap:///%s\")" % (
+ PRODUCTION_DN, STAGING_DN)
+ ACI_ALLOW = "(version 3.0; acl \"MODDN from production to
staging\"; allow (moddn)"
+ ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
+ ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
+ mod = [(mod_type, 'aci', ACI_BODY)]
+ topology.master1.modify_s(SUFFIX, mod)
+
+ _write_aci_production(topology, mod_type=mod_type)
+
+
+@pytest.fixture(scope="module")
+def moddn_setup(topology):
+ """Creates
+ - a staging DIT
+ - a production DIT
+ - add accounts in staging DIT
+ - enable ACL logging (commented for performance reason)
+ """
+
+ topology.master1.log.info("\n\n######## INITIALIZATION ########\n")
+
+ # entry used to bind with
+ topology.master1.log.info("Add %s" % BIND_DN)
+ topology.master1.add_s(Entry((BIND_DN, {
+ 'objectclass': "top person".split(),
+ 'sn': BIND_CN,
+ 'cn': BIND_CN,
+ 'userpassword': BIND_PW})))
+
+ # DIT for staging
+ topology.master1.log.info("Add %s" % STAGING_DN)
+ topology.master1.add_s(Entry((STAGING_DN, {
+ 'objectclass': "top organizationalRole".split(),
+ 'cn': STAGING_CN,
+ 'description': "staging
DIT"})))
+
+ # DIT for production
+ topology.master1.log.info("Add %s" % PRODUCTION_DN)
+ topology.master1.add_s(Entry((PRODUCTION_DN, {
+ 'objectclass': "top
organizationalRole".split(),
+ 'cn': PRODUCTION_CN,
+ 'description': "production
DIT"})))
+
+ # DIT for production/except
+ topology.master1.log.info("Add %s" % PROD_EXCEPT_DN)
+ topology.master1.add_s(Entry((PROD_EXCEPT_DN, {
+ 'objectclass': "top
organizationalRole".split(),
+ 'cn': EXCEPT_CN,
+ 'description': "production
except DIT"})))
+
+ # enable acl error logging
+ #mod = [(ldap.MOD_REPLACE, 'nsslapd-errorlog-level', '128')]
+ #topology.master1.modify_s(DN_CONFIG, mod)
+ #topology.master2.modify_s(DN_CONFIG, mod)
+
+ # add dummy entries in the staging DIT
+ for cpt in range(MAX_ACCOUNTS):
+ name = "%s%d" % (NEW_ACCOUNT, cpt)
+ topology.master1.add_s(Entry(("cn=%s,%s" % (name, STAGING_DN), {
+ 'objectclass': "top
person".split(),
+ 'sn': name,
+ 'cn': name})))
+
+
+def test_mode_default_add_deny(topology, moddn_setup):
+ """This test case checks
+ that the ADD operation fails (no ADD aci on production)
+ """
+
+ topology.master1.log.info("\n\n######## mode moddn_aci : ADD (should fail) ########\n")
+
+ _bind_normal(topology)
+
+ #
+ # First try to add an entry in production => INSUFFICIENT_ACCESS
+ #
+ try:
+ topology.master1.log.info("Try to add %s" % PRODUCTION_DN)
+ name = "%s%d" % (NEW_ACCOUNT, 0)
+ topology.master1.add_s(Entry(("cn=%s,%s" % (name, PRODUCTION_DN), {
+ 'objectclass': "top
person".split(),
+ 'sn': name,
+ 'cn': name})))
+ assert 0 # this is an error, we should not be allowed to add an entry in
production
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+
+def test_mode_default_delete_deny(topology, moddn_setup):
+ """This test case checks
+ that the DEL operation fails (no 'delete' aci on production)
+ """
+
+ topology.master1.log.info("\n\n######## DELETE (should fail) ########\n")
+
+ _bind_normal(topology)
+ #
+ # Second try to delete an entry in staging => INSUFFICIENT_ACCESS
+ #
+ try:
+ topology.master1.log.info("Try to delete %s" % STAGING_DN)
+ name = "%s%d" % (NEW_ACCOUNT, 0)
+ topology.master1.delete_s("cn=%s,%s" % (name, STAGING_DN))
+ assert 0 # this is an error, we should not be allowed to add an entry in
production
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+
+@pytest.mark.parametrize("index,tfrom,tto,failure",
+ [(0, STAGING_DN, PRODUCTION_DN, False),
+ (1, STAGING_DN, PRODUCTION_DN, False),
+ (2, STAGING_DN, BAD_PRODUCTION_PATTERN, True),
+ (3, STAGING_PATTERN, PRODUCTION_DN, False),
+ (4, BAD_STAGING_PATTERN, PRODUCTION_DN, True),
+ (5, STAGING_PATTERN, PRODUCTION_PATTERN, False),
+ (6, None, PRODUCTION_PATTERN, False),
+ (7, STAGING_PATTERN, None, False),
+ (8, None, None, False)])
+def test_moddn_staging_prod(topology, moddn_setup,
+ index, tfrom, tto, failure):
+ """This test case MOVE entry NEW_ACCOUNT0 from staging to prod
+ target_to/target_from: equality filter
+ """
+
+ topology.master1.log.info("\n\n######## MOVE staging -> Prod (%s)
########\n" % index)
+ _bind_normal(topology)
+
+ old_rdn = "cn=%s%s" % (NEW_ACCOUNT, index)
+ old_dn = "%s,%s" % (old_rdn, STAGING_DN)
+ new_rdn = old_rdn
+ new_superior = PRODUCTION_DN
+
+ #
+ # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
+ #
+ try:
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ assert 0
+ except AssertionError:
+ topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+
+ # successfull MOD with the ACI
+ topology.master1.log.info("\n\n######## MOVE to and from equality filter
########\n")
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
+ target_from=tfrom, target_to=tto)
+ _bind_normal(topology)
+
+ try:
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ if failure:
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ # successfull MOD with the both ACI
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
+ target_from=tfrom, target_to=tto)
+ _bind_normal(topology)
+
+
+def test_moddn_staging_prod_9(topology, moddn_setup):
+ """This test case disable the 'moddn' right so a MODDN
requires a 'add' right
+ to be successfull.
+ It fails to MOVE entry NEW_ACCOUNT9 from staging to prod.
+ Add a 'add' right to prod.
+ Then it succeeds to MOVE NEW_ACCOUNT9 from staging to prod.
+
+ Then enable the 'moddn' right so a MODDN requires a 'moddn' right
+ It fails to MOVE entry NEW_ACCOUNT10 from staging to prod.
+ Add a 'moddn' right to prod.
+ Then it succeeds to MOVE NEW_ACCOUNT10 from staging to prod.
+ """
+
+ topology.master1.log.info("\n\n######## MOVE staging -> Prod (9)
########\n")
+
+ _bind_normal(topology)
+ old_rdn = "cn=%s9" % NEW_ACCOUNT
+ old_dn = "%s,%s" % (old_rdn, STAGING_DN)
+ new_rdn = old_rdn
+ new_superior = PRODUCTION_DN
+
+ #
+ # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
+ #
+ try:
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ assert 0
+ except AssertionError:
+ topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ #############
+ # Now do tests with no support of moddn aci
+ #############
+ topology.master1.log.info("Disable the moddn right")
+ _bind_manager(topology)
+ mod = [(ldap.MOD_REPLACE, CONFIG_MODDN_ACI_ATTR, 'off')]
+ topology.master1.modify_s(DN_CONFIG, mod)
+
+ # Add the moddn aci that will not be evaluated because of the config flag
+ topology.master1.log.info("\n\n######## MOVE to and from equality filter
########\n")
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+ # It will fail because it will test the ADD right
+ try:
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ assert 0
+ except AssertionError:
+ topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ # remove the moddn aci
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+ #
+ # add the 'add' right to the production DN
+ # Then do a successfull moddn
+ #
+ ACI_ALLOW = "(version 3.0; acl \"ADD rights to allow moddn\";
allow (add)"
+ ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
+ ACI_BODY = ACI_ALLOW + ACI_SUBJECT
+
+ _bind_manager(topology)
+ mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
+ topology.master1.modify_s(PRODUCTION_DN, mod)
+ _write_aci_staging(topology, mod_type=ldap.MOD_ADD)
+ _bind_normal(topology)
+
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+
+ _bind_manager(topology)
+ mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
+ topology.master1.modify_s(PRODUCTION_DN, mod)
+ _write_aci_staging(topology, mod_type=ldap.MOD_DELETE)
+ _bind_normal(topology)
+
+ #############
+ # Now do tests with support of moddn aci
+ #############
+ topology.master1.log.info("Enable the moddn right")
+ _bind_manager(topology)
+ mod = [(ldap.MOD_REPLACE, CONFIG_MODDN_ACI_ATTR, 'on')]
+ topology.master1.modify_s(DN_CONFIG, mod)
+
+ topology.master1.log.info("\n\n######## MOVE staging -> Prod (10)
########\n")
+
+ _bind_normal(topology)
+ old_rdn = "cn=%s10" % NEW_ACCOUNT
+ old_dn = "%s,%s" % (old_rdn, STAGING_DN)
+ new_rdn = old_rdn
+ new_superior = PRODUCTION_DN
+
+ #
+ # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
+ #
+ try:
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ assert 0
+ except AssertionError:
+ topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ #
+ # add the 'add' right to the production DN
+ # Then do a failing moddn
+ #
+ ACI_ALLOW = "(version 3.0; acl \"ADD rights to allow moddn\";
allow (add)"
+ ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
+ ACI_BODY = ACI_ALLOW + ACI_SUBJECT
+
+ _bind_manager(topology)
+ mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
+ topology.master1.modify_s(PRODUCTION_DN, mod)
+ _write_aci_staging(topology, mod_type=ldap.MOD_ADD)
+ _bind_normal(topology)
+
+ try:
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ assert 0
+ except AssertionError:
+ topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ _bind_manager(topology)
+ mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
+ topology.master1.modify_s(PRODUCTION_DN, mod)
+ _write_aci_staging(topology, mod_type=ldap.MOD_DELETE)
+ _bind_normal(topology)
+
+ # Add the moddn aci that will be evaluated because of the config flag
+ topology.master1.log.info("\n\n######## MOVE to and from equality filter
########\n")
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+
+ # remove the moddn aci
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+
+def test_moddn_prod_staging(topology, moddn_setup):
+ """This test checks that we can move ACCOUNT11 from staging to prod
+ but not move back ACCOUNT11 from prod to staging
+ """
+
+ topology.master1.log.info("\n\n######## MOVE staging -> Prod (11)
########\n")
+
+ _bind_normal(topology)
+
+ old_rdn = "cn=%s11" % NEW_ACCOUNT
+ old_dn = "%s,%s" % (old_rdn, STAGING_DN)
+ new_rdn = old_rdn
+ new_superior = PRODUCTION_DN
+
+ #
+ # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
+ #
+ try:
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ assert 0
+ except AssertionError:
+ topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ # successfull MOD with the ACI
+ topology.master1.log.info("\n\n######## MOVE to and from equality filter
########\n")
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+
+ # Now check we can not move back the entry to staging
+ old_rdn = "cn=%s11" % NEW_ACCOUNT
+ old_dn = "%s,%s" % (old_rdn, PRODUCTION_DN)
+ new_rdn = old_rdn
+ new_superior = STAGING_DN
+
+ # add the write right because we want to check the moddn
+ _bind_manager(topology)
+ _write_aci_production(topology, mod_type=ldap.MOD_ADD)
+ _bind_normal(topology)
+
+ try:
+ topology.master1.log.info("Try to move back MODDN %s -> %s,%s" %
(old_dn, new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ assert 0
+ except AssertionError:
+ topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ _bind_manager(topology)
+ _write_aci_production(topology, mod_type=ldap.MOD_DELETE)
+ _bind_normal(topology)
+
+ # successfull MOD with the both ACI
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+
+def test_check_repl_M2_to_M1(topology, moddn_setup):
+ """Checks that replication is still working M2->M1, using
ACCOUNT12"""
+
+ topology.master1.log.info("Bind as %s (M2)" % DN_DM)
+ topology.master2.simple_bind_s(DN_DM, PASSWORD)
+
+ rdn = "cn=%s12" % NEW_ACCOUNT
+ dn = "%s,%s" % (rdn, STAGING_DN)
+
+ # First wait for the ACCOUNT19 entry being replicated on M2
+ loop = 0
+ while loop <= 10:
+ try:
+ ent = topology.master2.getEntry(dn, ldap.SCOPE_BASE,
"(objectclass=*)")
+ break
+ except ldap.NO_SUCH_OBJECT:
+ time.sleep(1)
+ loop += 1
+ assert loop <= 10
+
+ attribute = 'description'
+ tested_value = 'Hello world'
+ mod = [(ldap.MOD_ADD, attribute, tested_value)]
+ topology.master1.log.info("Update (M2) %s (%s)" % (dn, attribute))
+ topology.master2.modify_s(dn, mod)
+
+ loop = 0
+ while loop <= 10:
+ ent = topology.master1.getEntry(dn, ldap.SCOPE_BASE,
"(objectclass=*)")
+ assert ent is not None
+ if ent.hasAttr(attribute) and (ent.getValue(attribute) == tested_value):
+ break
+
+ time.sleep(1)
+ loop += 1
+ assert loop < 10
+ topology.master1.log.info("Update %s (%s) replicated on M1" % (dn,
attribute))
+
+
+def test_moddn_staging_prod_except(topology, moddn_setup):
+ """This test case MOVE entry NEW_ACCOUNT13 from staging to prod
+ but fails to move entry NEW_ACCOUNT14 from staging to prod_except
+ """
+
+ topology.master1.log.info("\n\n######## MOVE staging -> Prod (13)
########\n")
+ _bind_normal(topology)
+
+ old_rdn = "cn=%s13" % NEW_ACCOUNT
+ old_dn = "%s,%s" % (old_rdn, STAGING_DN)
+ new_rdn = old_rdn
+ new_superior = PRODUCTION_DN
+
+ #
+ # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
+ #
+ try:
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ assert 0
+ except AssertionError:
+ topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ # successfull MOD with the ACI
+ topology.master1.log.info("\n\n######## MOVE to and from equality filter
########\n")
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _moddn_aci_deny_tree(topology, mod_type=ldap.MOD_ADD)
+ _bind_normal(topology)
+
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+
+ #
+ # Now try to move an entry under except
+ #
+ topology.master1.log.info("\n\n######## MOVE staging -> Prod/Except (14)
########\n")
+ old_rdn = "cn=%s14" % NEW_ACCOUNT
+ old_dn = "%s,%s" % (old_rdn, STAGING_DN)
+ new_rdn = old_rdn
+ new_superior = PROD_EXCEPT_DN
+ try:
+ topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
+ topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
+ assert 0
+ except AssertionError:
+ topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ # successfull MOD with the both ACI
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _moddn_aci_deny_tree(topology, mod_type=ldap.MOD_DELETE)
+ _bind_normal(topology)
+
+
+def test_mode_default_ger_no_moddn(topology, moddn_setup):
+ topology.master1.log.info("\n\n######## mode moddn_aci : GER no moddn
########\n")
+ request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " + BIND_DN)
+ msg_id = topology.master1.search_ext(PRODUCTION_DN,
+ ldap.SCOPE_SUBTREE,
+ "objectclass=*",
+ serverctrls=[request_ctrl])
+ rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
+ #ger={}
+ value = ''
+ for dn, attrs in rdata:
+ topology.master1.log.info("dn: %s" % dn)
+ value = attrs['entryLevelRights'][0]
+
+ topology.master1.log.info("######## entryLevelRights: %r" % value)
+ assert 'n' not in value
+
+
+def test_mode_default_ger_with_moddn(topology, moddn_setup):
+ """This test case adds the moddn aci and check ger contains
'n'"""
+
+ topology.master1.log.info("\n\n######## mode moddn_aci: GER with moddn
########\n")
+
+ # successfull MOD with the ACI
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+ request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " +
BIND_DN)
+ msg_id = topology.master1.search_ext(PRODUCTION_DN,
+ ldap.SCOPE_SUBTREE,
+ "objectclass=*",
+ serverctrls=[request_ctrl])
+ rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
+ #ger={}
+ value = ''
+ for dn, attrs in rdata:
+ topology.master1.log.info("dn: %s" % dn)
+ value = attrs['entryLevelRights'][0]
+
+ topology.master1.log.info("######## entryLevelRights: %r" % value)
+ assert 'n' in value
+
+ # successfull MOD with the both ACI
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+
+def test_mode_switch_default_to_legacy(topology, moddn_setup):
+ """This test switch the server from default mode to
legacy"""
+
+ topology.master1.log.info("\n\n######## Disable the moddn aci mod
########\n")
+ _bind_manager(topology)
+ mod = [(ldap.MOD_REPLACE, CONFIG_MODDN_ACI_ATTR, 'off')]
+ topology.master1.modify_s(DN_CONFIG, mod)
+
+
+def test_mode_legacy_ger_no_moddn1(topology, moddn_setup):
+ topology.master1.log.info("\n\n######## mode legacy 1: GER no moddn
########\n")
+ request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " +
BIND_DN)
+ msg_id = topology.master1.search_ext(PRODUCTION_DN,
+ ldap.SCOPE_SUBTREE,
+ "objectclass=*",
+ serverctrls=[request_ctrl])
+ rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
+ #ger={}
+ value = ''
+ for dn, attrs in rdata:
+ topology.master1.log.info("dn: %s" % dn)
+ value = attrs['entryLevelRights'][0]
+
+ topology.master1.log.info("######## entryLevelRights: %r" % value)
+ assert 'n' not in value
+
+
+def test_mode_legacy_ger_no_moddn2(topology, moddn_setup):
+ topology.master1.log.info("\n\n######## mode legacy 2: GER no moddn
########\n")
+ # successfull MOD with the ACI
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+ request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " +
BIND_DN)
+ msg_id = topology.master1.search_ext(PRODUCTION_DN,
+ ldap.SCOPE_SUBTREE,
+ "objectclass=*",
+ serverctrls=[request_ctrl])
+ rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
+ #ger={}
+ value = ''
+ for dn, attrs in rdata:
+ topology.master1.log.info("dn: %s" % dn)
+ value = attrs['entryLevelRights'][0]
+
+ topology.master1.log.info("######## entryLevelRights: %r" % value)
+ assert 'n' not in value
+
+ # successfull MOD with the both ACI
+ _bind_manager(topology)
+ _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
+ target_from=STAGING_DN, target_to=PRODUCTION_DN)
+ _bind_normal(topology)
+
+
+def test_mode_legacy_ger_with_moddn(topology, moddn_setup):
+ topology.master1.log.info("\n\n######## mode legacy : GER with moddn
########\n")
+
+ # being allowed to read/write the RDN attribute use to allow the RDN
+ ACI_TARGET = "(target =
\"ldap:///%s\")(targetattr=\"cn\")" % (PRODUCTION_DN)
+ ACI_ALLOW = "(version 3.0; acl \"MODDN production changing the RDN
attribute\"; allow (read,search,write)"
+ ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
+ ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
+
+ # successfull MOD with the ACI
+ _bind_manager(topology)
+ mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
+ topology.master1.modify_s(SUFFIX, mod)
+ _bind_normal(topology)
+
+ request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn: " +
BIND_DN)
+ msg_id = topology.master1.search_ext(PRODUCTION_DN,
+ ldap.SCOPE_SUBTREE,
+ "objectclass=*",
+ serverctrls=[request_ctrl])
+ rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
+ #ger={}
+ value = ''
+ for dn, attrs in rdata:
+ topology.master1.log.info("dn: %s" % dn)
+ value = attrs['entryLevelRights'][0]
+
+ topology.master1.log.info("######## entryLevelRights: %r" % value)
+ assert 'n' in value
+
+ # successfull MOD with the both ACI
+ _bind_manager(topology)
+ mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
+ topology.master1.modify_s(SUFFIX, mod)
+ #_bind_normal(topology)
+
+
+@pytest.fixture(scope="module")
+def rdn_write_setup(topology):
+ topology.master1.log.info("\n\n######## Add entry tuser ########\n")
+ topology.master1.add_s(Entry((SRC_ENTRY_DN, {
+ 'objectclass': "top
person".split(),
+ 'sn': SRC_ENTRY_CN,
+ 'cn': SRC_ENTRY_CN})))
+
+
+def test_rdn_write_get_ger(topology, rdn_write_setup):
+ ANONYMOUS_DN = ""
+ topology.master1.log.info("\n\n######## GER rights for anonymous
########\n")
+ request_ctrl = GetEffectiveRightsControl(criticality=True,
+ authzId="dn:" + ANONYMOUS_DN)
+ msg_id = topology.master1.search_ext(SUFFIX,
+ ldap.SCOPE_SUBTREE,
+ "objectclass=*",
+ serverctrls=[request_ctrl])
+ rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
+ value = ''
+ for dn, attrs in rdata:
+ topology.master1.log.info("dn: %s" % dn)
+ for value in attrs['entryLevelRights']:
+ topology.master1.log.info("######## entryLevelRights: %r" %
value)
+ assert 'n' not in value
+
+
+def test_rdn_write_modrdn_anonymous(topology, rdn_write_setup):
+ ANONYMOUS_DN = ""
+ topology.master1.close()
+ topology.master1.binddn = ANONYMOUS_DN
+ topology.master1.open()
+ msg_id = topology.master1.search_ext("", ldap.SCOPE_BASE,
"objectclass=*")
+ rtype, rdata, rmsgid, response_ctrl = topology.master1.result3(msg_id)
+ for dn, attrs in rdata:
+ topology.master1.log.info("dn: %s" % dn)
+ for attr in attrs:
+ topology.master1.log.info("######## %r: %r" % (attr,
attrs[attr]))
+
+ try:
+ topology.master1.rename_s(SRC_ENTRY_DN, "cn=%s" % DST_ENTRY_CN, delold=True)
+ except Exception as e:
+ topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
+ isinstance(e, ldap.INSUFFICIENT_ACCESS)
+
+ try:
+ topology.master1.getEntry(DST_ENTRY_DN, ldap.SCOPE_BASE,
"objectclass=*")
+ assert False
+ except Exception as e:
+ topology.master1.log.info("The entry was not renamed (expected)")
+ isinstance(e, ldap.NO_SUCH_OBJECT)
+
+ _bind_manager(topology)
+
+
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
diff --git a/dirsrvtests/tickets/ticket47553_ger.py b/dirsrvtests/tickets/ticket47553_ger.py
deleted file mode 100644
index d688c70..0000000
--- a/dirsrvtests/tickets/ticket47553_ger.py
+++ /dev/null
@@ -1,553 +0,0 @@
-'''
-Created on Nov 7, 2013
-
-@author: tbordaz
-'''
-import os
-import sys
-import time
-import ldap
-import logging
-import socket
-import time
-import logging
-import pytest
-import re
-from lib389 import DirSrv, Entry, tools
-from lib389.tools import DirSrvTools
-from lib389._constants import *
-from lib389.properties import *
-from constants import *
-from lib389._constants import REPLICAROLE_MASTER
-from ldap.controls.simple import GetEffectiveRightsControl
-
-logging.getLogger(__name__).setLevel(logging.DEBUG)
-log = logging.getLogger(__name__)
-
-#
-# important part. We can deploy Master1 and Master2 on different versions
-#
-installation1_prefix = None
-installation2_prefix = None
-
-TEST_REPL_DN = "cn=test_repl, %s" % SUFFIX
-
-STAGING_CN = "staged user"
-PRODUCTION_CN = "accounts"
-EXCEPT_CN = "excepts"
-
-STAGING_DN = "cn=%s,%s" % (STAGING_CN, SUFFIX)
-PRODUCTION_DN = "cn=%s,%s" % (PRODUCTION_CN, SUFFIX)
-PROD_EXCEPT_DN = "cn=%s,%s" % (EXCEPT_CN, PRODUCTION_DN)
-
-STAGING_PATTERN = "cn=%s*,%s" % (STAGING_CN[:2], SUFFIX)
-PRODUCTION_PATTERN = "cn=%s*,%s" % (PRODUCTION_CN[:2], SUFFIX)
-BAD_STAGING_PATTERN = "cn=bad*,%s" % (SUFFIX)
-BAD_PRODUCTION_PATTERN = "cn=bad*,%s" % (SUFFIX)
-
-BIND_CN = "bind_entry"
-BIND_DN = "cn=%s,%s" % (BIND_CN, SUFFIX)
-BIND_PW = "password"
-
-NEW_ACCOUNT = "new_account"
-MAX_ACCOUNTS = 20
-
-CONFIG_MODDN_ACI_ATTR = "nsslapd-moddn-aci"
-
-class TopologyMaster1Master2(object):
- def __init__(self, master1, master2):
- master1.open()
- self.master1 = master1
-
- master2.open()
- self.master2 = master2
-
-
-(a)pytest.fixture(scope="module")
-def topology(request):
- '''
- This fixture is used to create a replicated topology for the 'module'.
- The replicated topology is MASTER1 <-> Master2.
- At the beginning, It may exists a master2 instance and/or a master2 instance.
- It may also exists a backup for the master1 and/or the master2.
-
- Principle:
- If master1 instance exists:
- restart it
- If master2 instance exists:
- restart it
- If backup of master1 AND backup of master2 exists:
- create or rebind to master1
- create or rebind to master2
-
- restore master1 from backup
- restore master2 from backup
- else:
- Cleanup everything
- remove instances
- remove backups
- Create instances
- Initialize replication
- Create backups
- '''
- global installation1_prefix
- global installation2_prefix
-
- # allocate master1 on a given deployement
- master1 = DirSrv(verbose=False)
- if installation1_prefix:
- args_instance[SER_DEPLOYED_DIR] = installation1_prefix
-
- # Args for the master1 instance
- args_instance[SER_HOST] = HOST_MASTER_1
- args_instance[SER_PORT] = PORT_MASTER_1
- args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_1
- args_master = args_instance.copy()
- master1.allocate(args_master)
-
- # allocate master1 on a given deployement
- master2 = DirSrv(verbose=False)
- if installation2_prefix:
- args_instance[SER_DEPLOYED_DIR] = installation2_prefix
-
- # Args for the consumer instance
- args_instance[SER_HOST] = HOST_MASTER_2
- args_instance[SER_PORT] = PORT_MASTER_2
- args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_2
- args_master = args_instance.copy()
- master2.allocate(args_master)
-
-
- # Get the status of the backups
- backup_master1 = master1.checkBackupFS()
- backup_master2 = master2.checkBackupFS()
-
- # Get the status of the instance and restart it if it exists
- instance_master1 = master1.exists()
- if instance_master1:
- master1.stop(timeout=10)
- master1.start(timeout=10)
-
- instance_master2 = master2.exists()
- if instance_master2:
- master2.stop(timeout=10)
- master2.start(timeout=10)
-
- if backup_master1 and backup_master2:
- # The backups exist, assuming they are correct
- # we just re-init the instances with them
- if not instance_master1:
- master1.create()
- # Used to retrieve configuration information (dbdir, confdir...)
- master1.open()
-
- if not instance_master2:
- master2.create()
- # Used to retrieve configuration information (dbdir, confdir...)
- master2.open()
-
- # restore master1 from backup
- master1.stop(timeout=10)
- master1.restoreFS(backup_master1)
- master1.start(timeout=10)
-
- # restore master2 from backup
- master2.stop(timeout=10)
- master2.restoreFS(backup_master2)
- master2.start(timeout=10)
- else:
- # We should be here only in two conditions
- # - This is the first time a test involve master-consumer
- # so we need to create everything
- # - Something weird happened (instance/backup destroyed)
- # so we discard everything and recreate all
-
- # Remove all the backups. So even if we have a specific backup file
- # (e.g backup_master) we clear all backups that an instance my have created
- if backup_master1:
- master1.clearBackupFS()
- if backup_master2:
- master2.clearBackupFS()
-
- # Remove all the instances
- if instance_master1:
- master1.delete()
- if instance_master2:
- master2.delete()
-
- # Create the instances
- master1.create()
- master1.open()
- master2.create()
- master2.open()
-
- #
- # Now prepare the Master-Consumer topology
- #
- # First Enable replication
- master1.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER,
replicaId=REPLICAID_MASTER_1)
- master2.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER,
replicaId=REPLICAID_MASTER_2)
-
- # Initialize the supplier->consumer
-
- properties = {RA_NAME: r'meTo_$host:$port',
- RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
- RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
- RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
- RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
- repl_agreement = master1.agreement.create(suffix=SUFFIX, host=master2.host,
port=master2.port, properties=properties)
-
- if not repl_agreement:
- log.fatal("Fail to create a replica agreement")
- sys.exit(1)
-
- log.debug("%s created" % repl_agreement)
-
- properties = {RA_NAME: r'meTo_$host:$port',
- RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
- RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
- RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
- RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
- master2.agreement.create(suffix=SUFFIX, host=master1.host, port=master1.port,
properties=properties)
-
- master1.agreement.init(SUFFIX, HOST_MASTER_2, PORT_MASTER_2)
- master1.waitForReplInit(repl_agreement)
-
- # Check replication is working fine
- master1.add_s(Entry((TEST_REPL_DN, {
- 'objectclass': "top
person".split(),
- 'sn': 'test_repl',
- 'cn': 'test_repl'})))
- loop = 0
- while loop <= 10:
- try:
- ent = master2.getEntry(TEST_REPL_DN, ldap.SCOPE_BASE,
"(objectclass=*)")
- break
- except ldap.NO_SUCH_OBJECT:
- time.sleep(1)
- loop += 1
-
- # Time to create the backups
- master1.stop(timeout=10)
- master1.backupfile = master1.backupFS()
- master1.start(timeout=10)
-
- master2.stop(timeout=10)
- master2.backupfile = master2.backupFS()
- master2.start(timeout=10)
-
- # clear the tmp directory
- master1.clearTmpDir(__file__)
-
- #
- # Here we have two instances master and consumer
- # with replication working. Either coming from a backup recovery
- # or from a fresh (re)init
- # Time to return the topology
- return TopologyMaster1Master2(master1, master2)
-
-
-
-def _bind_manager(topology):
- topology.master1.log.info("Bind as %s " % DN_DM)
- topology.master1.simple_bind_s(DN_DM, PASSWORD)
-
-def _bind_normal(topology):
- # bind as bind_entry
- topology.master1.log.info("Bind as %s" % BIND_DN)
- topology.master1.simple_bind_s(BIND_DN, BIND_PW)
-
-def _moddn_aci_deny_tree(topology, mod_type=None, target_from=STAGING_DN,
target_to=PROD_EXCEPT_DN):
- '''
- It denies the access moddn_to in cn=except,cn=accounts,SUFFIX
- '''
- assert mod_type != None
-
- ACI_TARGET_FROM = ""
- ACI_TARGET_TO = ""
- if target_from:
- ACI_TARGET_FROM = "(target_from = \"ldap:///%s\")" %
(target_from)
- if target_to:
- ACI_TARGET_TO = "(target_to = \"ldap:///%s\")" %
(target_to)
-
- ACI_ALLOW = "(version 3.0; acl \"Deny MODDN to prod_except\";
deny (moddn)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_TARGET_TO + ACI_TARGET_FROM + ACI_ALLOW + ACI_SUBJECT
- mod = [(mod_type, 'aci', ACI_BODY)]
- #topology.master1.modify_s(SUFFIX, mod)
- topology.master1.log.info("Add a DENY aci under %s " % PROD_EXCEPT_DN)
- topology.master1.modify_s(PROD_EXCEPT_DN, mod)
-
-def _moddn_aci_staging_to_production(topology, mod_type=None, target_from=STAGING_DN,
target_to=PRODUCTION_DN):
- assert mod_type != None
-
-
- ACI_TARGET_FROM = ""
- ACI_TARGET_TO = ""
- if target_from:
- ACI_TARGET_FROM = "(target_from = \"ldap:///%s\")" %
(target_from)
- if target_to:
- ACI_TARGET_TO = "(target_to = \"ldap:///%s\")" %
(target_to)
-
- ACI_ALLOW = "(version 3.0; acl \"MODDN from staging to
production\"; allow (moddn)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_TARGET_FROM + ACI_TARGET_TO + ACI_ALLOW + ACI_SUBJECT
- mod = [(mod_type, 'aci', ACI_BODY)]
- topology.master1.modify_s(SUFFIX, mod)
-
-def _moddn_aci_from_production_to_staging(topology, mod_type=None):
- assert mod_type != None
-
- ACI_TARGET = "(target_from = \"ldap:///%s\") (target_to =
\"ldap:///%s\")" % (PRODUCTION_DN, STAGING_DN)
- ACI_ALLOW = "(version 3.0; acl \"MODDN from production to
staging\"; allow (moddn)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
- mod = [(mod_type, 'aci', ACI_BODY)]
- topology.master1.modify_s(SUFFIX, mod)
-
-
-def test_ticket47553_init(topology):
- """
- Creates
- - a staging DIT
- - a production DIT
- - add accounts in staging DIT
- - enable ACL logging (commented for performance reason)
-
- """
-
- topology.master1.log.info("\n\n######################### INITIALIZATION
######################\n")
-
- # entry used to bind with
- topology.master1.log.info("Add %s" % BIND_DN)
- topology.master1.add_s(Entry((BIND_DN, {
- 'objectclass': "top
person".split(),
- 'sn': BIND_CN,
- 'cn': BIND_CN,
- 'userpassword': BIND_PW})))
-
- # DIT for staging
- topology.master1.log.info("Add %s" % STAGING_DN)
- topology.master1.add_s(Entry((STAGING_DN, {
- 'objectclass': "top
organizationalRole".split(),
- 'cn': STAGING_CN,
- 'description': "staging
DIT"})))
-
- # DIT for production
- topology.master1.log.info("Add %s" % PRODUCTION_DN)
- topology.master1.add_s(Entry((PRODUCTION_DN, {
- 'objectclass': "top
organizationalRole".split(),
- 'cn': PRODUCTION_CN,
- 'description': "production
DIT"})))
-
- # DIT for production/except
- topology.master1.log.info("Add %s" % PROD_EXCEPT_DN)
- topology.master1.add_s(Entry((PROD_EXCEPT_DN, {
- 'objectclass': "top
organizationalRole".split(),
- 'cn': EXCEPT_CN,
- 'description': "production
except DIT"})))
-
- # enable acl error logging
- #mod = [(ldap.MOD_REPLACE, 'nsslapd-errorlog-level', '128')]
- #topology.master1.modify_s(DN_CONFIG, mod)
- #topology.master2.modify_s(DN_CONFIG, mod)
-
-
-
-
-
- # add dummy entries in the staging DIT
- for cpt in range(MAX_ACCOUNTS):
- name = "%s%d" % (NEW_ACCOUNT, cpt)
- topology.master1.add_s(Entry(("cn=%s,%s" % (name, STAGING_DN), {
- 'objectclass': "top
person".split(),
- 'sn': name,
- 'cn': name})))
-
-
-def test_ticket47553_mode_default_add_deny(topology):
- '''
- This test case checks that the ADD operation fails (no ADD aci on production)
- '''
-
- topology.master1.log.info("\n\n######################### mode moddn_aci : ADD
(should fail) ######################\n")
-
- _bind_normal(topology)
-
- #
- # First try to add an entry in production => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to add %s" % PRODUCTION_DN)
- name = "%s%d" % (NEW_ACCOUNT, 0)
- topology.master1.add_s(Entry(("cn=%s,%s" % (name, PRODUCTION_DN), {
- 'objectclass': "top
person".split(),
- 'sn': name,
- 'cn': name})))
- assert 0 # this is an error, we should not be allowed to add an entry in
production
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-def test_ticket47553_mode_default_ger_no_moddn(topology):
- topology.master1.log.info("\n\n######################### mode moddn_aci : GER no
moddn ######################\n")
- request_ctrl = GetEffectiveRightsControl(criticality=True,authzId="dn: " +
BIND_DN)
- msg_id = topology.master1.search_ext(PRODUCTION_DN, ldap.SCOPE_SUBTREE,
"objectclass=*", serverctrls=[request_ctrl])
- rtype,rdata,rmsgid,response_ctrl = topology.master1.result3(msg_id)
- ger={}
- value=''
- for dn, attrs in rdata:
- topology.master1.log.info ("dn: %s" % dn)
- value = attrs['entryLevelRights'][0]
-
- topology.master1.log.info ("############### entryLevelRights: %r" %
value)
- assert 'n' not in value
-
-def test_ticket47553_mode_default_ger_with_moddn(topology):
- '''
- This test case adds the moddn aci and check ger contains 'n'
- '''
-
- topology.master1.log.info("\n\n######################### mode moddn_aci: GER
with moddn ######################\n")
-
- # successfull MOD with the ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
- request_ctrl = GetEffectiveRightsControl(criticality=True,authzId="dn: " +
BIND_DN)
- msg_id = topology.master1.search_ext(PRODUCTION_DN, ldap.SCOPE_SUBTREE,
"objectclass=*", serverctrls=[request_ctrl])
- rtype,rdata,rmsgid,response_ctrl = topology.master1.result3(msg_id)
- ger={}
- value = ''
- for dn, attrs in rdata:
- topology.master1.log.info ("dn: %s" % dn)
- value = attrs['entryLevelRights'][0]
-
- topology.master1.log.info ("############### entryLevelRights: %r" %
value)
- assert 'n' in value
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
-def test_ticket47553_mode_switch_default_to_legacy(topology):
- '''
- This test switch the server from default mode to legacy
- '''
- topology.master1.log.info("\n\n######################### Disable the moddn aci
mod ######################\n" )
- _bind_manager(topology)
- mod = [(ldap.MOD_REPLACE, CONFIG_MODDN_ACI_ATTR, 'off')]
- topology.master1.modify_s(DN_CONFIG, mod)
-
-def test_ticket47553_mode_legacy_ger_no_moddn1(topology):
- topology.master1.log.info("\n\n######################### mode legacy 1: GER no
moddn ######################\n")
- request_ctrl = GetEffectiveRightsControl(criticality=True,authzId="dn: " +
BIND_DN)
- msg_id = topology.master1.search_ext(PRODUCTION_DN, ldap.SCOPE_SUBTREE,
"objectclass=*", serverctrls=[request_ctrl])
- rtype,rdata,rmsgid,response_ctrl = topology.master1.result3(msg_id)
- ger={}
- value=''
- for dn, attrs in rdata:
- topology.master1.log.info ("dn: %s" % dn)
- value = attrs['entryLevelRights'][0]
-
- topology.master1.log.info ("############### entryLevelRights: %r" %
value)
- assert 'n' not in value
-
-def test_ticket47553_mode_legacy_ger_no_moddn2(topology):
- topology.master1.log.info("\n\n######################### mode legacy 2: GER no
moddn ######################\n")
- # successfull MOD with the ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
- request_ctrl = GetEffectiveRightsControl(criticality=True,authzId="dn: " +
BIND_DN)
- msg_id = topology.master1.search_ext(PRODUCTION_DN, ldap.SCOPE_SUBTREE,
"objectclass=*", serverctrls=[request_ctrl])
- rtype,rdata,rmsgid,response_ctrl = topology.master1.result3(msg_id)
- ger={}
- value=''
- for dn, attrs in rdata:
- topology.master1.log.info ("dn: %s" % dn)
- value = attrs['entryLevelRights'][0]
-
- topology.master1.log.info ("############### entryLevelRights: %r" %
value)
- assert 'n' not in value
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
-def test_ticket47553_mode_legacy_ger_with_moddn(topology):
- topology.master1.log.info("\n\n######################### mode legacy : GER with
moddn ######################\n")
-
- # being allowed to read/write the RDN attribute use to allow the RDN
- ACI_TARGET = "(target =
\"ldap:///%s\")(targetattr=\"cn\")" % (PRODUCTION_DN)
- ACI_ALLOW = "(version 3.0; acl \"MODDN production changing the RDN
attribute\"; allow (read,search,write)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
-
- # successfull MOD with the ACI
- _bind_manager(topology)
- mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
- topology.master1.modify_s(SUFFIX, mod)
- _bind_normal(topology)
-
- request_ctrl = GetEffectiveRightsControl(criticality=True,authzId="dn: " +
BIND_DN)
- msg_id = topology.master1.search_ext(PRODUCTION_DN, ldap.SCOPE_SUBTREE,
"objectclass=*", serverctrls=[request_ctrl])
- rtype,rdata,rmsgid,response_ctrl = topology.master1.result3(msg_id)
- ger={}
- value=''
- for dn, attrs in rdata:
- topology.master1.log.info ("dn: %s" % dn)
- value = attrs['entryLevelRights'][0]
-
- topology.master1.log.info ("############### entryLevelRights: %r" %
value)
- assert 'n' in value
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
- topology.master1.modify_s(SUFFIX, mod)
- _bind_normal(topology)
-
-
-def test_ticket47553_final(topology):
- topology.master1.stop(timeout=10)
- topology.master2.stop(timeout=10)
-
-def run_isolated():
- '''
- run_isolated is used to run these test cases independently of a test scheduler
(xunit, py.test..)
- To run isolated without py.test, you need to
- - edit this file and comment '(a)pytest.fixture' line before
'topology' function.
- - set the installation prefix
- - run this program
- '''
- global installation1_prefix
- global installation2_prefix
- installation1_prefix = None
- installation2_prefix = None
-
- topo = topology(True)
- topo.master1.log.info("\n\n######################### Ticket 47553
######################\n")
- test_ticket47553_init(topo)
-
- # Check that without appropriate aci we are not allowed to add/delete
- test_ticket47553_mode_default_add_deny(topo)
- test_ticket47553_mode_default_ger_no_moddn(topo)
- test_ticket47553_mode_default_ger_with_moddn(topo)
- test_ticket47553_mode_switch_default_to_legacy(topo)
- test_ticket47553_mode_legacy_ger_no_moddn1(topo)
- test_ticket47553_mode_legacy_ger_no_moddn2(topo)
- test_ticket47553_mode_legacy_ger_with_moddn(topo)
-
- test_ticket47553_final(topo)
-
-
-
-
-if __name__ == '__main__':
- run_isolated()
-
diff --git a/dirsrvtests/tickets/ticket47553_rdn_write_test.py
b/dirsrvtests/tickets/ticket47553_rdn_write_test.py
deleted file mode 100644
index 826e709..0000000
--- a/dirsrvtests/tickets/ticket47553_rdn_write_test.py
+++ /dev/null
@@ -1,135 +0,0 @@
-import os
-import sys
-import time
-import ldap
-import logging
-import pytest
-from lib389 import DirSrv, Entry, tools, tasks
-from lib389.tools import DirSrvTools
-from lib389._constants import *
-from lib389.properties import *
-from lib389.tasks import *
-from lib389.utils import *
-from ldap.controls.simple import GetEffectiveRightsControl
-
-logging.getLogger(__name__).setLevel(logging.DEBUG)
-log = logging.getLogger(__name__)
-
-installation1_prefix = None
-
-SRC_ENTRY_CN = "tuser"
-EXT_RDN = "01"
-DST_ENTRY_CN = SRC_ENTRY_CN + EXT_RDN
-
-SRC_ENTRY_DN = "cn=%s,%s" % (SRC_ENTRY_CN, SUFFIX)
-DST_ENTRY_DN = "cn=%s,%s" % (DST_ENTRY_CN, SUFFIX)
-
-
-class TopologyStandalone(object):
- def __init__(self, standalone):
- standalone.open()
- self.standalone = standalone
-
-
-(a)pytest.fixture(scope="module")
-def topology(request):
- global installation1_prefix
-
- # Creating standalone instance ...
- standalone = DirSrv(verbose=False)
- if installation1_prefix:
- args_instance[SER_DEPLOYED_DIR] = installation1_prefix
- args_instance[SER_HOST] = HOST_STANDALONE
- args_instance[SER_PORT] = PORT_STANDALONE
- args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
- args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
- args_standalone = args_instance.copy()
- standalone.allocate(args_standalone)
- instance_standalone = standalone.exists()
- if instance_standalone:
- standalone.delete()
- standalone.create()
- standalone.open()
-
- # Clear out the tmp dir
- standalone.clearTmpDir(__file__)
-
- return TopologyStandalone(standalone)
-
-
-def test_ticket47553_rdn_write_init(topology):
- topology.standalone.log.info("\n\n######################### Add entry tuser
######################\n")
- topology.standalone.add_s(Entry((SRC_ENTRY_DN, {
- 'objectclass': "top
person".split(),
- 'sn': SRC_ENTRY_CN,
- 'cn': SRC_ENTRY_CN})))
-
-
-def test_ticket47553_rdn_write_get_ger(topology):
- ANONYMOUS_DN = ""
- topology.standalone.log.info("\n\n######################### GER rights for
anonymous ######################\n")
- request_ctrl = GetEffectiveRightsControl(criticality=True, authzId="dn:" +
ANONYMOUS_DN)
- msg_id = topology.standalone.search_ext(SUFFIX, ldap.SCOPE_SUBTREE,
"objectclass=*", serverctrls=[request_ctrl])
- rtype, rdata, rmsgid, response_ctrl = topology.standalone.result3(msg_id)
- value = ''
- for dn, attrs in rdata:
- topology.standalone.log.info("dn: %s" % dn)
- for value in attrs['entryLevelRights']:
- topology.standalone.log.info("############### entryLevelRights:
%r" % value)
- assert 'n' not in value
-
-
-def test_ticket47553_rdn_write_modrdn_anonymous(topology):
- ANONYMOUS_DN = ""
- topology.standalone.close()
- topology.standalone.binddn = ANONYMOUS_DN
- topology.standalone.open()
- msg_id = topology.standalone.search_ext("", ldap.SCOPE_BASE,
"objectclass=*")
- rtype, rdata, rmsgid, response_ctrl = topology.standalone.result3(msg_id)
- for dn, attrs in rdata:
- topology.standalone.log.info("dn: %s" % dn)
- for attr in attrs:
- topology.standalone.log.info("############### %r: %r" % (attr,
attrs[attr]))
-
- try:
- topology.standalone.rename_s(SRC_ENTRY_DN, "cn=%s" % DST_ENTRY_CN,
delold=True)
- except Exception as e:
- topology.standalone.log.info("Exception (expected): %s" %
type(e).__name__)
- isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
- try:
- topology.standalone.getEntry(DST_ENTRY_DN, ldap.SCOPE_BASE,
"objectclass=*")
- assert False
- except Exception as e:
- topology.standalone.log.info("The entry was not renamed (expected)")
- isinstance(e, ldap.NO_SUCH_OBJECT)
-
-
-def test_ticket47553_rdn_write(topology):
- '''
- Write your testcase here...
- '''
-
- log.info('Test complete')
-
-
-def test_ticket47553_rdn_write_final(topology):
- topology.standalone.delete()
- log.info('Testcase PASSED')
-
-
-def run_isolated():
- global installation1_prefix
- installation1_prefix = '/home/tbordaz/install_master'
-
- topo = topology(True)
- test_ticket47553_rdn_write_init(topo)
- test_ticket47553_rdn_write_get_ger(topo)
- test_ticket47553_rdn_write(topo)
- test_ticket47553_rdn_write_modrdn_anonymous(topo)
- test_ticket47553_rdn_write_final(topo)
-
-
-if __name__ == '__main__':
- run_isolated()
-
diff --git a/dirsrvtests/tickets/ticket47553_single_aci_test.py
b/dirsrvtests/tickets/ticket47553_single_aci_test.py
deleted file mode 100644
index 0c8d7e9..0000000
--- a/dirsrvtests/tickets/ticket47553_single_aci_test.py
+++ /dev/null
@@ -1,1224 +0,0 @@
-'''
-Created on Nov 7, 2013
-
-@author: tbordaz
-'''
-import os
-import sys
-import time
-import ldap
-import logging
-import socket
-import time
-import logging
-import pytest
-import re
-from lib389 import DirSrv, Entry, tools
-from lib389.tools import DirSrvTools
-from lib389._constants import *
-from lib389.properties import *
-from constants import *
-from lib389._constants import REPLICAROLE_MASTER
-
-logging.getLogger(__name__).setLevel(logging.DEBUG)
-log = logging.getLogger(__name__)
-
-#
-# important part. We can deploy Master1 and Master2 on different versions
-#
-installation1_prefix = None
-installation2_prefix = None
-
-TEST_REPL_DN = "cn=test_repl, %s" % SUFFIX
-
-STAGING_CN = "staged user"
-PRODUCTION_CN = "accounts"
-EXCEPT_CN = "excepts"
-
-STAGING_DN = "cn=%s,%s" % (STAGING_CN, SUFFIX)
-PRODUCTION_DN = "cn=%s,%s" % (PRODUCTION_CN, SUFFIX)
-PROD_EXCEPT_DN = "cn=%s,%s" % (EXCEPT_CN, PRODUCTION_DN)
-
-STAGING_PATTERN = "cn=%s*,%s" % (STAGING_CN[:2], SUFFIX)
-PRODUCTION_PATTERN = "cn=%s*,%s" % (PRODUCTION_CN[:2], SUFFIX)
-BAD_STAGING_PATTERN = "cn=bad*,%s" % (SUFFIX)
-BAD_PRODUCTION_PATTERN = "cn=bad*,%s" % (SUFFIX)
-
-BIND_CN = "bind_entry"
-BIND_DN = "cn=%s,%s" % (BIND_CN, SUFFIX)
-BIND_PW = "password"
-
-NEW_ACCOUNT = "new_account"
-MAX_ACCOUNTS = 20
-
-CONFIG_MODDN_ACI_ATTR = "nsslapd-moddn-aci"
-
-class TopologyMaster1Master2(object):
- def __init__(self, master1, master2):
- master1.open()
- self.master1 = master1
-
- master2.open()
- self.master2 = master2
-
-
-(a)pytest.fixture(scope="module")
-def topology(request):
- '''
- This fixture is used to create a replicated topology for the 'module'.
- The replicated topology is MASTER1 <-> Master2.
- At the beginning, It may exists a master2 instance and/or a master2 instance.
- It may also exists a backup for the master1 and/or the master2.
-
- Principle:
- If master1 instance exists:
- restart it
- If master2 instance exists:
- restart it
- If backup of master1 AND backup of master2 exists:
- create or rebind to master1
- create or rebind to master2
-
- restore master1 from backup
- restore master2 from backup
- else:
- Cleanup everything
- remove instances
- remove backups
- Create instances
- Initialize replication
- Create backups
- '''
- global installation1_prefix
- global installation2_prefix
-
- # allocate master1 on a given deployement
- master1 = DirSrv(verbose=False)
- if installation1_prefix:
- args_instance[SER_DEPLOYED_DIR] = installation1_prefix
-
- # Args for the master1 instance
- args_instance[SER_HOST] = HOST_MASTER_1
- args_instance[SER_PORT] = PORT_MASTER_1
- args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_1
- args_master = args_instance.copy()
- master1.allocate(args_master)
-
- # allocate master1 on a given deployement
- master2 = DirSrv(verbose=False)
- if installation2_prefix:
- args_instance[SER_DEPLOYED_DIR] = installation2_prefix
-
- # Args for the consumer instance
- args_instance[SER_HOST] = HOST_MASTER_2
- args_instance[SER_PORT] = PORT_MASTER_2
- args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_2
- args_master = args_instance.copy()
- master2.allocate(args_master)
-
-
- # Get the status of the backups
- backup_master1 = master1.checkBackupFS()
- backup_master2 = master2.checkBackupFS()
-
- # Get the status of the instance and restart it if it exists
- instance_master1 = master1.exists()
- if instance_master1:
- master1.stop(timeout=10)
- master1.start(timeout=10)
-
- instance_master2 = master2.exists()
- if instance_master2:
- master2.stop(timeout=10)
- master2.start(timeout=10)
-
- if backup_master1 and backup_master2:
- # The backups exist, assuming they are correct
- # we just re-init the instances with them
- if not instance_master1:
- master1.create()
- # Used to retrieve configuration information (dbdir, confdir...)
- master1.open()
-
- if not instance_master2:
- master2.create()
- # Used to retrieve configuration information (dbdir, confdir...)
- master2.open()
-
- # restore master1 from backup
- master1.stop(timeout=10)
- master1.restoreFS(backup_master1)
- master1.start(timeout=10)
-
- # restore master2 from backup
- master2.stop(timeout=10)
- master2.restoreFS(backup_master2)
- master2.start(timeout=10)
- else:
- # We should be here only in two conditions
- # - This is the first time a test involve master-consumer
- # so we need to create everything
- # - Something weird happened (instance/backup destroyed)
- # so we discard everything and recreate all
-
- # Remove all the backups. So even if we have a specific backup file
- # (e.g backup_master) we clear all backups that an instance my have created
- if backup_master1:
- master1.clearBackupFS()
- if backup_master2:
- master2.clearBackupFS()
-
- # Remove all the instances
- if instance_master1:
- master1.delete()
- if instance_master2:
- master2.delete()
-
- # Create the instances
- master1.create()
- master1.open()
- master2.create()
- master2.open()
-
- #
- # Now prepare the Master-Consumer topology
- #
- # First Enable replication
- master1.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER,
replicaId=REPLICAID_MASTER_1)
- master2.replica.enableReplication(suffix=SUFFIX, role=REPLICAROLE_MASTER,
replicaId=REPLICAID_MASTER_2)
-
- # Initialize the supplier->consumer
-
- properties = {RA_NAME: r'meTo_$host:$port',
- RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
- RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
- RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
- RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
- repl_agreement = master1.agreement.create(suffix=SUFFIX, host=master2.host,
port=master2.port, properties=properties)
-
- if not repl_agreement:
- log.fatal("Fail to create a replica agreement")
- sys.exit(1)
-
- log.debug("%s created" % repl_agreement)
-
- properties = {RA_NAME: r'meTo_$host:$port',
- RA_BINDDN: defaultProperties[REPLICATION_BIND_DN],
- RA_BINDPW: defaultProperties[REPLICATION_BIND_PW],
- RA_METHOD: defaultProperties[REPLICATION_BIND_METHOD],
- RA_TRANSPORT_PROT: defaultProperties[REPLICATION_TRANSPORT]}
- master2.agreement.create(suffix=SUFFIX, host=master1.host, port=master1.port,
properties=properties)
-
- master1.agreement.init(SUFFIX, HOST_MASTER_2, PORT_MASTER_2)
- master1.waitForReplInit(repl_agreement)
-
- # Check replication is working fine
- master1.add_s(Entry((TEST_REPL_DN, {
- 'objectclass': "top
person".split(),
- 'sn': 'test_repl',
- 'cn': 'test_repl'})))
- loop = 0
- while loop <= 10:
- try:
- ent = master2.getEntry(TEST_REPL_DN, ldap.SCOPE_BASE,
"(objectclass=*)")
- break
- except ldap.NO_SUCH_OBJECT:
- time.sleep(1)
- loop += 1
-
- # Time to create the backups
- master1.stop(timeout=10)
- master1.backupfile = master1.backupFS()
- master1.start(timeout=10)
-
- master2.stop(timeout=10)
- master2.backupfile = master2.backupFS()
- master2.start(timeout=10)
-
- # clear the tmp directory
- master1.clearTmpDir(__file__)
-
- #
- # Here we have two instances master and consumer
- # with replication working. Either coming from a backup recovery
- # or from a fresh (re)init
- # Time to return the topology
- return TopologyMaster1Master2(master1, master2)
-
-
-
-def _bind_manager(topology):
- topology.master1.log.info("Bind as %s " % DN_DM)
- topology.master1.simple_bind_s(DN_DM, PASSWORD)
-
-def _bind_normal(topology):
- # bind as bind_entry
- topology.master1.log.info("Bind as %s" % BIND_DN)
- topology.master1.simple_bind_s(BIND_DN, BIND_PW)
-
-def _moddn_aci_deny_tree(topology, mod_type=None, target_from=STAGING_DN,
target_to=PROD_EXCEPT_DN):
- '''
- It denies the access moddn_to in cn=except,cn=accounts,SUFFIX
- '''
- assert mod_type != None
-
- ACI_TARGET_FROM = ""
- ACI_TARGET_TO = ""
- if target_from:
- ACI_TARGET_FROM = "(target_from = \"ldap:///%s\")" %
(target_from)
- if target_to:
- ACI_TARGET_TO = "(target_to = \"ldap:///%s\")" %
(target_to)
-
- ACI_ALLOW = "(version 3.0; acl \"Deny MODDN to prod_except\";
deny (moddn)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_TARGET_TO + ACI_TARGET_FROM + ACI_ALLOW + ACI_SUBJECT
- mod = [(mod_type, 'aci', ACI_BODY)]
- #topology.master1.modify_s(SUFFIX, mod)
- topology.master1.log.info("Add a DENY aci under %s " % PROD_EXCEPT_DN)
- topology.master1.modify_s(PROD_EXCEPT_DN, mod)
-
-def _write_aci_staging(topology, mod_type=None):
- assert mod_type is not None
-
- ACI_TARGET = "(targetattr=
\"cn\")(target=\"ldap:///cn=*,%s\")" % STAGING_DN
- ACI_ALLOW = "(version 3.0; acl \"write staging entries\"; allow
(write)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
- mod = [(mod_type, 'aci', ACI_BODY)]
- topology.master1.modify_s(SUFFIX, mod)
-
-def _write_aci_production(topology, mod_type=None):
- assert mod_type is not None
-
- ACI_TARGET = "(targetattr=
\"cn\")(target=\"ldap:///cn=*,%s\")" % PRODUCTION_DN
- ACI_ALLOW = "(version 3.0; acl \"write production entries\";
allow (write)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
- mod = [(mod_type, 'aci', ACI_BODY)]
- topology.master1.modify_s(SUFFIX, mod)
-
-def _moddn_aci_staging_to_production(topology, mod_type=None, target_from=STAGING_DN,
target_to=PRODUCTION_DN):
- assert mod_type != None
-
-
- ACI_TARGET_FROM = ""
- ACI_TARGET_TO = ""
- if target_from:
- ACI_TARGET_FROM = "(target_from = \"ldap:///%s\")" %
(target_from)
- if target_to:
- ACI_TARGET_TO = "(target_to = \"ldap:///%s\")" %
(target_to)
-
- ACI_ALLOW = "(version 3.0; acl \"MODDN from staging to
production\"; allow (moddn)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_TARGET_FROM + ACI_TARGET_TO + ACI_ALLOW + ACI_SUBJECT
- mod = [(mod_type, 'aci', ACI_BODY)]
- topology.master1.modify_s(SUFFIX, mod)
-
- _write_aci_staging(topology, mod_type=mod_type)
-
-def _moddn_aci_from_production_to_staging(topology, mod_type=None):
- assert mod_type != None
-
- ACI_TARGET = "(target_from = \"ldap:///%s\") (target_to =
\"ldap:///%s\")" % (PRODUCTION_DN, STAGING_DN)
- ACI_ALLOW = "(version 3.0; acl \"MODDN from production to
staging\"; allow (moddn)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
- mod = [(mod_type, 'aci', ACI_BODY)]
- topology.master1.modify_s(SUFFIX, mod)
-
- _write_aci_production(topology, mod_type=mod_type)
-
-
-def test_ticket47553_init(topology):
- """
- Creates
- - a staging DIT
- - a production DIT
- - add accounts in staging DIT
- - enable ACL logging (commented for performance reason)
-
- """
-
- topology.master1.log.info("\n\n######################### INITIALIZATION
######################\n")
-
- # entry used to bind with
- topology.master1.log.info("Add %s" % BIND_DN)
- topology.master1.add_s(Entry((BIND_DN, {
- 'objectclass': "top
person".split(),
- 'sn': BIND_CN,
- 'cn': BIND_CN,
- 'userpassword': BIND_PW})))
-
- # DIT for staging
- topology.master1.log.info("Add %s" % STAGING_DN)
- topology.master1.add_s(Entry((STAGING_DN, {
- 'objectclass': "top
organizationalRole".split(),
- 'cn': STAGING_CN,
- 'description': "staging
DIT"})))
-
- # DIT for production
- topology.master1.log.info("Add %s" % PRODUCTION_DN)
- topology.master1.add_s(Entry((PRODUCTION_DN, {
- 'objectclass': "top
organizationalRole".split(),
- 'cn': PRODUCTION_CN,
- 'description': "production
DIT"})))
-
- # DIT for production/except
- topology.master1.log.info("Add %s" % PROD_EXCEPT_DN)
- topology.master1.add_s(Entry((PROD_EXCEPT_DN, {
- 'objectclass': "top
organizationalRole".split(),
- 'cn': EXCEPT_CN,
- 'description': "production
except DIT"})))
-
- # enable acl error logging
- mod = [(ldap.MOD_REPLACE, 'nsslapd-errorlog-level', str(128+262144))]
- topology.master1.modify_s(DN_CONFIG, mod)
- topology.master2.modify_s(DN_CONFIG, mod)
-
-
- # add dummy entries in the staging DIT
- for cpt in range(MAX_ACCOUNTS):
- name = "%s%d" % (NEW_ACCOUNT, cpt)
- topology.master1.add_s(Entry(("cn=%s,%s" % (name, STAGING_DN), {
- 'objectclass': "top
person".split(),
- 'sn': name,
- 'cn': name})))
-
-
-def test_ticket47553_add(topology):
- '''
- This test case checks that the ADD operation fails (no ADD aci on production)
- '''
-
- topology.master1.log.info("\n\n######################### ADD (should fail)
######################\n")
-
- _bind_normal(topology)
-
- #
- # First try to add an entry in production => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to add %s" % PRODUCTION_DN)
- name = "%s%d" % (NEW_ACCOUNT, 0)
- topology.master1.add_s(Entry(("cn=%s,%s" % (name, PRODUCTION_DN), {
- 'objectclass': "top
person".split(),
- 'sn': name,
- 'cn': name})))
- assert 0 # this is an error, we should not be allowed to add an entry in
production
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-def test_ticket47553_delete(topology):
- '''
- This test case checks that the DEL operation fails (no 'delete' aci on
production)
- '''
-
- topology.master1.log.info("\n\n######################### DELETE (should fail)
######################\n")
-
- _bind_normal(topology)
- #
- # Second try to delete an entry in staging => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to delete %s" % STAGING_DN)
- name = "%s%d" % (NEW_ACCOUNT, 0)
- topology.master1.delete_s("cn=%s,%s" % (name, STAGING_DN))
- assert 0 # this is an error, we should not be allowed to add an entry in
production
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
-def test_ticket47553_moddn_staging_prod_0(topology):
- '''
- This test case MOVE entry NEW_ACCOUNT0 from staging to prod
- target_to/target_from: equality filter
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(0) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s0" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to and from
equality filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
-
-def test_ticket47553_moddn_staging_prod_1(topology):
- '''
- This test case MOVE entry NEW_ACCOUNT1 from staging to prod
- target_to/target_from: substring/equality filter
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(1) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s1" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to substring/ from
equality filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_DN, target_to=PRODUCTION_PATTERN)
- _bind_normal(topology)
-
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_DN, target_to=PRODUCTION_PATTERN)
- _bind_normal(topology)
-
-def test_ticket47553_moddn_staging_prod_2(topology):
- '''
- This test case fails to MOVE entry NEW_ACCOUNT2 from staging to prod
- because of bad pattern
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(2) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s2" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to substring
(BAD)/ from equality filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_DN, target_to=BAD_PRODUCTION_PATTERN)
- _bind_normal(topology)
-
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_DN, target_to=BAD_PRODUCTION_PATTERN)
- _bind_normal(topology)
-
-def test_ticket47553_moddn_staging_prod_3(topology):
- '''
- This test case MOVE entry NEW_ACCOUNT3 from staging to prod
- target_to/target_from: equality/substring filter
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(3) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s3" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to:equality filter
/ from substring filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_PATTERN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_PATTERN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
-def test_ticket47553_moddn_staging_prod_4(topology):
- '''
- This test case fails to MOVE entry NEW_ACCOUNT4 from staging to prod
- because of bad pattern
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(4) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s4" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to: equality
filter/ from: substring (BAD) ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=BAD_STAGING_PATTERN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=BAD_STAGING_PATTERN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
-def test_ticket47553_moddn_staging_prod_5(topology):
- '''
- This test case MOVE entry NEW_ACCOUNT5 from staging to prod
- target_to/target_from: substring/substring filter
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(5) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s5" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to:substring
filter / from: substring filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_PATTERN, target_to=PRODUCTION_PATTERN)
- _bind_normal(topology)
-
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_PATTERN, target_to=PRODUCTION_PATTERN)
- _bind_normal(topology)
-
-def test_ticket47553_moddn_staging_prod_6(topology):
- '''
- This test case MOVE entry NEW_ACCOUNT6 from staging to prod
- target_to/target_from: substring/<enmpty> filter
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(6) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s6" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to:substring
filter / from: empty ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD, target_from=None,
target_to=PRODUCTION_PATTERN)
- _bind_normal(topology)
-
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=None, target_to=PRODUCTION_PATTERN)
- _bind_normal(topology)
-
-def test_ticket47553_moddn_staging_prod_7(topology):
- '''
- This test case MOVE entry NEW_ACCOUNT7 from staging to prod
- target_to/target_from: <empty>/substring filter
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(7) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s7" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to: empty/ from:
substring filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_PATTERN, target_to=None)
- _bind_normal(topology)
-
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_PATTERN, target_to=None)
- _bind_normal(topology)
-
-
-def test_ticket47553_moddn_staging_prod_8(topology):
- '''
- This test case MOVE entry NEW_ACCOUNT8 from staging to prod
- target_to/target_from: <empty>/<empty> filter
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(8) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s8" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to: empty/ from:
empty ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD, target_from=None,
target_to=None)
- _bind_normal(topology)
-
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=None, target_to=None)
- _bind_normal(topology)
-
-def test_ticket47553_moddn_staging_prod_9(topology):
- '''
- This test case disable the 'moddn' right so a MODDN requires a
'add' right
- to be successfull.
- It fails to MOVE entry NEW_ACCOUNT9 from staging to prod.
- Add a 'add' right to prod.
- Then it succeeds to MOVE NEW_ACCOUNT9 from staging to prod.
-
- Then enable the 'moddn' right so a MODDN requires a 'moddn'
right
- It fails to MOVE entry NEW_ACCOUNT10 from staging to prod.
- Add a 'moddn' right to prod.
- Then it succeeds to MOVE NEW_ACCOUNT10 from staging to prod.
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(9) ######################\n")
-
- _bind_normal(topology)
- old_rdn = "cn=%s9" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
- ############################################
- # Now do tests with no support of moddn aci
- ############################################
- topology.master1.log.info("Disable the moddn right" )
- _bind_manager(topology)
- mod = [(ldap.MOD_REPLACE, CONFIG_MODDN_ACI_ATTR, 'off')]
- topology.master1.modify_s(DN_CONFIG, mod)
-
- # Add the moddn aci that will not be evaluated because of the config flag
- topology.master1.log.info("\n\n######################### MOVE to and from
equality filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
- # It will fail because it will test the ADD right
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
- # remove the moddn aci
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
- #
- # add the 'add' right to the production DN
- # Then do a successfull moddn
- #
- ACI_ALLOW = "(version 3.0; acl \"ADD rights to allow moddn\";
allow (add)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_ALLOW + ACI_SUBJECT
-
- _bind_manager(topology)
- mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
- topology.master1.modify_s(PRODUCTION_DN, mod)
- _write_aci_staging(topology, mod_type=ldap.MOD_ADD)
- _bind_normal(topology)
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- _bind_manager(topology)
- mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
- topology.master1.modify_s(PRODUCTION_DN, mod)
- _write_aci_staging(topology, mod_type=ldap.MOD_DELETE)
- _bind_normal(topology)
-
-
- ############################################
- # Now do tests with support of moddn aci
- ############################################
- topology.master1.log.info("Enable the moddn right" )
- _bind_manager(topology)
- mod = [(ldap.MOD_REPLACE, CONFIG_MODDN_ACI_ATTR, 'on')]
- topology.master1.modify_s(DN_CONFIG, mod)
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(10) ######################\n")
-
- _bind_normal(topology)
- old_rdn = "cn=%s10" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
- #
- # add the 'add' right to the production DN
- # Then do a failing moddn
- #
- ACI_ALLOW = "(version 3.0; acl \"ADD rights to allow moddn\";
allow (add)"
- ACI_SUBJECT = " userdn = \"ldap:///%s\";)" % BIND_DN
- ACI_BODY = ACI_ALLOW + ACI_SUBJECT
-
- _bind_manager(topology)
- mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
- topology.master1.modify_s(PRODUCTION_DN, mod)
- _write_aci_staging(topology, mod_type=ldap.MOD_ADD)
- _bind_normal(topology)
-
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
- _bind_manager(topology)
- mod = [(ldap.MOD_DELETE, 'aci', ACI_BODY)]
- topology.master1.modify_s(PRODUCTION_DN, mod)
- _write_aci_staging(topology, mod_type=ldap.MOD_DELETE)
- _bind_normal(topology)
-
- # Add the moddn aci that will be evaluated because of the config flag
- topology.master1.log.info("\n\n######################### MOVE to and from
equality filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- # remove the moddn aci
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
-def test_ticket47553_moddn_prod_staging(topology):
- '''
- This test checks that we can move ACCOUNT11 from staging to prod
- but not move back ACCOUNT11 from prod to staging
- '''
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(11) ######################\n")
-
- _bind_normal(topology)
-
- old_rdn = "cn=%s11" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to and from
equality filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn,
new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
-
- #
- # Now check we can not move back the entry to staging
- old_rdn = "cn=%s11" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, PRODUCTION_DN)
- new_rdn = old_rdn
- new_superior = STAGING_DN
-
- # add the write right because we want to check the moddn
- _bind_manager(topology)
- _write_aci_production(topology, mod_type=ldap.MOD_ADD)
- _bind_normal(topology)
-
- try:
- topology.master1.log.info("Try to move back MODDN %s -> %s,%s" %
(old_dn, new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" %
type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
- _bind_manager(topology)
- _write_aci_production(topology, mod_type=ldap.MOD_DELETE)
- _bind_normal(topology)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE,
target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _bind_normal(topology)
-
-
-def test_ticket47553_check_repl_M2_to_M1(topology):
- '''
- Checks that replication is still working M2->M1, using ACCOUNT12
- '''
-
- topology.master1.log.info("Bind as %s (M2)" % DN_DM)
- topology.master2.simple_bind_s(DN_DM, PASSWORD)
-
- rdn = "cn=%s12" % NEW_ACCOUNT
- dn = "%s,%s" % (rdn, STAGING_DN)
-
- # First wait for the ACCOUNT19 entry being replicated on M2
- loop = 0
- while loop <= 10:
- try:
- ent = topology.master2.getEntry(dn, ldap.SCOPE_BASE,
"(objectclass=*)")
- break
- except ldap.NO_SUCH_OBJECT:
- time.sleep(1)
- loop += 1
- assert loop <= 10
-
-
- attribute = 'description'
- tested_value = 'Hello world'
- mod = [(ldap.MOD_ADD, attribute, tested_value)]
- topology.master1.log.info("Update (M2) %s (%s)" % (dn, attribute))
- topology.master2.modify_s(dn, mod)
-
- loop = 0
- while loop <= 10:
- ent = topology.master1.getEntry(dn, ldap.SCOPE_BASE,
"(objectclass=*)")
- assert ent != None
- if ent.hasAttr(attribute) and (ent.getValue(attribute) == tested_value):
- break
-
- time.sleep(1)
- loop += 1
- assert loop < 10
- topology.master1.log.info("Update %s (%s) replicated on M1" % (dn,
attribute))
-
-def test_ticket47553_moddn_staging_prod_except(topology):
- '''
- This test case MOVE entry NEW_ACCOUNT13 from staging to prod
- but fails to move entry NEW_ACCOUNT14 from staging to prod_except
- '''
-
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod
(13) ######################\n")
- _bind_normal(topology)
-
- old_rdn = "cn=%s13" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PRODUCTION_DN
-
- #
- # Try to rename without the apropriate ACI => INSUFFICIENT_ACCESS
- #
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn,
new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that
is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
-
- # successfull MOD with the ACI
- topology.master1.log.info("\n\n######################### MOVE to and from equality filter ######################\n")
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_ADD, target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _moddn_aci_deny_tree(topology, mod_type=ldap.MOD_ADD)
- _bind_normal(topology)
-
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
-
- #
- # Now try to move an entry under except
- #
- topology.master1.log.info("\n\n######################### MOVE staging -> Prod/Except (14) ######################\n")
- old_rdn = "cn=%s14" % NEW_ACCOUNT
- old_dn = "%s,%s" % (old_rdn, STAGING_DN)
- new_rdn = old_rdn
- new_superior = PROD_EXCEPT_DN
- try:
- topology.master1.log.info("Try to MODDN %s -> %s,%s" % (old_dn, new_rdn, new_superior))
- topology.master1.rename_s(old_dn, new_rdn, newsuperior=new_superior)
- assert 0
- except AssertionError:
- topology.master1.log.info("Exception (not really expected exception but that is fine as it fails to rename)")
- except Exception as e:
- topology.master1.log.info("Exception (expected): %s" % type(e).__name__)
- assert isinstance(e, ldap.INSUFFICIENT_ACCESS)
-
- # successfull MOD with the both ACI
- _bind_manager(topology)
- _moddn_aci_staging_to_production(topology, mod_type=ldap.MOD_DELETE, target_from=STAGING_DN, target_to=PRODUCTION_DN)
- _moddn_aci_deny_tree(topology, mod_type=ldap.MOD_DELETE)
- _bind_normal(topology)
-
-def test_ticket47553_final(topology):
- topology.master1.stop(timeout=10)
- topology.master2.stop(timeout=10)
-
-def run_isolated():
- '''
- run_isolated is used to run these test cases independently of a test scheduler (xunit, py.test..)
- To run isolated without py.test, you need to
- - edit this file and comment '@pytest.fixture' line before 'topology' function.
- - set the installation prefix
- - run this program
- '''
- global installation1_prefix
- global installation2_prefix
- installation1_prefix = None
- installation2_prefix = None
-
- topo = topology(True)
- topo.master1.log.info("\n\n######################### Ticket 47553 ######################\n")
- test_ticket47553_init(topo)
-
-
- # Check that without appropriate aci we are not allowed to add/delete
- test_ticket47553_add(topo)
- test_ticket47553_delete(topo)
-
- # tests the ACI as equality/substring filter
- test_ticket47553_moddn_staging_prod_0(topo)
- test_ticket47553_moddn_staging_prod_1(topo)
- test_ticket47553_moddn_staging_prod_2(topo)
- test_ticket47553_moddn_staging_prod_3(topo)
- test_ticket47553_moddn_staging_prod_4(topo)
- test_ticket47553_moddn_staging_prod_5(topo)
-
- # tests the ACI with undefined 'target_to'/'target_from'
- test_ticket47553_moddn_staging_prod_6(topo)
- test_ticket47553_moddn_staging_prod_7(topo)
- test_ticket47553_moddn_staging_prod_8(topo)
-
- # Check we can control the behavior with nsslapd-moddn-aci
- test_ticket47553_moddn_staging_prod_9(topo)
-
- # Check we can move entry 'from' -> 'to' but not 'to' -> 'from'
- test_ticket47553_moddn_prod_staging(topo)
-
- # check replication is still working
- test_ticket47553_check_repl_M2_to_M1(topo)
-
- # check DENY rule is working
- test_ticket47553_moddn_staging_prod_except(topo)
-
- test_ticket47553_final(topo)
-
-
-
-
-if __name__ == '__main__':
- run_isolated()
-