Source code for risksense_api.__subject.__groups.__groups

"""
Group module defined for different group related api endpoints.
"""

""" *******************************************************************************************************************
|
|  Name        :  __groups.py
|  Module      :  risksense_api
|  Description :  A class to be used for searching for and updating groups on the RiskSense Platform.
|  Copyright   : (c) RiskSense, Inc.
|  License     : Apache-2.0 (http://www.apache.org/licenses/LICENSE-2.0)
|
******************************************************************************************************************* """


import json
import profile
from ...__subject import Subject
from ..__exports import ExportFileType
from ..__exports import ExportRowNumbers
from ..__notifications import Notifications
from ..._params import *
from ..._api_request_handler import *
from ..__exports import Exports
import zipfile
import sys
import csv


[docs]class Groups(Subject): """Class for group function definitions. Args: profile: Profile Object To utlise group function: Usage: :obj:`self.{risksenseobjectname}.groups.{function}` Examples: To create a group using :meth:`create` function >>> self.{risksenseobjectname}.groups.create(args) """
[docs] def __init__(self, profile:object): """ Initialization of Groups object. Args: profile: Profile Object """ self.profile=profile self.subject_name = "group" Subject.__init__(self, profile, self.subject_name)
[docs] def downloadfilterinexport(self,filename:str,filters:list,client_id:int=None): """ Download group data based on search filters. Args: filename: Name of the file filters: A list of dictionaries containing filter parameters client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. Note: **IGNORE** - Internal funtion for csv dump """ if client_id is None: client_id= self._use_default_client_id() exportid=self.export(filters,file_name=filename) self.exports=Exports(self.profile) while(True): try: exportstatus=self.exports.check_status(exportid) print(exportstatus) if exportstatus=='COMPLETE': break elif exportstatus=='ERROR': print('error getting zip file please check ') exit() except (RequestFailed,MaxRetryError,StatusCodeError, ValueError) as ex: print(ex) print() print("Unable to export the file.") sys.exit("Exiting") try: self.exports.download_export(exportid,f"{filename}.csv") except(RequestFailed,MaxRetryError,StatusCodeError, ValueError) as ex: print(ex) print() print("Unable to retrieve file. Exiting.") sys.exit(1)
[docs] def list_group_filter_fields(self,client_id:int=None)->dict: """ List filter endpoints. Args: client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. Return: The JSON output from the platform is returned, listing the available filters. Examples: >>> apiobj = self.{risksenseobject}.groups.list_group_filter_fields() """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id))+'/filter' try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url) except (Exception) as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) return jsonified_response
[docs] def get_single_search_page(self, search_filters:list, projection:str=Projection.BASIC, page_num:int=0, page_size:int=150, sort_field:str=SortField.ID, sort_dir:str=SortDirection.ASC, client_id:int=None,csvdump:bool=False)->dict: """ Searches for and returns groups based on the provided filter(s) and other parameters. Args: search_filters: A list of dictionaries containing filter parameters. projection: Projection to use page_num: The page number of results to be returned. page_size: The number of results per page to be returned. sort_field: The field to be used for sorting the results returned. sort_dir: The direction of sorting to be used. (SortDirection.ASC or SortDirection.DESC) client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. csvdump: Toggle to dump data in csv Return: The JSON response from the platform is returned. Examples: >>> apiobj = self.{risksenseobject}.groups.get_single_search_page([{"field":"name","exclusive":False,"operator":"WILDCARD","value":"test*","implicitFilters":[]}]) Note: You can also dump the data of the group search in a csv file. Just make csvdump as True: >>> self.{risksenseobject}.groups.get_single_search_page([{"field":"name","exclusive":False,"operator":"WILDCARD","value":"test*","implicitFilters":[]}],csvdump=True) """ func_args = locals() func_args.pop('self') if client_id is None: client_id, func_args['client_id'] = self._use_default_client_id() if type(csvdump) != bool: print('csvdump value is not in type boolean') exit() try: response = self._get_single_search_page(self.subject_name, **func_args) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() if csvdump==True: self.downloadfilterinexport('groupsearchdata',search_filters) return response
[docs] def search(self, search_filters:list, projection:str=Projection.DETAIL, page_size:int=150, sort_field:int=SortField.ID, sort_dir:int=SortDirection.ASC, client_id:int=None,csvdump:bool=False)->list: """ Searches for and returns groups based on the provided filter(s) and other parameters. Rather than returning paginated results, this function cycles through all pages of results and returns them all in a single list. Args: search_filters: A list of dictionaries containing filter parameters. projection: Projection to use page_size: The number of results per page to be returned. sort_field: The field to be used for sorting the results returned. sort_dir: The direction of sorting to be used. (SortDirection.ASC or SortDirection.DESC) client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. csvdump: Toggle to dump data in csv Return: A list containing all host findings returned by the search using the filter provided. Examples: >>> apiobj = self.{risksenseobject}.groups.search([{"field":"name","exclusive":False,"operator":"WILDCARD","value":"test*","implicitFilters":[]}]) Note: You can also dump the data of the group search in a csv file. Just make csvdump as True: >>> self.{risksenseobject}.groups.search([{"field":"name","exclusive":False,"operator":"WILDCARD","value":"test*","implicitFilters":[]}],csvdump=True) """ func_args = locals() func_args.pop('self') all_results = [] func_args.pop('csvdump') if client_id is None: client_id, func_args['client_id'] = self._use_default_client_id() if type(csvdump) != bool: print('csvdump value is not in type boolean') exit() try: page_info = self._get_page_info(self.subject_name, search_filters, page_size=page_size, client_id=client_id) num_pages = page_info[1] except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() page_range = range(0, num_pages) try: all_results = self._search(self.subject_name, self.get_single_search_page, page_range, **func_args) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() if csvdump==True: self.downloadfilterinexport('groupsearchdata',search_filters) return all_results
[docs] def create(self, name:str,description:str, client_id:int=None, csvdump:bool=False)->int: """ Creates a new group. Args: name: The name to be used for the new group. description: Group creation description client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. csvdump: Toggle to dump data in csv Return: The new group ID is returned. Examples: >>> apiobj = self.{risksenseobject}.groups.create('test','test') Note: You can also dump the data of the group job id in a csv file. Just make csvdump as True: >>> self.{risksenseobject}.groups.create('test','test',csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) body = { "name": name, "description":description } if type(csvdump) != bool: print('csvdump value is not in type boolean') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.POST, url, body=body) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) new_group_id = jsonified_response['id'] try: if csvdump==True: field_names = ['id'] try: with open('groupcreate.csv', 'w', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow({'id': new_group_id}) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) except Exception as e: print('There seems to be an exception') print(e) exit() return new_group_id
[docs] def history(self, groupid:int, client_id:int=None,csvdump:bool=False)->dict: """ Get group history. Args: groupid: The id to be used to fetch group history. client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. csvdump: Toggle to dump data in csv Return: The history of group id. Examples: >>> apiobj = self.{risksenseobject}.groups.history(1234) Note: You can also dump the data of the group history in a csv file. Just make csvdump as True: >>> self.{risksenseobject}.groups.history(1234,csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id))+f'/{groupid}/history' if type(csvdump) != bool: print('csvdump value is not in type boolean') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: field_names = [] for item in jsonified_response: for key in item.keys(): if key not in field_names: field_names.append(item) try: with open('grouphistory.csv', 'w', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() for item in jsonified_response: writer.writerow(item) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def delete(self, search_filters:list, client_id:int=None,csvdump:bool=False)->int: """ Deletes groups as specified in search_filters. Args: search_filters: A list of dictionaries containing filter parameters. client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. csvdump: Toggle to dump data in csv Return: Job Id Examples: >>> apiobj = self.{risksenseobject}.groups.delete([{"field":"name","exclusive":False,"operator":"WILDCARD","value":"test*","implicitFilters":[]}]) Note: You can also dump the data of the group to be deleted in a csv file. Just make csvdump as True: >>> self.{risksenseobject}.groups.get_single_search_page([{"field":"name","exclusive":False,"operator":"WILDCARD","value":"test*","implicitFilters":[]}],csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/delete" body = { "filterRequest": { "filters": search_filters } } if type(csvdump) != bool: print('csvdump value is not in type boolean') exit() if csvdump==True: self.downloadfilterinexport('groupexportbeforedeleting',search_filters) try: raw_response = self.request_handler.make_request(ApiRequestHandler.POST, url, body=body) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() if raw_response.status_code == 200: jsonified_response = json.loads(raw_response.text) deleted_groups = jsonified_response['id'] return deleted_groups
[docs] def update_single_group(self, group_id:int, client_id:int=None, csvdump:bool=False, **kwargs)->int: """ Updates a group name and/or asset criticality. Args: group_id: The group ID. client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. csvdump: Toggle to dump data in csv Keyword Args: name (`str`): The new name. description (`str`): The new Description Return: The job ID is returned. Examples: >>> apiobj = self.{risksenseobject}.groups.update_single_group(1234) Note: You can also dump the data of the group job id in a csv file. Just make csvdump as True: >>> self.{risksenseobject}.groups.update_single_group(1234,csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] name = kwargs.get('name', None) description = kwargs.get('description', None) url = self.api_base_url.format(str(client_id)) + "/" + str(group_id) body = {} if name is not None: body.update(name=name) if description is not None: body.update(description=description) if body == {}: ValueError("Body is empty. Please provide name and/or description") if type(csvdump) != bool: print('csvdump value is not in type boolean') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.PUT, url, body=body) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) job_id = jsonified_response['id'] try: if csvdump==True: field_names = ['id'] try: with open('groupupdate.csv', 'w', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow({'id': job_id}) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) except Exception as e: print('There seems to be an exception') print(e) exit() return job_id
[docs] def assign(self, search_filters:list, user_ids:list, client_id:int=None, csvdump:bool=False)->int: """ Assign group(s) to user IDs, based on specified filter(s) Args: search_filters: A list of dictionaries containing filter parameters. user_ids: A list of user IDs. client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. csvdump: Toggle to dump data in csv Return: The job ID is returned. Examples: >>> apiobj = self.{risksenseobject}.groups.assign([{"field":"name","exclusive":False,"operator":"EXACT","value":"test","implicitFilters":[]}],[1234]) Note: You can also dump the data of the group job id in a csv file. Just make csvdump as True: >>> self.{risksenseobject}.groups.assign([{"field":"name","exclusive":False,"operator":"EXACT","value":"test","implicitFilters":[]}],[1234],csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/assign" body = { "filters": search_filters, "userIds": user_ids } if type(csvdump) != bool: print('csvdump value is not in type boolean') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.POST, url, body=body) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) job_id = jsonified_response['id'] try: if csvdump==True: field_names = ['id'] try: with open('groupassign.csv', 'w', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow({'id': job_id}) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) except Exception as e: print('There seems to be an exception') print(e) exit() return job_id
[docs] def unassign(self, search_filters:list, user_ids:list, client_id:int=None,csvdump:bool=False)->int: """ Unassign group(s) from user IDs, based on specified filter(s) Args: search_filters: A list of dictionaries containing filter parameters. user_ids: A list of user IDs. client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. csvdump: Toggle to dump data in csv Return: The job ID is returned. Examples: >>> apiobj = self.{risksenseobject}.groups.unassign([{"field":"name","exclusive":False,"operator":"EXACT","value":"test","implicitFilters":[]}],[1234]) Note: You can also dump the data of the group job id in a csv file. Just make csvdump as True: >>> self.{risksenseobject}.groups.unassign([{"field":"name","exclusive":False,"operator":"EXACT","value":"test","implicitFilters":[]}],[1234],csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/unassign" if csvdump==True: self.downloadfilterinexport('groupexportbeforeunassigning',search_filters) body = { "filters": search_filters, "userIds": user_ids } if type(csvdump) != bool: print('csvdump value is not in type boolean') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.POST, url, body=body) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) job_id = jsonified_response['id'] try: if csvdump==True: field_names = ['id'] try: with open('groupunassign.csv', 'w', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow({'id': job_id}) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) except Exception as e: print('There seems to be an exception') print(e) exit() return job_id
[docs] def get_model(self, client_id:int=None)->dict: """ Get available projections and models for Groups. Args: client_id: Client ID Return: Group projections and models are returned. Examples: >>> apiobj = self.{risksenseobject}.groups.get_model() """ if client_id is None: client_id = self._use_default_client_id()[0] try: response = self._model(self.subject_name, client_id) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() return response
[docs] def suggest(self, search_filter:list, suggest_filter:dict, client_id:int=None)->dict: """ Suggest values for filter fields. Args: search_filter: Search Filter suggest_filter: Suggest Filter client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. Return: Value suggestions Examples: >>> apiobj = self.{risksenseobject}.groups.suggest([{"field":"name","exclusive":False,"operator":"WILDCARD","value":"test*","implicitFilters":[]}]) """ if client_id is None: client_id = self._use_default_client_id()[0] try: response = self._suggest(self.subject_name, search_filter, suggest_filter, client_id) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() return response
[docs] def getexporttemplate(self,client_id:int=None)->list: """ Gets configurable export template for application findings. Args: client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. Return: The Exportable fields Examples: >>> apiobj = self.{risksenseobject}.groups.getexporttemplate() """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/export/template" try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() exportablefilter = json.loads(raw_response.text) for i in range(len(exportablefilter['exportableFields'])): for j in range(len(exportablefilter['exportableFields'][i]['fields'])): if exportablefilter['exportableFields'][i]['fields'][j]['selected']==False: exportablefilter['exportableFields'][i]['fields'][j]['selected']=True return exportablefilter['exportableFields']
[docs] def export(self, search_filters:list, file_name:str, row_count:str=ExportRowNumbers.ROW_ALL,file_type:str=ExportFileType.CSV, client_id:int=None, csvdump:bool=False)->int: """ Initiates an export job on the platform for group(s) based on the provided filter(s). Args: search_filters: A list of dictionaries containing filter parameters. file_name: The name to be used for the exported file. row_count: No of rows to be exported. Possible options : ExportRowNumbers.ROW_5000, ExportRowNumbers.ROW_10000, ExportRowNumbers.ROW_25000, ExportRowNumbers.ROW_50000", ExportRowNumbers.ROW_100000", ExportRowNumbers.ROW_ALL file_type: File type to export. ExportFileType.CSV, ExportFileType.XML, or ExportFileType.XLSX client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. csvdump: Toggle to dump data in csv Return: The Export job ID in the platform from is returned. Examples: >>> apiobj = self.{risksenseobject}.groups.export([{"field":"name","exclusive":False,"operator":"WILDCARD","value":"test*","implicitFilters":[]}],'test') Note: You can also dump the data of the group in a csv file. Just make csvdump as True: >>> self.{risksenseobject}.groups.export([{"field":"name","exclusive":False,"operator":"WILDCARD","value":"test*","implicitFilters":[]}],csvdump=True) """ func_args = locals() func_args['exportable_filter']=self.getexporttemplate() func_args.pop('self') if client_id is None: func_args['client_id'] = self._use_default_client_id()[1] if type(csvdump) != bool: print('csvdump value is not in type boolean') exit() try: export_id = self._export(self.subject_name, **func_args) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() try: if csvdump==True: field_names = ['id'] try: with open('groupexport.csv', 'w', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow({'id': export_id}) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) except Exception as e: print('There seems to be an exception') print(e) exit() return export_id
## PRIVATE FUNCTIONS ##
[docs] def subscribe_change_in_grouprs3(self,client_id:int=None)->dict: """ Subscribe change in group rs3 notification Args: client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. Return: Jsonified response Examples: >>> apiobj = self.{risksenseobject}.groups.subscribe_change_in_grouprs3() """ if client_id is None: client_id = self._use_default_client_id() self.notifications=Notifications(self.profile) try: subscribe = self.notifications.subscribe_notifications(notificationtypeid=3,subscribe=True) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() return subscribe
[docs] def unsubscribe_change_in_grouprs3(self,client_id:int=None)->dict: """ Unsubscribe change in group rs3 notification Args: client_id: Client ID. If an ID isn't passed, will use the profile's default Client ID. Return: Jsonified response Examples: >>> apiobj = self.{risksenseobject}.groups.unsubscribe_change_in_grouprs3() """ if client_id is None: client_id = self._use_default_client_id() try: print(client_id) subscribe = self.notifications.subscribe_notifications(self,notificationtypeid=3,subscribe=False) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() return subscribe
""" Copyright 2022 RiskSense, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """