Source code for khorosjx.groups

# -*- coding: utf-8 -*-
"""
:Module:         khorosjx.groups
:Synopsis:       Collection of functions relating to security groups
:Usage:          ``from khorosjx import groups``
:Example:        ``group_info = groups.get_group_info(1051)``
:Created By:     Jeff Shurtliff
:Last Modified:  Jeff Shurtliff
:Modified Date:  22 Sep 2021
"""

import requests

from . import core, users, errors
from .utils.classes import Groups
from .utils.core_utils import eprint
from .utils import core_utils, df_utils

# Define global variables
base_url, api_credentials = '', None


# Define function to verify the connection in the core module
[docs]def verify_core_connection(): """This function verifies that the core connection information (Base URL and API credentials) has been defined. .. versionchanged:: 3.1.0 Refactored the function to be more pythonic and to avoid depending on a try/except block. :returns: None :raises: :py:exc:`khorosjx.errors.exceptions.KhorosJXError`, :py:exc:`khorosjx.errors.exceptions.NoCredentialsError` """ if not base_url or not api_credentials: retrieve_connection_info() return
[docs]def retrieve_connection_info(): """This function initializes and defines the global variables for the connection information. .. versionchanged:: 3.1.0 Refactored the function to be more efficient. :returns: None :raises: :py:exc:`khorosjx.errors.exceptions.KhorosJXError`, :py:exc:`khorosjx.errors.exceptions.NoCredentialsError` """ # Define the global variables at this module level global base_url global api_credentials base_url, api_credentials = core.get_connection_info() return
# Define function to get basic group information for a particular Group ID
[docs]def get_group_info(group_id, return_fields=None, ignore_exceptions=False): """This function obtains the group information for a given Group ID. .. versionchanged:: 3.1.0 Changed the default ``return_fields`` value to ``None`` and adjusted the function accordingly. :param group_id: The Group ID of the group whose information will be requested :type group_id: int,str :param return_fields: Specific fields to return if not all of the default fields are needed (Optional) :type return_fields: list, None :param ignore_exceptions: Determines whether nor not exceptions should be ignored (Default: ``False``) :type ignore_exceptions: bool :returns: A dictionary with the group information :raises: :py:exc:`khorosjx.errors.exceptions.GETRequestError`, :py:exc:`khorosjx.errors.exceptions.InvalidDatasetError` """ # Verify that the core connection has been established verify_core_connection() # Initialize the empty dictionary for the group information group_info = {} # Perform the API query to retrieve the group information query_uri = f"{base_url}/securityGroups/{group_id}?fields=@all" response = core.get_request_with_retries(query_uri) # Verify that the query was successful successful_response = errors.handlers.check_api_response(response, ignore_exceptions=ignore_exceptions) # Parse the data if the response was successful if successful_response: # Determine which fields to return group_json = response.json() group_info = core.get_fields_from_api_response(group_json, 'security_group', return_fields) return group_info
def _get_paginated_groups(_return_fields, _ignore_exceptions, _start_index): """This function returns paginated group information. (Up to 100 records at a time) .. versionchanged:: 3.1.0 Adjusted a dictionary lookup to proactively avoid raising a :py:exc:`KeyError` exception. :param _return_fields: Specific fields to return if not all of the default fields are needed (Optional) :type _return_fields: list, None :param _ignore_exceptions: Determines whether nor not exceptions should be ignored (Default: ``False``) :type _ignore_exceptions: bool :param _start_index: The startIndex API value :type _start_index: int, str :returns: A list of dictionaries containing information for each group in the paginated query """ # Initialize the empty list for the group information _groups = [] # Perform the API query to retrieve the group information _query_uri = f"{base_url}/securityGroups?fields=@all&count=100&startIndex={_start_index}" _response = core.get_request_with_retries(_query_uri) # Verify that the query was successful successful_response = errors.handlers.check_api_response(_response, ignore_exceptions=_ignore_exceptions) if successful_response: # Get the response data in JSON format _paginated_group_data = _response.json() for _group_data in _paginated_group_data.get('list'): _parsed_data = core.get_fields_from_api_response(_group_data, 'security_group', _return_fields) _groups.append(_parsed_data) return _groups # Define function to get information on all security groups
[docs]def get_all_groups(return_fields=None, return_type='list', ignore_exceptions=False): """This function returns information on all security groups found within the environment. .. versionchanged:: 3.1.0 Changed the default ``return_fields`` value to ``None`` and adjusted the function accordingly. :param return_fields: Specific fields to return if not all of the default fields are needed (Optional) :type return_fields: list, None :param return_type: Determines if the data should be returned in a list or a pandas dataframe (Default: ``list``) :type return_type: str :param ignore_exceptions: Determines whether nor not exceptions should be ignored (Default: ``False``) :type ignore_exceptions: bool :returns: A list of dictionaries or a dataframe containing information for each group :raises: :py:exc:`khorosjx.errors.exceptions.InvalidDatasetError` """ # Verify that the core connection has been established verify_core_connection() # Initialize the empty list for the group information all_groups = [] # Perform the first query to get up to the first 100 groups start_index = 0 groups = _get_paginated_groups(return_fields, ignore_exceptions, start_index) all_groups = core_utils.add_to_master_list(groups, all_groups) # Continue querying for groups until none are returned while groups: start_index += 100 groups = _get_paginated_groups(return_fields, ignore_exceptions, start_index) all_groups = core_utils.add_to_master_list(groups, all_groups) # Return the data as a master list of group dictionaries or a pandas dataframe if return_type == "dataframe": all_groups = df_utils.convert_dict_list_to_dataframe(all_groups) return all_groups
# Define function to obtain and return a list of the security group memberships for a user
[docs]def get_user_memberships(user_lookup, return_values='name', ignore_exceptions=False): """This function returns the security group memberships for a given user. .. versionchanged:: 3.1.0 Refactored the function to be more efficient. :param user_lookup: A User ID or email address that can be used to identify the user :type user_lookup: int,str :param return_values: The type of values that should be returned in the membership list (Default: ``name``) :type return_values: str :param ignore_exceptions: Determines whether nor not exceptions should be ignored (Default: ``False``) :type ignore_exceptions: bool :returns: A list of group memberships for the user :raises: :py:exc:`khorosjx.errors.exceptions.UserQueryError` """ # Verify that the core connection has been established verify_core_connection() # Initialize an empty list for group memberships memberships = [] # Get the User ID if the user lookup value is an email address if not isinstance(user_lookup, int) and user_lookup.isdigit() is False: if '@' in user_lookup: user_lookup = users.get_user_id(user_lookup) else: # TODO: Allow a username to be supplied and utilized as a lookup value error_msg = f"{user_lookup} is not a valid lookup value to obtain user memberships." if ignore_exceptions: print(error_msg) return memberships else: raise errors.exceptions.UserQueryError(error_msg) # Perform the API query and convert the response to JSON if possible query = f"{base_url}/people/{user_lookup}/securityGroups" response = core.get_request_with_retries(query) if response.status_code != 200: error_msg = f"The attempt to get group membership for the user {user_lookup} " + \ f"failed with a {response.status_code} status code." if ignore_exceptions: print(error_msg) else: raise errors.exceptions.UserQueryError(error_msg) else: groups = (response.json()).get('list') # Report a warning if 100+ groups are found since function doesn't currently look through multiple API responses if len(groups) >= 100: warn_msg = f"User ID {user_lookup} belongs to 100 or more and some may not have been captured." print(warn_msg) # TODO: Allow the paginate through the groups of >100 exist for a user # Add the group names to the memberships list for group in groups: if return_values == "name": memberships.append(group.get('name')) # TODO: Allow the Group ID to be specified as the return_values option and returned # Return the memberships list whether populated or empty return memberships
[docs]def check_user_membership(user_memberships, groups_to_check, scope='any', ignore_exceptions=False): """This function checks if a user belongs to one or more security groups. .. versionchanged:: 3.1.0 Parenthesis were added to the exception classes and the function was refactored to be more efficient. :param user_memberships: A list of security groups to which the user belongs :type user_memberships: list, tuple :param groups_to_check: One or more groups (name or ID) against which to compare the user's memberships :type groups_to_check: list, tuple, str :param scope: Determines the result returned for the comparison (Options: ``any``, ``all`` or ``each``) :type scope: str :param ignore_exceptions: Determines whether nor not exceptions should be ignored (Default: ``False``) :type ignore_exceptions: bool :returns: Returns a Boolean value for ``any`` and ``all`` scopes, or a list of Boolean values for ``each`` :raises: :py:exc:`khorosjx.errors.exceptions.InvalidScopeError` """ # Convert the groups_to_check argument to a tuple if a string was provided if isinstance(groups_to_check, str): # Check for a comma-separated string if ',' in groups_to_check: if ', ' in groups_to_check: groups_to_check = ', '.join(groups_to_check) else: groups_to_check = ','.join(groups_to_check) tuple(groups_to_check) else: groups_to_check = (groups_to_check, ) # Check to ensure that a valid scope is defined scope_types = ['any', 'all', 'each'] if scope not in scope_types: if ignore_exceptions: error_msg = f"The supplied scope '{scope}' is not recognized and the default scope of 'any' will be used." eprint(error_msg) else: raise errors.exceptions.InvalidScopeError() # Check the groups supplied against the list of memberships groups_found, all_results = [], [] for group in groups_to_check: if group in user_memberships: groups_found.append(group) all_results.append(True) else: all_results.append(False) # Define and return the Boolean response based on the scope result = False if scope == "any": result = True if groups_found else False elif scope == "all": result = True if len(groups_found) == len(groups_to_check) else False elif scope == "each": result = all_results return result
# Define function to add a user to a security group
[docs]def add_user_to_group(group_id, user_value, lookup_type="id", return_mode="none", print_results=True, ignore_exceptions=True): """This function adds a user to a security group. .. versionchanged:: 3.1.0 Parenthesis were added to the exception classes and the function was refactored to be more efficient. :param group_id: The Group ID of the security group to which the user should be added :type group_id: int, str :param user_value: The value with which to look up the user (e.g. User ID, email address) :type user_value: int, str :param lookup_type: Defines whether the user value is a User ID or an email address (Default: ``id``) :type lookup_type: str :param return_mode: Determines what--if anything--should be returned by the function (Default: ``none``) :type return_mode: str :param print_results: Determines whether or not all results (including success messages) should be printed onscreen :type print_results: bool :param ignore_exceptions: Determines whether nor not exceptions should be ignored (Default: ``True``) :type ignore_exceptions: bool :returns: The resulting status code, a Boolean value indicating the success of the operation, or nothing :raises: :py:exc:`khorosjx.errors.exceptions.POSTRequestError` """ # Verify that the core connection has been established verify_core_connection() # Obtain the Jive ID if an email address is provided for the user information valid_lookup_types = ('id', 'email') if lookup_type not in valid_lookup_types: if ignore_exceptions: error_msg = "The supplied lookup type for the API is not recognized. (Examples of valid " + \ "lookup types include 'id' and 'email')" eprint(error_msg) else: raise errors.exceptions.InvalidLookupTypeError() if lookup_type == "email": user_value = users.get_user_id(user_value) # Define the query parameters query_uri = f"{base_url}/securityGroups/{group_id}/members" user_uri = f'["{base_url}/people/{user_value}"]' # Add the user to the group response = requests.post(query_uri, data=user_uri, auth=api_credentials, headers={"Content-Type": "application/json", "Accept": "application/json"}) added_to_group = errors.handlers.check_api_response(response, 'post', ignore_exceptions=ignore_exceptions) # The remainder of the function assumes exceptions are being ignored if added_to_group: # True if status code == 204 if print_results: success_msg = f"The user (User ID {user_value}) has been added successfully to Group ID {group_id}." print(success_msg) else: fail_msg = f"The user (User ID {user_value}) failed to be added to Group ID {group_id} with a " \ f"{response.status_code} status code and the following message: {response.text}" eprint(fail_msg) # Define what is returned at the end of the function if return_mode.lower() == "status": try: return response.status_code except UnboundLocalError: return 500 elif return_mode.lower() == "bool": try: return added_to_group except UnboundLocalError: return False else: return
def _add_paginated_members(_base_query_uri, _response_data_type, _start_index, _ignore_exceptions, _return_fields, _all_users, _quiet=False): """This function retrieves a paginated list of users and then adds them to a master list. .. versionchanged:: 2.5.3 Added the optional ``_quiet`` argument to silence missing API field errors. :param _base_query_uri: The API query without the query string :type _base_query_uri: str :param _response_data_type: The dataset of fields that will be in the API response (e.g. ``group_members``) :type _response_data_type: str :param _start_index: The startIndex value in the API query string (``0`` by default) :type _start_index: int, str :param _ignore_exceptions: Determines whether nor not exceptions should be ignored (Default: ``False``) :type _ignore_exceptions: bool :param _return_fields: The fields that should be returned from the API response (Default: all fields in dataset) :type _return_fields: list :param _all_users: The master list of group members :type _all_users: list :param _quiet: Silences any errors about being unable to locate API fields (``False`` by default) :type _quiet: bool :returns: The master list of group members and the number of members found during this API call :raises: :py:exc:`khorosjx.errors.exceptions.GETRequestError` """ _paginated_users = core.get_paginated_results(_base_query_uri, _response_data_type, _start_index, ignore_exceptions=_ignore_exceptions, return_fields=_return_fields, quiet=_quiet) _all_users = core_utils.add_to_master_list(_paginated_users, _all_users) return _all_users, len(_paginated_users)
[docs]def get_group_memberships(group_id, user_type="member", only_id=True, return_type="list", ignore_exceptions=False, quiet=False): """This function gets the memberships (including administrator membership) for a specific security group. .. versionchanged:: 2.5.3 Added the optional ``_quiet`` argument to silence missing API field errors. .. versionchanged:: 2.5.1 The ``?fields=@all`` query string was added to the API URI to ensure all fields are retrieved. :param group_id: The Group ID for the security group :type group_id: int, str :param user_type: Determines if the function should return ``admin`` or ``member`` users (Default: ``member``) :type user_type: str :param only_id: Determines if only the User ID for the members should be returned or full data (Default: ``True``) :type only_id: bool :param return_type: Determines if a ``list`` or ``dataframe`` should be returned (Default: ``list``) :type return_type: str :param ignore_exceptions: Determines whether nor not exceptions should be ignored (Default: ``True``) :type ignore_exceptions: bool :param quiet: Silences any errors about being unable to locate API fields (``False`` by default) :type quiet: bool :returns: A list or dataframe of security group memberships :raises: :py:exc:`ValueError` """ # Verify that the core connection has been established verify_core_connection() # Initiate an empty list for the user data all_users = [] return_fields = [] # Define the base query URI if user_type in Groups.membership_types: base_query_uri = f"{base_url}/securityGroups/{group_id}/{Groups.membership_types.get(user_type)}?fields=@all" else: raise ValueError(f"The '{user_type}' value is not a valid user type.") # Determine if only the User IDs should be returned if only_id: return_fields.append('id') # Get the response data type response_data_type = Groups.user_type_mapping.get(user_type) # Perform the first query to get up to the first 100 members or admins start_index = 0 all_users, users_returned = _add_paginated_members(base_query_uri, response_data_type, start_index, ignore_exceptions, return_fields, all_users, quiet) # Continue querying for groups until none are returned while users_returned > 0: start_index += 100 all_users, users_returned = _add_paginated_members(base_query_uri, response_data_type, start_index, ignore_exceptions, return_fields, all_users, quiet) # Return the data as a master list of group dictionaries or a pandas dataframe if return_type == "dataframe": all_users = df_utils.convert_dict_list_to_dataframe(all_users) elif return_type == "list" and only_id is True: all_users = core_utils.convert_single_pair_dict_list(all_users) return all_users