Source code for khorosjx.content.base
# -*- coding: utf-8 -*-
"""
:Module: khorosjx.content.base
:Synopsis: Collection of core functions relating to content
:Usage: ``import khorosjx.content.base as content_core``
:Example: ``content_id = content_core.get_content_id(url, 'document')``
:Created By: Jeff Shurtliff
:Last Modified: Jeff Shurtliff
:Modified Date: 22 Sep 2021
"""
import re
from .. import core, errors
from ..utils import core_utils
from ..utils.classes import Content
# Define global variables
base_url, api_credentials = '', None
# Define function to verify the connection in the core module
[docs]def verify_core_connection():
"""This function verifies that the core connection information (Base URL and API credentials) has been defined.
.. versionchanged:: 3.1.0
Refactored the function to be more pythonic and to avoid depending on a try/except block.
:returns: None
:raises: :py:exc:`khorosjx.errors.exceptions.KhorosJXError`,
:py:exc:`khorosjx.errors.exceptions.NoCredentialsError`
"""
if not base_url or not api_credentials:
retrieve_connection_info()
return
[docs]def retrieve_connection_info():
"""This function initializes and defines the global variables for the connection information.
.. versionchanged:: 3.1.0
Refactored the function to be more efficient.
:returns: None
:raises: :py:exc:`khorosjx.errors.exceptions.KhorosJXError`,
:py:exc:`khorosjx.errors.exceptions.NoCredentialsError`
"""
# Define the global variables at this module level
global base_url
global api_credentials
base_url, api_credentials = core.get_connection_info()
return
# Define function to get the content ID from a URL
[docs]def get_content_id(url, content_type="document", verify_ssl=True):
"""This function obtains the Content ID for a particular content asset. (Supports all but blog posts)
.. versionchanged:: 3.1.0
Made some minor syntax improvements.
.. versionchanged:: 2.6.0
Added the ``verify_ssl`` argument.
:param url: The URL to the content
:type url: str
:param content_type: The content type for the URL for which to obtain the Content ID (Default: ``document``)
:type content_type: str
:param verify_ssl: Determines if API calls should verify SSL certificates (``True`` by default)
:type verify_ssl: bool
:returns: The Content ID for the content URL
:raises: :py:exc:`ValueError`, :py:exc:`khorosjx.errors.exceptions.ContentNotFoundError`
"""
# Verify that the core connection has been established
verify_core_connection()
# Get the domain URL from the supplied content URL
if content_type in Content.content_url_delimiters:
platform_url = url.split(Content.content_url_delimiters.get(content_type))[0]
if not platform_url.startswith('http'):
platform_url = f"https://{platform_url}"
else:
error_msg = "Unable to identify the platform URL for the URL and defined content type."
raise ValueError(error_msg)
# Get the ID to be used in the GET request
if content_type == "document":
item_id = url.split('DOC-')[1]
elif content_type == "blog post":
raise ValueError("The get_content_id function does not currently support blog posts.")
else:
item_id = re.sub(r'^.*/', '', url)
# Construct the appropriate query URL
if content_type in Content.content_types:
content_type_id = Content.content_types.get(content_type)
query_url = f"{platform_url}/api/core/v3/contents?filter=entityDescriptor({content_type_id},{item_id})&count=1"
else:
error_msg = f"The content type {content_type} is unrecognized. Unable to perform the function."
raise ValueError(error_msg)
# Query the API to get the content ID
try:
response = core.get_request_with_retries(query_url, verify_ssl=verify_ssl)
content_data = response.json()
content_id = content_data['list'][0]['contentID']
except KeyError:
raise errors.exceptions.ContentNotFoundError()
return content_id
# Define an internal function to convert a lookup value to a proper lookup type
def __convert_lookup_value(_lookup_value, _lookup_type, _content_type="document"):
"""This function converts a lookup value to a proper lookup type.
:param _lookup_value: The lookup value to be converted
:type _lookup_value: str, int
:param _lookup_type: The current lookup type of the value to be converted
:type _lookup_type: str
:param _content_type: The type of content associated with the lookup value and lookup type (Default: ``document``)
:type _content_type: str
:returns: The properly formatted lookup value
:raises: :py:exc:`khorosjx.errors.exceptions.LookupMismatchError`,
:py:exc:`khorosjx.errors.exceptions.InvalidLookupTypeError`,
:py:exc:`khorosjx.errors.exceptions.CurrentlyUnsupportedError`
"""
# TODO: Rename this function to only have one underscore prefix
# Verify that the core connection has been established
verify_core_connection()
# Convert the lookup value as needed
if _content_type == "document":
# Get the Content ID if not supplied
if _lookup_type == "doc_id" or _lookup_type == "url":
if _lookup_type == "doc_id":
if 'http' in str(_lookup_value):
_error_msg = f"The 'doc_id' lookup_type was supplied (default) but the lookup value is a URL."
raise errors.exceptions.LookupMismatchError(_error_msg)
_lookup_value = f"{base_url.split('/api')[0]}/docs/DOC-{_lookup_value}"
_lookup_value = get_content_id(_lookup_value)
elif _lookup_type != "id" and _lookup_type != "content_id":
_exception_msg = "The supplied lookup type for the API is not recognized. " + \
"(Valid lookup types include 'id', 'content_id', 'doc_id' and 'url')"
raise errors.exceptions.InvalidLookupTypeError(_exception_msg)
else:
_exception_msg = f"The '{_content_type}' content type is not currently supported."
raise errors.exceptions.CurrentlyUnsupportedError(_exception_msg)
# TODO: Add functionality for other content types (e.g. discussion/question threads)
return _lookup_value
# Define internal function to trim the attachments data
def __trim_attachments_info(_attachment_info):
"""This function removes certain fields from attachments data captured via the API.
:param _attachment_info: List containing dictionaries of attachments retrieved via the API
:type _attachment_info: list
:returns: The trimmed list of dictionaries
"""
# TODO: Rename this function to only have one underscore prefix
for _idx in range(len(_attachment_info)):
_fields_to_ignore = ['resources', 'doUpload']
for _ignored_field in _fields_to_ignore:
if _ignored_field in _attachment_info[_idx].keys():
del _attachment_info[_idx][_ignored_field]
return _attachment_info
[docs]def get_paginated_content(endpoint, query_string="", start_index=0, dataset="", all_fields=False, return_fields=None,
ignore_exceptions=False):
"""This function returns paginated content information. (Up to 100 records at a time)
.. versionchanged:: 3.1.0
Changed the default ``return_fields`` value to ``None`` and adjusted the function accordingly.
:param endpoint: The full endpoint without preceding slash (e.g. ``securityGroups``, ``people/email/user_email``)
:type endpoint: str
:param query_string: Any query strings to apply (without preceding ``?``) excluding ``count`` and ``startIndex``
:type query_string: str
:param start_index: The startIndex API value
:type start_index: int, str
:param dataset: Defines the type of data returned in the API response (e.g. ``security_group``, ``people``, etc.)
:type dataset: str
:param all_fields: Determines if the ``fields=@all`` parameter should be passed in the query
:type all_fields: bool
:param return_fields: Specific fields to return if not all of the default fields are needed (Optional)
:type return_fields: list, None
:param ignore_exceptions: Determines whether nor not exceptions should be ignored (Default: ``False``)
:type ignore_exceptions: bool
:returns: A list of dictionaries containing information for each group in the paginated query
"""
# Initialize the empty list for the group information
content = []
# Identify if all fields should be captured
all_fields_options = {True: 'fields=@all&', False: ''}
all_fields = False if 'fields=@all' in query_string else all_fields
all_fields = all_fields_options.get(all_fields)
# Construct the API query
start_index_delimiter = {True: '?', False: '&'}
query_uri = f"{base_url}/{endpoint.replace('?', '')}?{query_string.replace('?', '')}"
empty_query = True if query_string == "" else False
query_uri = f"{query_uri}{start_index_delimiter.get(empty_query)}" + \
f"{all_fields}startIndex={start_index}&count=100"
# Perform the API query to retrieve the group information
response = core.get_request_with_retries(query_uri)
# Verify that the query was successful
successful_response = errors.handlers.check_api_response(response, ignore_exceptions=ignore_exceptions)
if successful_response:
# Get the response data in JSON format
paginated_data = response.json()
for content_data in paginated_data.get('list'):
if dataset == "" or dataset not in Content.datasets:
dataset = core_utils.identify_dataset(query_uri)
parsed_data = core.get_fields_from_api_response(content_data, dataset, return_fields)
content.append(parsed_data)
return content