Source code for

# -*- coding: utf-8 -*-
:Synopsis:       Collection of functions relating to news publications, subscriptions and streams
:Usage:          ``from khorosjx import news``
:Example:        ``all_publications = news.get_all_publications()``
:Created By:     Jeff Shurtliff
:Last Modified:  Jeff Shurtliff
:Modified Date:  22 Sep 2021

from . import core, errors
from .utils import core_utils, df_utils

# Define the module-level connection variables, which start out unset (empty Base URL and no credentials)
# and are overwritten by retrieve_connection_info() with the values from core.get_connection_info()
base_url, api_credentials = '', None

# Define function to verify the connection in the core module
def verify_core_connection():
    """This function ensures the module-level connection information (Base URL and API credentials) is defined.

    The connection information is only retrieved via :py:func:`retrieve_connection_info` when the
    Base URL and/or the API credentials have not been populated yet.

    .. versionchanged:: 3.1.0
       Refactored the function to be more pythonic and to avoid depending on a try/except block.

    :returns: None
    :raises: :py:exc:`khorosjx.errors.exceptions.KhorosJXError`,
             :py:exc:`khorosjx.errors.exceptions.NoCredentialsError`
    """
    # Nothing to do when both values have already been populated
    connection_defined = bool(base_url) and bool(api_credentials)
    if connection_defined:
        return

    # Populate the module-level connection variables
    retrieve_connection_info()
def retrieve_connection_info():
    """This function populates the module-level ``base_url`` and ``api_credentials`` variables.

    The values are obtained from ``core.get_connection_info()`` and stored at the module level so
    the other functions in this module can use them.

    .. versionchanged:: 3.1.0
       Refactored the function to be more efficient.

    :returns: None
    :raises: :py:exc:`khorosjx.errors.exceptions.KhorosJXError`,
             :py:exc:`khorosjx.errors.exceptions.NoCredentialsError`
    """
    # Rebind the variables defined at this module level rather than creating locals
    global base_url, api_credentials

    connection_info = core.get_connection_info()
    base_url, api_credentials = connection_info
def get_all_publications(return_fields=None, return_type='list', ignore_exceptions=False):
    """This function retrieves every publication within an environment.

    The ``/publications`` endpoint is queried in pages, advancing the start index by 100 each time,
    until a query returns no results.

    .. versionchanged:: 3.1.0
       Changed the default ``return_fields`` value to ``None`` and adjusted the function accordingly.

    :param return_fields: Specific fields to return if not all of the default fields are needed (Optional)
    :type return_fields: list, None
    :param return_type: Determines if the data should be returned in a list or a pandas dataframe
                        (Default: ``list``)
    :type return_type: str
    :param ignore_exceptions: Determines whether or not exceptions should be ignored (Default: ``False``)
    :type ignore_exceptions: bool
    :returns: A list of dictionaries or a dataframe containing information for each publication
    :raises: :py:exc:`khorosjx.errors.exceptions.InvalidDatasetError`
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # Define the query and the master list that accumulates the publication information
    query = f'{base_url}/publications'
    all_publications = []
    start_index = 0

    # Query one page at a time until a page comes back empty
    while True:
        page = core.get_paginated_results(query, 'publication', start_index, return_fields=return_fields,
                                          ignore_exceptions=ignore_exceptions)
        all_publications = core_utils.add_to_master_list(page, all_publications)
        if not page:
            break
        start_index += 100

    # Return the master list as-is unless a pandas dataframe was requested
    if return_type == "dataframe":
        return df_utils.convert_dict_list_to_dataframe(all_publications)
    return all_publications
def get_publication(pub_id, return_fields=None, ignore_exceptions=False):
    """This function retrieves the information for a single publication when supplied its ID.

    .. versionchanged:: 3.1.0
       Changed the default ``return_fields`` value to ``None`` and adjusted the function accordingly.

    :param pub_id: The ID of the publication
    :type pub_id: int, str
    :param return_fields: Specific fields to return if not all of the default fields are needed (Optional)
    :type return_fields: list, None
    :param ignore_exceptions: Determines whether or not exceptions should be ignored (Default: ``False``)
    :type ignore_exceptions: bool
    :returns: A dictionary with the data for the publication (or the unparsed API response when the
              response check does not report success)
    :raises: :py:exc:`khorosjx.errors.exceptions.InvalidDatasetError`,
             :py:exc:`khorosjx.errors.exceptions.GETRequestError`
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # Retrieve the raw API response for the publication
    response = core.get_data('publications', pub_id, return_json=False, all_fields=True)

    # Hand back the response untouched when the check does not report success
    if not errors.handlers.check_api_response(response, ignore_exceptions=ignore_exceptions):
        return response

    # Extract the requested fields from the JSON payload
    return core.get_fields_from_api_response(response.json(), 'publication', return_fields)
def delete_publication(pub_id, return_json=False):
    """This function deletes the publication with the given ID.

    :param pub_id: The ID of the publication
    :type pub_id: int, str
    :param return_json: Determines if the API response should be returned in JSON format
                        (``False`` by default)
    :type return_json: bool
    :returns: The API response (optionally in JSON format)
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # Issue the DELETE request against the publication URI
    return core.delete(f"{base_url}/publications/{pub_id}", return_json=return_json)
def get_subscription_data(pub_id):
    """This function returns the subscription data for a given publication.

    The data is whatever :py:func:`get_publication` returns when restricted to the ``subscriptions`` field.

    :param pub_id: The ID of the publication
    :type pub_id: int, str
    :returns: The subscription data for the publication (expected to be a list of dictionaries with
              one entry per subscription -- TODO confirm against ``core.get_fields_from_api_response``)
    """
    subscription_fields = ['subscriptions']
    return get_publication(pub_id, subscription_fields)
def get_subscription_ids(pub_id, return_type='str'):
    """This function compiles a list of subscription IDs for a given publication ID.

    The ``return_type`` value is validated before any API call is made, so an invalid value is always
    rejected, including when the publication has no subscriptions.

    :param pub_id: The ID of the publication
    :type pub_id: int, str
    :param return_type: Determines if the IDs should be returned in ``str`` (default) or ``int`` format
    :type return_type: str
    :returns: A list of subscription IDs
    :raises: :py:exc:`ValueError`
    """
    # Fail fast on an unsupported return type rather than discovering it while looping
    if return_type not in ('str', 'int'):
        raise ValueError(f"'{return_type}' is not a valid return type")

    # Verify that the core connection has been established
    verify_core_connection()

    # Retrieve the subscriptions and extract their IDs
    all_subscriptions = get_subscription_data(pub_id)
    if return_type == 'int':
        return [int(subscription['id']) for subscription in all_subscriptions]

    # The IDs are returned exactly as supplied by the API when ``str`` is requested
    return [subscription['id'] for subscription in all_subscriptions]
def filter_subscriptions_by_id(sub_id, subscriptions):
    """This function returns the subscription from a list that has the supplied subscription ID.

    The IDs are compared by their string representation so that a match is found regardless of
    whether the supplied ID and the stored IDs are integers or strings.

    .. versionchanged:: 3.1.0
       Parenthesis were added to the exception classes and the function was refactored to be more efficient.

    :param sub_id: The subscription ID to use as the filter
    :type sub_id: int, str
    :param subscriptions: A list of subscriptions
    :type subscriptions: list
    :returns: The subscription that has the supplied subscription ID
    :raises: :py:exc:`khorosjx.errors.exceptions.SubscriptionNotFoundError`
    """
    # Normalize the target once so that 42 and '42' are treated as the same ID
    target_id = str(sub_id)
    for subscription in subscriptions:
        if str(subscription['id']) == target_id:
            return subscription

    # No subscription in the list carries the requested ID
    raise errors.exceptions.SubscriptionNotFoundError()
def get_subscriber_groups(publication_id, subscription_id='', full_uri=False):
    """This function identifies the subscriber groups for one or more subscriptions within a publication.

    .. versionchanged:: 3.1.0
       Refactored the function to be more efficient.

    :param publication_id: The ID of the publication
    :type publication_id: int, str
    :param subscription_id: The specific subscription ID for which to return subscriber groups (Optional)
    :type subscription_id: int, str
    :param full_uri: Determines whether or not to return the full URI or just the Group ID
                     (``False`` by default)
    :type full_uri: bool
    :returns: A dictionary mapping the subscription IDs to the respective subscriber groups
    :raises: :py:exc:`khorosjx.errors.exceptions.SubscriptionNotFoundError`
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # Capture the subscriptions for the publication
    subscriptions = get_subscription_data(publication_id)

    # Filter for a specific subscription if an ID is provided
    if subscription_id:
        # The filter returns a single subscription, so it is wrapped in a list to ensure the loop
        # below iterates over subscriptions rather than over the keys of that one subscription
        subscriptions = [filter_subscriptions_by_id(subscription_id, subscriptions)]

    # Capture the subscriber groups for each subscription
    subscriber_groups = {}
    for subscription in subscriptions:
        subscribers = subscription.get('subscribers')
        if not full_uri:
            # Keep only the Group ID that follows 'securityGroups/' in each subscriber URI and
            # treat a missing subscribers value as having no subscriber groups
            subscribers = [uri.split('securityGroups/')[1] for uri in (subscribers or [])]
        subscriber_groups[subscription['id']] = subscribers
    return subscriber_groups
def get_subscriber_ids(subscribers):
    """This function collects the ``id`` value from each subscriber dictionary into a single list.

    :param subscribers: A list of dictionaries containing subscriber information from which to extract the IDs
    :type subscribers: list
    :returns: A list of IDs for the supplied subscribers
    """
    return [subscriber['id'] for subscriber in subscribers]
def get_subscribers(publication_id, subscription_id, return_type='list', only_id=True, return_fields=None,
                    ignore_exceptions=False):
    """This function retrieves the individual subscribers (i.e. users) for a subscription within a publication.

    The subscribers endpoint is queried in pages, advancing the start index by 100 each time, until
    a query returns no results.

    .. versionchanged:: 3.1.0
       Changed the default ``return_fields`` value to ``None`` and adjusted the function accordingly.

    :param publication_id: The ID of the publication where the subscription resides
    :type publication_id: int, str
    :param subscription_id: The ID of the subscription in which to identify the subscribers
    :type subscription_id: int, str
    :param return_type: Determines whether the data should be returned as a ``list`` (default) or a
                        pandas ``dataframe``
    :type return_type: str
    :param only_id: Determines if only the ID of each user should be returned (default) or a dict with
                    all user data
    :type only_id: bool
    :param return_fields: Specific fields to return if not all of the default fields are needed (Optional)
    :type return_fields: list, None
    :param ignore_exceptions: Determines whether or not exceptions should be ignored (Default: ``False``)
    :type ignore_exceptions: bool
    :returns: A list or pandas dataframe with the subscriber information
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # Only the ID field is requested from the API when only_id is True
    if only_id:
        return_fields = ['id']

    # The dictionaries are flattened into plain IDs only when a list of IDs was requested
    extract_ids = only_id and return_type == 'list'

    # Define the query and the master list that accumulates the subscriber information
    query = f"{base_url}/publications/{publication_id}/subscriptions/{subscription_id}/subscribers"
    all_subscribers = []
    start_index = 0

    # Query one page at a time until a page comes back empty
    while True:
        page = core.get_paginated_results(query, 'people', start_index, return_fields=return_fields,
                                          ignore_exceptions=ignore_exceptions)
        if extract_ids:
            page = get_subscriber_ids(page)
        all_subscribers = core_utils.add_to_master_list(page, all_subscribers)
        if not page:
            break
        start_index += 100

    # Return the master list as-is unless a pandas dataframe was requested
    if return_type == "dataframe":
        return df_utils.convert_dict_list_to_dataframe(all_subscribers)
    return all_subscribers
def rebuild_publication(publication_id):
    """This function rebuilds a publication by sending a PUT request to its ``rebuild`` endpoint.

    :param publication_id: The ID of the publication to be rebuilt
    :type publication_id: int, str
    :returns: The response from the API PUT request
    :raises: :py:exc:`khorosjx.errors.exceptions.PUTRequestError`
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # The rebuild endpoint is called with an empty payload
    rebuild_uri = f"{base_url}/publications/{publication_id}/rebuild"
    return core.put_request_with_retries(rebuild_uri, {})
def update_publication(publication_id, payload):
    """This function updates a publication with the supplied JSON payload.

    :param publication_id: The ID of the publication to be updated
    :type publication_id: int, str
    :param payload: The JSON payload with which the publication will be updated
    :type payload: dict
    :returns: The response from the API PUT request
    :raises: :py:exc:`khorosjx.errors.exceptions.PUTRequestError`
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # Send the payload to the publication URI via a PUT request
    publication_uri = f"{base_url}/publications/{publication_id}"
    return core.put_request_with_retries(publication_uri, payload)
def get_stream(stream_id, return_fields=None, ignore_exceptions=False):
    """This function retrieves the information for a single stream when supplied its ID.

    .. versionchanged:: 3.1.0
       Changed the default ``return_fields`` value to ``None`` and adjusted the function accordingly.

    :param stream_id: The ID of the stream to retrieve
    :type stream_id: int, str
    :param return_fields: Specific fields to return if not all of the default fields are needed (Optional)
    :type return_fields: list, None
    :param ignore_exceptions: Determines whether or not exceptions should be ignored (Default: ``False``)
    :type ignore_exceptions: bool
    :returns: A dictionary with the data for the stream (or the unparsed API response when the
              response check does not report success)
    :raises: :py:exc:`khorosjx.errors.exceptions.InvalidDatasetError`,
             :py:exc:`khorosjx.errors.exceptions.GETRequestError`
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # Retrieve the raw API response for the stream
    response = core.get_data('streams', stream_id, return_json=False, all_fields=True)

    # Hand back the response untouched when the check does not report success
    if not errors.handlers.check_api_response(response, ignore_exceptions=ignore_exceptions):
        return response

    # Extract the requested fields from the JSON payload
    return core.get_fields_from_api_response(response.json(), 'stream', return_fields)
def update_stream(stream_id, payload):
    """This function updates a stream with the supplied JSON payload.

    :param stream_id: The ID of the stream to be updated
    :type stream_id: int, str
    :param payload: The JSON payload with which the stream will be updated
    :type payload: dict
    :returns: The response from the API PUT request
    :raises: :py:exc:`khorosjx.errors.exceptions.PUTRequestError`
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # Send the payload to the stream URI via a PUT request
    stream_uri = f"{base_url}/streams/{stream_id}"
    return core.put_request_with_retries(stream_uri, payload)
def delete_stream(stream_id, return_json=False):
    """This function deletes the stream with the given ID.

    :param stream_id: The ID of the stream
    :type stream_id: int, str
    :param return_json: Determines if the API response should be returned in JSON format
                        (``False`` by default)
    :type return_json: bool
    :returns: The API response (optionally in JSON format)
    """
    # Verify that the core connection has been established
    verify_core_connection()

    # Issue the DELETE request against the stream URI
    return core.delete(f"{base_url}/streams/{stream_id}", return_json=return_json)
# TODO: Add functions relating to the /people/{id}/streams endpoint