Long story short, when the batcave upgrade happened on Friday we found out that rbac_playbook doesn't work on el7 due to an issue with nss-altfiles.
I figured out how to sidestep the issue by changing the approach that rbac_playbook takes. It used to get all the groups for the user running the script and check for an intersection between those groups and the required groups for the playbook being run.
The new version looks at the groups required for the playbook being run, gathers all the users in those groups and verifies that the user running rbac_playbook is in that list before proceeding.
I've included the changes below for security review before updating anything on batcave01.
Tim
diff --git a/ansible_utils.spec b/ansible_utils.spec index 2e26a70..f097020 100644 --- a/ansible_utils.spec +++ b/ansible_utils.spec @@ -20,12 +20,14 @@ Requires: PyYAML BuildRequires: python2-devel BuildRequires: python-setuptools BuildRequires: python-kitchen +BuildRequires: python-mock
# python-dingus isn't built for el7, can't run tests %if 0%{?rhel} > 6 BuildRequires: pytest BuildRequires: PyYAML BuildRequires: python-dingus +BuildRequires: python-mock %endif
# the cli uses argparse, which is not part of the standard libaray in python2.6 diff --git a/ansible_utils/rbac_playbook.py b/ansible_utils/rbac_playbook.py index 42be23f..c6a952c 100755 --- a/ansible_utils/rbac_playbook.py +++ b/ansible_utils/rbac_playbook.py @@ -140,7 +140,7 @@ def generate_message(result, username, playbook_name, args, checksum): :return: rendered string summarizing rbac activity """ subject = "[rbac-playbook] {0} {1} ran {2}".format(result, - username.pw_name, + username, playbook_name) body = ['Details:'] body.extend(['{0}: {1}'.format(key, value) for key, value in args.iteritems()]) @@ -188,10 +188,13 @@ def run_sudo_command(playbook_file, playbook_args): full_filename = os.path.abspath(os.path.join(config['config']['PLAYBOOKS_DIR'], playbook_file)) + python_args = ['cd', config['config']['ANSIBLE_DIR'], ';', + '/usr/bin/python2', config['config']['ANSIBLE_PLAYBOOK'], + full_filename] + python_args.extend(playbook_args) executable = '/usr/bin/sudo' - args = ['-i', '/usr/bin/python2', - config['config']['ANSIBLE_PLAYBOOK'], full_filename] - args.extend(playbook_args) + args = ['-i', '/bin/bash', '-i', '-c', + ' '.join(python_args)]
# Note: (1) The function used here has to pass the environment to # ansible-playbook so that it can connect to the ssh-agent correctly @@ -206,36 +209,56 @@ def run_sudo_command(playbook_file, playbook_args): print "EXECV: %s %s" % (executable, ' '.join(args)) os.execv(executable, args)
+def _get_username(): + """Retrieve the username for the user which started execution of rbac_playbook""" -def can_run(acl, groups, playbook_file): + user = os.getlogin() + username = pwd.getpwnam(user) + return username.pw_name + +def _get_group_members(groupname): + """Find the members of a specific group + + :param groupname: name of group to find members of + :return: list of usernames for members of the given group, empty list if the group does not exist""" + + group_data = None + try: + group_data = grp.getgrnam(groupname) + except KeyError: + return [] + + return group_data.gr_mem + +def _get_playbook_authorized_users(grouplist): + """Retrieve a set of all users who are members of one or more groups + + :param grouplist: list of one or more group names + :return: set of usernames corresponding to the union of members for each group in the grouplist""" + + userlist = [] + for groupname in grouplist: + userlist.extend(_get_group_members(groupname)) + + return set(userlist) + +def can_run(acl, username, playbook_file): """ determines whether the current user is allowed to run a playbook :param acl: dictionary of acl information - :param groups: groups of which the user is a member + :param username: username of user running the utility :param playbook_file: playbook file that is being run :return: True if the user is authorized, False if unauthorized """ - # exact match quick route + + authorized_groups = acl[playbook_file] + if playbook_file in acl: - pb_groups = frozenset(acl[playbook_file]) - if groups.intersection(pb_groups): + pb_authorized = _get_playbook_authorized_users(authorized_groups) + if username in pb_authorized: return True return False
- -def get_groups(): - """ retrieve the groups of which the current user is currently a member - :return: (username,groups) where groups is a set of groups which the current user is a member - """ - username = os.getlogin() - user = pwd.getpwnam(username) - groups = set(g.gr_name for g in grp.getgrall() if username in g.gr_mem) - groups.add(grp.getgrgid(user.pw_gid).gr_name) - groups = frozenset(groups) - - return user, groups - - def generate_args(options): """ Generate ansible-playbook compatible args representing the information passed into rbac @@ -272,11 +295,11 @@ def rbac_playbook(playbook_name, options): :param playbook_name: name of playbook file to run :param options: dictionary of options """ - username, groups = get_groups() + username = _get_username() checksum = get_checksum(playbook_name)
# raise exception if not allowed - if not can_run(config['acls'], groups, playbook_name): + if not can_run(config['acls'], username, playbook_name): notify(generate_message('FAILURE', username, playbook_name, options, checksum)) msg ="user {0} is not authorized to run {1}".format(username.pw_name, playbook_name) raise RbacAuthException(msg) diff --git a/tests/test_rbac_playbook.py b/tests/test_rbac_playbook.py index 765997d..0b76825 100644 --- a/tests/test_rbac_playbook.py +++ b/tests/test_rbac_playbook.py @@ -1,6 +1,8 @@ import copy +import grp
from dingus import Dingus +from mock import MagicMock, Mock
from ansible_utils import rbac_playbook as rbac
@@ -93,7 +95,7 @@ class TestGeneratePlaybookArgs(object): test_args = rbac.generate_args(ref_options) assert test_args == ['-l', ':'.join(ref_limit)]
- def test_simple_limit(self): + def test_simple_tag(self): ref_tags = ['tag', 'tagg', 'tog', 'togg']
ref_options = {'tags': ref_tags} @@ -123,25 +125,101 @@ class TestGeneratePlaybookArgs(object): '--start-at-task="some task"']
-class TestDetermineCanRun(object): +class TestCanRun(object):
def setup_method(self, method): self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn', 'sysadmin-pony'], 'pony.yml': ['sysadmin-pony'] }
- def test_allow_one_of_multiple(self, monkeypatch): - ref_groups = set(['sysadmin-unicorn']) - test_can_run = rbac.can_run(self.ref_acl, ref_groups, 'group/unicorns.yml') + def test_allow(self, monkeypatch): + ref_user = 'twilightsparkle' + ref_authorized = ['twilightsparkle', 'fluttershy'] + + stub_authorized = MagicMock(return_value=ref_authorized) + monkeypatch.setattr(rbac, '_get_playbook_authorized_users', stub_authorized) + + test_can_run = rbac.can_run(self.ref_acl, ref_user, 'group/unicorns.yml') assert test_can_run
- def test_allow_one_of_multiple(self, monkeypatch): - ref_groups = set(['sysadmin-kittycat']) - test_can_run = rbac.can_run(self.ref_acl, ref_groups, 'group/unicorns.yml') + def test_disallow(self, monkeypatch): + ref_user = 'mittens' + ref_authorized = ['twilightsparkle', 'fluttershy'] + + stub_authorized = MagicMock(return_value=ref_authorized) + monkeypatch.setattr(rbac, '_get_playbook_authorized_users', stub_authorized) + + test_can_run = rbac.can_run(self.ref_acl, ref_user, 'group/unicorns.yml') assert not test_can_run
+class TestGetAuthorizedUsers(object): + + def setup_method(self, method): + self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn', 'sysadmin-pony'], + 'pony.yml': ['sysadmin-pony'] + } + + def test_get_single_group_users(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony'] + ref_group_members = ['twilightsparkle', 'fluttershy'] + ref_members = set(ref_group_members) + + stub_group_members = MagicMock(return_value=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + + def test_get_multiple_group_users(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn'] + ref_group_members = [['twilightsparkle', 'fluttershy'], ['charlie', 'buttercup']] + ref_members = set(ref_group_members[0] + ref_group_members[1]) + + stub_group_members = MagicMock(side_effect=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + + def test_get_multiple_group_with_overlap(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn'] + ref_group_members = [['twilightsparkle', 'fluttershy'], ['charlie', 'buttercup', 'twilightsparkle']] + ref_members = set(ref_group_members[0] + ref_group_members[1]) + + stub_group_members = MagicMock(side_effect=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + +class TestGetGroupMembers(object): + + def setup_method(self, method): + self.ref_groupname = 'sysadmin-ponies' + + def test_valid_group(self, monkeypatch): + ref_members = ['twilightsparkle', 'fluttershy'] + + stub_getgrnam = 
Mock(gr_mem=ref_members) + stub_grp = MagicMock(return_value=stub_getgrnam) + monkeypatch.setattr(grp, 'getgrnam', stub_grp) + + test_group_members = rbac._get_group_members(self.ref_groupname) + + assert test_group_members == ref_members + + def test_invalid_group(self, monkeypatch): + stub_grp = MagicMock(side_effect=KeyError) + monkeypatch.setattr(grp, 'getgrnam', stub_grp) + + test_group_members = rbac._get_group_members(self.ref_groupname) + + assert test_group_members == []
class TestConfigMerge(object):
On Tue, Sep 29, 2015 at 08:27:01AM -0600, Tim Flink wrote:
Long story short, when the batcave upgrade happened on Friday we found out that rbac_playbook doesn't work on el7 due to an issue with nss-altfiles.
I figured out how to sidestep the issue by changing the approach that rbac_playbook takes. It used to get all the groups for the user running the script and check for an intersection between those groups and the required groups for the playbook being run.
The new version looks at the groups required for the playbook being run, gathers all the users in those groups and verifies that the user running rbac_playbook is in that list before proceeding.
Thanks for doing this!
diff --git a/ansible_utils.spec b/ansible_utils.spec index 2e26a70..f097020 100644 --- a/ansible_utils.spec +++ b/ansible_utils.spec @@ -20,12 +20,14 @@ Requires: PyYAML BuildRequires: python2-devel BuildRequires: python-setuptools BuildRequires: python-kitchen +BuildRequires: python-mock
# python-dingus isn't built for el7, can't run tests %if 0%{?rhel} > 6 BuildRequires: pytest BuildRequires: PyYAML BuildRequires: python-dingus +BuildRequires: python-mock %endif
# the cli uses argparse, which is not part of the standard libaray in python2.6 diff --git a/ansible_utils/rbac_playbook.py b/ansible_utils/rbac_playbook.py index 42be23f..c6a952c 100755 --- a/ansible_utils/rbac_playbook.py +++ b/ansible_utils/rbac_playbook.py @@ -140,7 +140,7 @@ def generate_message(result, username, playbook_name, args, checksum): :return: rendered string summarizing rbac activity """ subject = "[rbac-playbook] {0} {1} ran {2}".format(result,
username.pw_name,
body = ['Details:'] body.extend(['{0}: {1}'.format(key, value) for key, value inusername, playbook_name)
args.iteritems()]) @@ -188,10 +188,13 @@ def run_sudo_command(playbook_file, playbook_args): full_filename = os.path.abspath(os.path.join(config['config']['PLAYBOOKS_DIR'], playbook_file))
- python_args = ['cd', config['config']['ANSIBLE_DIR'], ';',
'/usr/bin/python2',
config['config']['ANSIBLE_PLAYBOOK'],
full_filename]
- python_args.extend(playbook_args) executable = '/usr/bin/sudo'
- args = ['-i', '/usr/bin/python2',
config['config']['ANSIBLE_PLAYBOOK'], full_filename]
- args.extend(playbook_args)
- args = ['-i', '/bin/bash', '-i', '-c',
' '.join(python_args)]
Why the change here? Why the addition of bash?
# Note: (1) The function used here has to pass the environment to # ansible-playbook so that it can connect to the ssh-agent
correctly @@ -206,36 +209,56 @@ def run_sudo_command(playbook_file, playbook_args): print "EXECV: %s %s" % (executable, ' '.join(args)) os.execv(executable, args)
+def _get_username():
- """Retrieve the username for the user which started execution of
rbac_playbook""" -def can_run(acl, groups, playbook_file):
- user = os.getlogin()
- username = pwd.getpwnam(user)
- return username.pw_name
+def _get_group_members(groupname):
- """Find the members of a specific group
- :param groupname: name of group to find members of
- :return: list of usernames for members of the given group, empty
list if the group does not exist""" +
- group_data = None
- try:
group_data = grp.getgrnam(groupname)
- except KeyError:
return []
- return group_data.gr_mem
+def _get_playbook_authorized_users(grouplist):
- """Retrieve a set of all users who are members of one or more
groups +
- :param grouplist: list of one or more group names
- :return: set of usernames corresponding to the union of members
for each group in the grouplist""" +
- userlist = []
- for groupname in grouplist:
userlist.extend(_get_group_members(groupname))
- return set(userlist)
+def can_run(acl, username, playbook_file): """ determines whether the current user is allowed to run a playbook :param acl: dictionary of acl information
- :param groups: groups of which the user is a member
- :param username: username of user running the utility :param playbook_file: playbook file that is being run :return: True if the user is authorized, False if unauthorized """
- # exact match quick route
- authorized_groups = acl[playbook_file]
- if playbook_file in acl:
pb_groups = frozenset(acl[playbook_file])
if groups.intersection(pb_groups):
pb_authorized =
_get_playbook_authorized_users(authorized_groups)
return Falseif username in pb_authorized: return True
It looks like this function used to return False if an invalid playbook_file was passed to it, but now it will raise a KeyError. Is that intentional? I see there is a test case for it, but the `rbac_playbook` function doesn't handle it.
-def get_groups():
- """ retrieve the groups of which the current user is currently a
member
- :return: (username,groups) where groups is a set of groups which
the current user is a member
- """
- username = os.getlogin()
- user = pwd.getpwnam(username)
- groups = set(g.gr_name for g in grp.getgrall() if username in
g.gr_mem)
- groups.add(grp.getgrgid(user.pw_gid).gr_name)
- groups = frozenset(groups)
- return user, groups
def generate_args(options): """ Generate ansible-playbook compatible args representing the information passed into rbac @@ -272,11 +295,11 @@ def rbac_playbook(playbook_name, options): :param playbook_name: name of playbook file to run :param options: dictionary of options """
- username, groups = get_groups()
username = _get_username() checksum = get_checksum(playbook_name)
# raise exception if not allowed
- if not can_run(config['acls'], groups, playbook_name):
- if not can_run(config['acls'], username, playbook_name): notify(generate_message('FAILURE', username, playbook_name,
options, checksum)) msg ="user {0} is not authorized to run {1}".format(username.pw_name, playbook_name) raise RbacAuthException(msg) diff --git a/tests/test_rbac_playbook.py b/tests/test_rbac_playbook.py index 765997d..0b76825 100644 --- a/tests/test_rbac_playbook.py +++ b/tests/test_rbac_playbook.py @@ -1,6 +1,8 @@ import copy +import grp
from dingus import Dingus +from mock import MagicMock, Mock
from ansible_utils import rbac_playbook as rbac
@@ -93,7 +95,7 @@ class TestGeneratePlaybookArgs(object): test_args = rbac.generate_args(ref_options) assert test_args == ['-l', ':'.join(ref_limit)]
- def test_simple_limit(self):
def test_simple_tag(self): ref_tags = ['tag', 'tagg', 'tog', 'togg']
ref_options = {'tags': ref_tags}
@@ -123,25 +125,101 @@ class TestGeneratePlaybookArgs(object): '--start-at-task="some task"']
-class TestDetermineCanRun(object): +class TestCanRun(object):
def setup_method(self, method): self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn',
'sysadmin-pony'], 'pony.yml': ['sysadmin-pony'] }
- def test_allow_one_of_multiple(self, monkeypatch):
ref_groups = set(['sysadmin-unicorn'])
test_can_run = rbac.can_run(self.ref_acl, ref_groups,
'group/unicorns.yml')
- def test_allow(self, monkeypatch):
ref_user = 'twilightsparkle'
ref_authorized = ['twilightsparkle', 'fluttershy']
stub_authorized = MagicMock(return_value=ref_authorized)
monkeypatch.setattr(rbac, '_get_playbook_authorized_users',
stub_authorized) +
test_can_run = rbac.can_run(self.ref_acl, ref_user,
'group/unicorns.yml') assert test_can_run
- def test_allow_one_of_multiple(self, monkeypatch):
ref_groups = set(['sysadmin-kittycat'])
test_can_run = rbac.can_run(self.ref_acl, ref_groups,
'group/unicorns.yml')
- def test_disallow(self, monkeypatch):
ref_user = 'mittens'
ref_authorized = ['twilightsparkle', 'fluttershy']
stub_authorized = MagicMock(return_value=ref_authorized)
monkeypatch.setattr(rbac, '_get_playbook_authorized_users',
stub_authorized) +
test_can_run = rbac.can_run(self.ref_acl, ref_user,
'group/unicorns.yml') assert not test_can_run
+class TestGetAuthorizedUsers(object):
- def setup_method(self, method):
self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn',
'sysadmin-pony'],
'pony.yml': ['sysadmin-pony']
}
- def test_get_single_group_users(self, monkeypatch):
ref_authorized_groups = ['sysadmin-pony']
ref_group_members = ['twilightsparkle', 'fluttershy']
ref_members = set(ref_group_members)
stub_group_members = MagicMock(return_value=ref_group_members)
monkeypatch.setattr(rbac, '_get_group_members',
stub_group_members) +
test_group_members =
rbac._get_playbook_authorized_users(ref_authorized_groups) +
assert test_group_members == ref_members
- def test_get_multiple_group_users(self, monkeypatch):
ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn']
ref_group_members = [['twilightsparkle', 'fluttershy'],
['charlie', 'buttercup']]
ref_members = set(ref_group_members[0] + ref_group_members[1])
stub_group_members = MagicMock(side_effect=ref_group_members)
monkeypatch.setattr(rbac, '_get_group_members',
stub_group_members) +
test_group_members =
rbac._get_playbook_authorized_users(ref_authorized_groups) +
assert test_group_members == ref_members
- def test_get_multiple_group_with_overlap(self, monkeypatch):
ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn']
ref_group_members = [['twilightsparkle', 'fluttershy'],
['charlie', 'buttercup', 'twilightsparkle']]
ref_members = set(ref_group_members[0] + ref_group_members[1])
stub_group_members = MagicMock(side_effect=ref_group_members)
monkeypatch.setattr(rbac, '_get_group_members',
stub_group_members) +
test_group_members =
rbac._get_playbook_authorized_users(ref_authorized_groups) +
assert test_group_members == ref_members
+class TestGetGroupMembers(object):
- def setup_method(self, method):
self.ref_groupname = 'sysadmin-ponies'
- def test_valid_group(self, monkeypatch):
ref_members = ['twilightsparkle', 'fluttershy']
stub_getgrnam = Mock(gr_mem=ref_members)
stub_grp = MagicMock(return_value=stub_getgrnam)
monkeypatch.setattr(grp, 'getgrnam', stub_grp)
test_group_members =
rbac._get_group_members(self.ref_groupname) +
assert test_group_members == ref_members
- def test_invalid_group(self, monkeypatch):
stub_grp = MagicMock(side_effect=KeyError)
monkeypatch.setattr(grp, 'getgrnam', stub_grp)
test_group_members =
rbac._get_group_members(self.ref_groupname) +
assert test_group_members == []
class TestConfigMerge(object):
<snip>
Why the change here? Why the addition of bash?
No change, just a slightly botched diff. A new, properly done diff is at the end of this email, but that should be the only discrepancy.
+def can_run(acl, username, playbook_file): """ determines whether the current user is allowed to run a playbook :param acl: dictionary of acl information
- :param groups: groups of which the user is a member
- :param username: username of user running the utility :param playbook_file: playbook file that is being run :return: True if the user is authorized, False if unauthorized """
- # exact match quick route
- authorized_groups = acl[playbook_file]
- if playbook_file in acl:
pb_groups = frozenset(acl[playbook_file])
if groups.intersection(pb_groups):
pb_authorized =
_get_playbook_authorized_users(authorized_groups)
return Falseif username in pb_authorized: return True
It looks like this function used to return False if an invalid playbook_file was passed to it, but now it will raise a KeyError. Is that intentional? I see there is a test case for it, but the `rbac_playbook` function doesn't handle it.
KeyError now handled and test case added, thanks.
Tim
diff --git a/ansible_utils.spec b/ansible_utils.spec index a340832..ba45a3d 100644 --- a/ansible_utils.spec +++ b/ansible_utils.spec @@ -20,12 +20,14 @@ Requires: PyYAML BuildRequires: python2-devel BuildRequires: python-setuptools BuildRequires: python-kitchen +BuildRequires: python-mock
# python-dingus isn't built for el7, can't run tests %if 0%{?rhel} > 6 BuildRequires: pytest BuildRequires: PyYAML BuildRequires: python-dingus +BuildRequires: python-mock %endif
# the cli uses argparse, which is not part of the standard libaray in python2.6 diff --git a/ansible_utils/rbac_playbook.py b/ansible_utils/rbac_playbook.py index c850b94..553225b 100755 --- a/ansible_utils/rbac_playbook.py +++ b/ansible_utils/rbac_playbook.py @@ -140,7 +140,7 @@ def generate_message(result, username, playbook_name, args, checksum): :return: rendered string summarizing rbac activity """ subject = "[rbac-playbook] {0} {1} ran {2}".format(result, - username.pw_name, + username, playbook_name) body = ['Details:'] body.extend(['{0}: {1}'.format(key, value) for key, value in args.iteritems()]) @@ -209,36 +209,60 @@ def run_sudo_command(playbook_file, playbook_args): print "EXECV: %s %s" % (executable, ' '.join(args)) os.execv(executable, args)
+def _get_username(): + """Retrieve the username for the user which started execution of rbac_playbook""" -def can_run(acl, groups, playbook_file): + user = os.getlogin() + username = pwd.getpwnam(user) + return username.pw_name + +def _get_group_members(groupname): + """Find the members of a specific group + + :param groupname: name of group to find members of + :return: list of usernames for members of the given group, empty list if the group does not exist""" + + group_data = None + try: + group_data = grp.getgrnam(groupname) + except KeyError: + return [] + + return group_data.gr_mem + +def _get_playbook_authorized_users(grouplist): + """Retrieve a set of all users who are members of one or more groups + + :param grouplist: list of one or more group names + :return: set of usernames corresponding to the union of members for each group in the grouplist""" + + userlist = [] + for groupname in grouplist: + userlist.extend(_get_group_members(groupname)) + + return set(userlist) + +def can_run(acl, username, playbook_file): """ determines whether the current user is allowed to run a playbook :param acl: dictionary of acl information - :param groups: groups of which the user is a member + :param username: username of user running the utility :param playbook_file: playbook file that is being run - :return: True if the user is authorized, False if unauthorized + :return: True if the user is authorized, False if unauthorized or the + playbook filename is not in the acl information """ - # exact match quick route + + try: + authorized_groups = acl[playbook_file] + except KeyError: + return False + if playbook_file in acl: - pb_groups = frozenset(acl[playbook_file]) - if groups.intersection(pb_groups): + pb_authorized = _get_playbook_authorized_users(authorized_groups) + if username in pb_authorized: return True return False
- -def get_groups(): - """ retrieve the groups of which the current user is currently a member - :return: (username,groups) where groups is a set of groups which the current user is a member - """ - username = os.getlogin() - user = pwd.getpwnam(username) - groups = set(g.gr_name for g in grp.getgrall() if username in g.gr_mem) - groups.add(grp.getgrgid(user.pw_gid).gr_name) - groups = frozenset(groups) - - return user, groups - - def generate_args(options): """ Generate ansible-playbook compatible args representing the information passed into rbac @@ -275,11 +299,11 @@ def rbac_playbook(playbook_name, options): :param playbook_name: name of playbook file to run :param options: dictionary of options """ - username, groups = get_groups() + username = _get_username() checksum = get_checksum(playbook_name)
# raise exception if not allowed - if not can_run(config['acls'], groups, playbook_name): + if not can_run(config['acls'], username, playbook_name): notify(generate_message('FAILURE', username, playbook_name, options, checksum)) msg ="user {0} is not authorized to run {1}".format(username.pw_name, playbook_name) raise RbacAuthException(msg) diff --git a/tests/test_rbac_playbook.py b/tests/test_rbac_playbook.py index 765997d..665281e 100644 --- a/tests/test_rbac_playbook.py +++ b/tests/test_rbac_playbook.py @@ -1,6 +1,8 @@ import copy +import grp
from dingus import Dingus +from mock import MagicMock, Mock
from ansible_utils import rbac_playbook as rbac
@@ -83,7 +85,6 @@ class TestProcessArgs(object): class TestGeneratePlaybookArgs(object):
def test_simple_limit(self): - ref_playbook = 'unicorn.yml' ref_limit = ['pink.unicorn.com', 'white.unicorn.com', 'blue.unicorn.com', @@ -93,7 +94,7 @@ class TestGeneratePlaybookArgs(object): test_args = rbac.generate_args(ref_options) assert test_args == ['-l', ':'.join(ref_limit)]
- def test_simple_limit(self): + def test_simple_tag(self): ref_tags = ['tag', 'tagg', 'tog', 'togg']
ref_options = {'tags': ref_tags} @@ -102,8 +103,6 @@ class TestGeneratePlaybookArgs(object): assert test_args == ['-t', ','.join(ref_tags)]
def test_all(self): - - ref_playbook = 'unicorn.yml' ref_limit = ['pink.unicorn.com', 'white.unicorn.com', 'blue.unicorn.com', @@ -123,25 +122,109 @@ class TestGeneratePlaybookArgs(object): '--start-at-task="some task"']
-class TestDetermineCanRun(object): +class TestCanRun(object):
def setup_method(self, method): self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn', 'sysadmin-pony'], 'pony.yml': ['sysadmin-pony'] }
- def test_allow_one_of_multiple(self, monkeypatch): - ref_groups = set(['sysadmin-unicorn']) - test_can_run = rbac.can_run(self.ref_acl, ref_groups, 'group/unicorns.yml') + def test_allow(self, monkeypatch): + ref_user = 'twilightsparkle' + ref_authorized = ['twilightsparkle', 'fluttershy'] + + stub_authorized = MagicMock(return_value=ref_authorized) + monkeypatch.setattr(rbac, '_get_playbook_authorized_users', stub_authorized) + + test_can_run = rbac.can_run(self.ref_acl, ref_user, 'group/unicorns.yml') assert test_can_run
- def test_allow_one_of_multiple(self, monkeypatch): - ref_groups = set(['sysadmin-kittycat']) - test_can_run = rbac.can_run(self.ref_acl, ref_groups, 'group/unicorns.yml') + def test_disallow(self, monkeypatch): + ref_user = 'mittens' + ref_authorized = ['twilightsparkle', 'fluttershy'] + + stub_authorized = MagicMock(return_value=ref_authorized) + monkeypatch.setattr(rbac, '_get_playbook_authorized_users', stub_authorized) + + test_can_run = rbac.can_run(self.ref_acl, ref_user, 'group/unicorns.yml') assert not test_can_run
+ def should_return_false_invalid_file(self, monkeypatch): + + ref_user = 'twilightsparkle' + + test_can_run = rbac.can_run({}, ref_user, 'group/unicorns.yml') + + assert not test_can_run + +class TestGetAuthorizedUsers(object): + + def setup_method(self, method): + self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn', 'sysadmin-pony'], + 'pony.yml': ['sysadmin-pony'] + } + + def test_get_single_group_users(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony'] + ref_group_members = ['twilightsparkle', 'fluttershy'] + ref_members = set(ref_group_members) + + stub_group_members = MagicMock(return_value=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + + def test_get_multiple_group_users(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn'] + ref_group_members = [['twilightsparkle', 'fluttershy'], ['charlie', 'buttercup']] + ref_members = set(ref_group_members[0] + ref_group_members[1]) + + stub_group_members = MagicMock(side_effect=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + + def test_get_multiple_group_with_overlap(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn'] + ref_group_members = [['twilightsparkle', 'fluttershy'], ['charlie', 'buttercup', 'twilightsparkle']] + ref_members = set(ref_group_members[0] + ref_group_members[1]) + + stub_group_members = MagicMock(side_effect=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + +class TestGetGroupMembers(object): + + def 
setup_method(self, method): + self.ref_groupname = 'sysadmin-ponies' + + def test_valid_group(self, monkeypatch): + ref_members = ['twilightsparkle', 'fluttershy'] + + stub_getgrnam = Mock(gr_mem=ref_members) + stub_grp = MagicMock(return_value=stub_getgrnam) + monkeypatch.setattr(grp, 'getgrnam', stub_grp) + + test_group_members = rbac._get_group_members(self.ref_groupname) + + assert test_group_members == ref_members + + def test_invalid_group(self, monkeypatch): + stub_grp = MagicMock(side_effect=KeyError) + monkeypatch.setattr(grp, 'getgrnam', stub_grp) + + test_group_members = rbac._get_group_members(self.ref_groupname) + + assert test_group_members == []
class TestConfigMerge(object):
On Tue, Sep 29, 2015 at 01:45:15PM -0600, Tim Flink wrote:
It looks like this function used to return False if an invalid playbook_file was passed to it, but now it will raise a KeyError. Is that intentional? I see there is a test case for it, but the `rbac_playbook` function doesn't handle it.
KeyError now handled and test case added, thanks.
Tim
Looks good now. +1, and thanks!
On Tue, 29 Sep 2015 17:26:36 -0400 Ralph Bean rbean@redhat.com wrote:
On Tue, Sep 29, 2015 at 01:45:15PM -0600, Tim Flink wrote:
It looks like this function used to return False if an invalid playbook_file was passed to it, but now it will raise a KeyError. Is that intentional? I see there is a test case for it, but the `rbac_playbook` function doesn't handle it.
KeyError now handled and test case added, thanks.
Tim
Looks good now. +1, and thanks!
Yeah, seems good to me too. ;)
kevin
On Tue, 29 Sep 2015 08:27:01 -0600 Tim Flink tflink@redhat.com wrote:
Long story short, when the batcave upgrade happened on Friday we found out that rbac_playbook doesn't work on el7 due to an issue with nss-altfiles.
I figured out how to sidestep the issue by changing the approach that rbac_playbook takes. It used to get all the groups for the user running the script and check for an intersection between those groups and the required groups for the playbook being run.
The new version looks at the groups required for the playbook being run, gathers all the users in those groups and verifies that the user running rbac_playbook is in that list before proceeding.
I've included the changes below for security review before updating anything on batcave01
Thanks for the reviews. Code has been pushed to git, I've built a new ansible_utils package and put that in the el7 infrastructure repo.
Tim
On Wed, 30 Sep 2015 12:07:07 -0600 Tim Flink tflink@redhat.com wrote:
On Tue, 29 Sep 2015 08:27:01 -0600 Tim Flink tflink@redhat.com wrote:
Long story short, when the batcave upgrade happened on Friday we found out that rbac_playbook doesn't work on el7 due to an issue with nss-altfiles.
I figured out how to sidestep the issue by changing the approach that rbac_playbook takes. It used to get all the groups for the user running the script and check for an intersection between those groups and the required groups for the playbook being run.
The new version looks at the groups required for the playbook being run, gathers all the users in those groups and verifies that the user running rbac_playbook is in that list before proceeding.
I've included the changes below for security review before updating anything on batcave01
Thanks for the reviews. Code has been pushed to git, I've built a new ansible_utils package and put that in the el7 infrastructure repo.
I've updated it on batcave. Testing is welcome to make sure we are all working.
kevin
infrastructure@lists.fedoraproject.org