Source code for risksense_api.__subject.__sla.__sla

"""
**Sla module defined for different sla related api endpoints.**
"""
""" *******************************************************************************************************************
|
|  Name        :  __sla.py
|  Description :  SLA
|  Project     :  risksense_api
|  Copyright   :  2022 RiskSense, Inc.
|  License     :  Apache-2.0 (https://www.apache.org/licenses/LICENSE-2.0.txt)
|
******************************************************************************************************************* """

import json
from re import L
import sched
from ..__subject import Subject
from ..._params import *
from ..._api_request_handler import *
import csv
import pandas as pd

[docs]class SlaActionType: """Types of sla action type""" REMEDIATION_SLA= "REMEDIATION_SLA"
[docs]class SlaMatrix : """Types of Sla Matrix""" STANDARD= {"1":[45,90,90,120,0],"2":[30,90,90,120,0],"3":[21,45,90,120,0],"4":[14,30,90,120,0],"5":[7,21,90,120,0]} ACCELERATED= {"1":[30,60,90,90,0],"2":[21,45,90,90,0],"3":[14,30,60,90,0],"4":[7,21,45,90,0],"5":[3,14,30,90,0]} AGGRESSIVE= {"1":[14,30,45,60,0],"2":[7,21,30,60,0],"3":[3,14,30,60,0],"4":[2,7,15,60,0],"5":[1,3,15,60,0]}
[docs]class SlaDataOperator: """Types of sla data operator""" MET_SLA="MET_SLA" OVERDUE="OVERDUE" MISSED_SLA="MISSED_SLA" WITHIN_SLA="WITHIN_SLA"
[docs]class SlaMatrixProfileType: """Types of Sla Matrix Profile Type""" STANDARD="STANDARD" AGGRESSIVE="AGGRESSIVE" ACCELERATED="ACCELERATED" CUSTOM="CUSTOM"
[docs]class TimeReference: """Types of sla time reference""" INGESTION_DATE = "INGESTION_DATE" DISCOVERED_DATE = "DISCOVERED_DATE"
[docs]class offsetbasis: """Types of sla offset basis""" VRR = "VRR" SEVERITY = "SEVERITY"
[docs]class Sla(Subject): """ Sla class """ """ **Class for Sla function defintions**. To utlise sla function: Args: profile: Profile Object Usage: :obj:`self.{risksenseobjectname}.sla.{function}` Examples: To get sla rules use :meth:`getslarules()` function >>> self.{risksenseobject}.sla.getslarules({args}) """
[docs] def __init__(self, profile:object): """ Initialization of sla object. Args: profile: Profile Object """ self.subject_name = "sla" Subject.__init__(self, profile, self.subject_name)
[docs] def create_sla(self,description:str,schedule_type:str='DAILY',hourofday:int=12,name:str='Remediation SLAs',csvdump:bool=False,client_id:int=None,**kwargs)->str: """ Creates an sla Args: name: Name of playbook,note for system sla please mention Remediation SLAs description: Provide description for sla schedule_type: Schedule Frequency (ScheduleFreq.DAILY, ScheduleFreq.WEEKLY, ScheduleFreq.MONTHLY, 'DISABLED') hourofday: Hour of the day csvdump: dumps the data in csv client_id: Client ID Keyword Args: day_of_week(``str``): Day of the week day_of_month(``str``): Day of the month Return: Playbook UUID Example: To create a remediation sla >>> self.{risksenseobject}.sla.create_sla('remediation sla') Note: You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.sla.create_sla('remediation sla',csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) day_of_week = kwargs.get("day_of_week", None) day_of_month = kwargs.get("day_of_month", None) body = { "name": name, "description": description, "schedule": { "type": schedule_type, "hourOfDay": hourofday }, "type":"System" } if type(csvdump)!=bool: print('Error in csvdump value,Please provide either true or false') exit() elif schedule_type == ScheduleFreq.WEEKLY: if int(day_of_week) < 1 or int(day_of_week) > 7: raise ValueError("valid day_of_week (1-7) is required for a WEEKLY connector schedule.") body['schedule'].update(daysOfWeek=day_of_week) elif schedule_type == ScheduleFreq.MONTHLY: if int(day_of_month) < 1 or int(day_of_month) > 31: raise ValueError("valid day_of_month (1-31) is required for a MONTHLY connector schedule.") body['schedule'].update(daysOfMonth=day_of_month) try: raw_response = self.request_handler.make_request(ApiRequestHandler.POST, url, body=body) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) new_playbook_uuid = jsonified_response['uuid'] if csvdump==True: jobid={'playbookuuid':[jsonified_response['uuid']]} df=pd.DataFrame(jobid) df.to_csv('playbookuuid.csv') return new_playbook_uuid
[docs] def getslarules(self,playbookuuid:str,csvdump:bool=False,client_id:int=None)->list: """ Get a list of all sla rules for an sla. Args: playbookuuid: Uuid of the playbook to fetch sla csvdump: dumps the data in csv client_id: Client ID, if no client id provided , gets default client id Return: Sla rules Example: To get list of all sla rules >>> self.{risksenseobject}.sla.getslarules('11ed05cb-e7a0-4f25-b7ab-06933745a4d6') Note: You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.sla.getslarules('11ed05cb-e7a0-4f25-b7ab-06933745a4d6',csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/{}/rules".format(playbookuuid) if type(csvdump)!=bool: print('Error in csvdump value,Please provide either true or false') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: field_names = [] newdump=[] for i in jsonified_response['content']: temp={} for j,k in i.items(): temp[j]=k if j=='filter': temp.pop('filter') if j=='actionConfig': temp.pop('actionConfig') if j=='detailInfo': temp['slatype']=k['type'] temp['groupNames']=','.join(k['groupNames']) temp['impacted_asset_count']=k['impactedMetrics']['asset_count'] temp['impacted_finding_count']=k['impactedMetrics']['finding_count'] temp.pop('detailInfo') newdump.append(temp) for item in newdump[0]: field_names.append(item) try: with open('slarules.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() for item in newdump: writer.writerow(item) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def getallslas(self,csvdump:bool=False,client_id:int=None)->dict: """ Gets all slas for a particular client. Args: csvdump: dumps the data in csv client_id: Client ID, if no client id provided , gets default client id Return: To get all slas Example: To get all slas >>> self.{risksenseobject}.getallslas() Note: You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.getallslas(csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/fetch" if type(csvdump)!=bool: print('Error in csvdump value,Please provide either true or false') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: newdump=[] for i in jsonified_response['content']: temp={} for j,k in i.items(): temp[j]=k if j=='filter': temp.pop('filter') if j=='actionConfig': temp.pop('actionConfig') if j=='detailInfo': temp['slatype']=k['type'] temp['groupNames']=','.join(k['groupNames']) temp['impacted_asset_count']=k['impactedMetrics']['asset_count'] temp['impacted_finding_count']=k['impactedMetrics']['finding_count'] temp.pop('detailInfo') newdump.append(temp) field_names = [] for item in newdump[0]: field_names.append(item) try: with open('allslas.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() for item in newdump: writer.writerow(item) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def delete_sla(self,sla_uuid:str,csvdump:bool=False,client_id:int=None)->bool: """ Delete a particular sla Args: sla_uuid: UUID of sla csvdump: dumps the data in csv client_id: Client ID, if no client id provided , gets default client id Return: Success Example: >>> self.{risksenseobject}.sla.delete_sla('1234-123-9f18-b7ab-06933745a4d6') Note: You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.sla.delete_sla('1234-123-9f18-b7ab-06933745a4d6') """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) +"/{}".format(sla_uuid) if type(csvdump)!=bool: print('Error in csvdump value,Please provide either true or false') exit() if csvdump==True: self.get_specified_sla(playbookuuid=sla_uuid,csvdump=True) try: success=False raw_response = self.request_handler.make_request(ApiRequestHandler.DELETE, url) if raw_response.status_code==200: success=True except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() return success
[docs] def get_sla_rule(self,playbookrulepairinguuid:str,csvdump:bool=False,client_id:int=None)->dict: """ Gets all sla rules for a particular client. Args: playbookrulepairinguuid: Playbook rule pairing uuid csvdump: dumps the data in csv client_id: Client ID, if no client id provided , gets default client id Return: SLA details Example: To get all sla rules >>> self.{risksenseobject}.sla.get_sla_rule('12334-123-3bb6-9564-dbbe539b1a74') Note: You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.sla.get_sla_rule('187914ba-b06e-3bb6-9564-dbbe539b1a74',csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/rule/{}".format(playbookrulepairinguuid) try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: field_names = [] for item in jsonified_response: field_names.append(item) try: with open('sla_rule_data.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow(jsonified_response) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def get_specified_sla(self,playbookuuid:str,csvdump:bool=False,client_id:int=None)->dict: """ Gets a specified sla for a particular client. Args: playbookuuid: UUID of playbook csvdump: dumps the data in csv client_id: Client ID, if no client id provided , gets default client id Return: Specific sla data Example: To get specific sla >>> self.{risksenseobject}.sla.get_specified_sla('12345st-123-4f25-b7ab-06933745a4d6') Note: You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.sla.get_specified_sla('11ed05cb-e7a0-4f25-b7ab-06933745a4d6',csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/fetch/{}".format(playbookuuid) try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: print('Sla details saved in the csv file get_specified_sla') field_names = [] for item in jsonified_response: field_names.append(item) try: with open('specified_sla.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow(jsonified_response) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def get_sla_details(self,playbookuuid:str,csvdump:bool=False,client_id:int=None)->dict: """ Get details of a particular sla playbook. Args: playbookuuid: SLA Playbook uuid csvdump: dumps the data in csv client_id: Client ID, if no client id provided , gets default client id Return: Returns the sla playbook details Example: To get sla details >>> self.{risksenseobject}.sla.get_sla_details('11ed05cb-e7a0-4f25-b7ab-06933745a4d6') Note: You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.sla.get_sla_details('11ed05cb-e7a0-4f25-b7ab-06933745a4d6',csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id))+"/{}".format(playbookuuid) if type(csvdump)!=bool: print('Error in csvdump value,Please provide either true or false') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: field_names = [] for item in jsonified_response: field_names.append(item) try: with open('sladetails.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow(jsonified_response) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def add_default_sla_rule(self, sla_uuid:str, description:str, priority:int, timeReference:str=TimeReference.DISCOVERED_DATE, serviceLevelAgreementMatrix:str=SlaMatrix.STANDARD,slaMatrixProfileType:str=SlaMatrixProfileType.STANDARD,offsetBasis:str=offsetbasis.VRR,affectOnlyNewFindings:bool=True,updateSLAIfVRRUpdates:bool=True,inputdata:str="HOST_FINDING",actionType:str=SlaActionType.REMEDIATION_SLA,csvdump:bool=False,client_id:int=None)->list: """ Adds default rule to existing sla playbook.Works only if there is no default rule applied to the playbook Args: sla_uuid: Sla UUID description: Provide description for the default sla rule, priority: Priority to be provided to default sla rule , note: default sla priority should be a greater number than the group sla rules timeReference: Timereference to be provided to default sla rule , serviceLevelAgreementMatrix : Provide slamatrix for the particular sla slaMatrixProfileType : Provide sla matrix type for the sla offsetBasis: Provide offset basis for particular sla affectOnlyNewFindings : Choose whether it affects new findings updateSLAIfVRRUpdates: Choose if slaupdates with vrr inputdata: Type of data provided for sla actionType: Type of action for particular sla csvdump: dumps the data in csv client_id: Client ID , if no client id provided , takes default client id action_type: Action type for the particular sla rule,by default remediation rule is provided Return: List containing dict of rule details. Example: To add default sla rule >>> self.{risksenseobject}.add_default_sla_rule('11ed13e6-a1aa-5c28-9fb0-02a87de7e1ee','testing',3) Note: You can also change the default time Reference or service Level Agreement Matrix or other default arguments You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.add_default_sla_rule('11ed13e6-a1aa-5c28-9fb0-02a87de7e1ee','testing',3,csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/{}/rule".format(sla_uuid) body={"name":'Default SLA',"description":description,"priority":priority,"supplementalFilterRequest":None,"actionType":actionType,"action":{ "isDefaultSLA":True,"targetGroupIds":[],"timeReference":timeReference,"serviceLevelAgreementMatrix":serviceLevelAgreementMatrix,"slaMatrixProfileType":slaMatrixProfileType,"offsetBasis":offsetBasis,"affectOnlyNewFindings":affectOnlyNewFindings,"updateSLAIfVRRUpdates":updateSLAIfVRRUpdates},"outputType":"NO_OUTPUT","output":{},"input":inputdata} if type(csvdump)!=bool: print('Error in csvdump value,Please provide either true or false') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.POST, url, body=body) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: data={'uuid':jsonified_response[0]['uuid'],'name':jsonified_response[0]['name'],'description':jsonified_response[0]['description']} field_names = ['uuid','name','description'] try: with open('defaultslarulecreated.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow(data) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def update_default_sla_rule(self, playbookrulepairinguuid:str, description:str, priority:int, timeReference:str=TimeReference.DISCOVERED_DATE, serviceLevelAgreementMatrix:dict=SlaMatrix.STANDARD,slaMatrixProfileType:str=SlaMatrixProfileType.STANDARD,offsetBasis:str=offsetbasis.VRR,affectOnlyNewFindings:bool=True,updateSLAIfVRRUpdates:bool=True,inputdata:str="HOST_FINDING",actionType:str=SlaActionType.REMEDIATION_SLA,csvdump:bool=False,client_id:int=None)->list: """ Updates default rule to existing sla playbook. Args: playbookrulepairinguuid: Playbook rule pairing uuid description: Provide description for the default sla rule, priority: Priority to be provided to default sla rule , note: default sla priority should be a greater number than the group sla rules timeReference : Timereference to be provided to default sla rule , serviceLevelAgreementMatrix: Provide slamatrix for the particular sla slaMatrixProfileType : Provide sla matrix type for the sla offsetBasis : Provide offset basis for particular sla affectOnlyNewFindings : Choose whether it affects new findings updateSLAIfVRRUpdates: Choose if slaupdates with vrr inputdata: Type of data provided for sla actionType: Type of action for particular sla csvdump: dumps the data in csv client_id: Client ID , if no client id provided , takes default client id Return: List containing dict of rule details. Example: To update default sla rule >>> self.{risksenseobject}.update_default_sla_rule('11ed13e6-a1aa-5c28-9fb0-02a87de7e1ee','testing',3) Note: You can also change the default time Reference or service Level Agreement Matrix or other default arguments You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.update_default_sla_rule('11ed13e6-a1aa-5c28-9fb0-02a87de7e1ee','testing',3,csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) +"/rule/{}".format(playbookrulepairinguuid) body={"name":'Default SLA',"description":description,"priority":priority,"supplementalFilterRequest":None,"actionType":actionType,"action":{ "isDefaultSLA":True,"targetGroupIds":[],"timeReference":timeReference,"serviceLevelAgreementMatrix":serviceLevelAgreementMatrix,"slaMatrixProfileType":slaMatrixProfileType,"offsetBasis":offsetBasis,"affectOnlyNewFindings":affectOnlyNewFindings,"updateSLAIfVRRUpdates":updateSLAIfVRRUpdates},"outputType":"NO_OUTPUT","output":{},"input":inputdata} if type(csvdump)!=bool: print('Error in csvdump value,Please provide either true or false') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.PUT, url, body=body) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: data={'uuid':jsonified_response[0]['uuid'],'name':jsonified_response[0]['name'],'description':jsonified_response[0]['description']} field_names = ['uuid','name','description'] try: with open('defaultslaruleupdated.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow(data) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def add_group_sla_rule(self, sla_uuid:str, name:str, description:str, priority:int, targetgroupids:list, timeReference:str=TimeReference.DISCOVERED_DATE, serviceLevelAgreementMatrix:dict=SlaMatrix.STANDARD,slaMatrixProfileType:str=SlaMatrixProfileType.STANDARD,offsetBasis:str=offsetbasis.VRR,affectOnlyNewFindings:bool=True,updateSLAIfVRRUpdates:bool=True,inputdata:str="HOST_FINDING",actionType:str=SlaActionType.REMEDIATION_SLA,csvdump:bool=False,client_id:int=None)->list: """ Adds group rule to existing sla playbook for the groups specified. Args: sla_uuid: Sla UUID name: Name of the group specific sla rule description: Provide description for the group specific sla rule, priority: Priority to be provided to group speicifc sla , note: group sla priority should be a lesser number than the default sla rule targetgroupids: The groups ids where the sla applies timeReference : Timereference to be provided to default sla rule , serviceLevelAgreementMatrix: Provide slamatrix for the particular sla slaMatrixProfileType: Provide sla matrix type for the sla offsetBasis: Provide offset basis for particular sla affectOnlyNewFindings : Choose whether it affects new findings updateSLAIfVRRUpdates: Choose if slaupdates with vrr inputdata: Type of data provided for sla actionType: Type of action for particular sla csvdump: Dumps data to csv client_id: Client ID , if no client id provided , takes default client id Return: List containing dict of rule details. Example: >>> self.{risksenseobject}.sla.add_group_sla_rule(sla_uuid='12345st-123-5c28-9fb0-02a87de7e1ee',name='testingnew',description='testingnew',targetgroupids=[],priority=2) Note: You can also change the default time Reference or service Level Agreement Matrix or other default arguments You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.sla.add_group_sla_rule(sla_uuid='12345st-123-5c28-9fb0-02a87de7e1ee',name='testingnew',description='testingnew',targetgroupids=[],priority=2,csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) +"/{}/rule".format(sla_uuid) body={"name":name,"description":description,"priority":priority,"supplementalFilterRequest":None,"actionType":actionType,"action":{"isDefaultSLA":False,"targetGroupIds":targetgroupids,"timeReference":timeReference,"serviceLevelAgreementMatrix":serviceLevelAgreementMatrix,"slaMatrixProfileType":slaMatrixProfileType,"offsetBasis":offsetBasis,"affectOnlyNewFindings":affectOnlyNewFindings,"updateSLAIfVRRUpdates":updateSLAIfVRRUpdates},"outputType":"NO_OUTPUT","output":{},"input":inputdata} if type(csvdump)!=bool: print('Error in csvdump value,Please provide either true or false') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.POST, url, body=body) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: data={'uuid':jsonified_response[0]['uuid'],'name':jsonified_response[0]['name'],'description':jsonified_response[0]['description']} field_names = ['uuid','name','description'] try: with open('groupslarulecreated.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow(data) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def update_group_sla_rule(self, playbookrulepairinguuid:str, name:str, description:str, priority:int, targetgroupids:list, timeReference:str=TimeReference.DISCOVERED_DATE, serviceLevelAgreementMatrix:dict=SlaMatrix.STANDARD,slaMatrixProfileType:str=SlaMatrixProfileType.STANDARD,offsetBasis:str=offsetbasis.VRR,affectOnlyNewFindings:bool=True,updateSLAIfVRRUpdates:bool=True,inputdata:str="HOST_FINDING",actionType:str=SlaActionType.REMEDIATION_SLA,csvdump:bool=False,client_id:int=None)->list: """ Updates a group sla rule for an already existing playbook Args: playbookrulepairinguuid: Playbook rule pairing uuid name: Name of the group specific sla rule description: Provide description for the group specific sla rule, priority: Priority to be provided to group speicifc sla , note: group sla priority should be a lesser number than the default sla rule targetgroupids: The groups ids where the sla applies timeReference: Timereference to be provided to default sla rule , serviceLevelAgreementMatrix: Provide slamatrix for the particular sla slaMatrixProfileType: Provide sla matrix type for the sla offsetBasis: Provide offset basis for particular sla affectOnlyNewFindings : Choose whether it affects new findings updateSLAIfVRRUpdates: Choose if slaupdates with vrr inputdata: Type of data provided for sla actionType: Type of action for particular sla csvdump: dumps the data in csv client_id: Client ID , if no client id provided , takes default client id action_type: Action type for the particular sla rule,by default remediation rule is provided Return: List containing dict of rule details. Example: To update group sla rule >>> self.{risksenseobject}.sla.update_group_sla_rule('12345st-123-5c28-9fb0-02a87de7e1ee','testingnew','testingnew',[],2) Note: You can also change the default time Reference or service Level Agreement Matrix or other default arguments You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.sla.update_group_sla_rule('12345st-123-5c28-9fb0-02a87de7e1ee','testingnew','testingnew',[],2,csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) +"/rule/{}".format(playbookrulepairinguuid) body={"name":name,"description":description,"priority":priority,"supplementalFilterRequest":None,"actionType":actionType,"action":{"isDefaultSLA":False,"targetGroupIds":targetgroupids,"timeReference":timeReference,"serviceLevelAgreementMatrix":serviceLevelAgreementMatrix,"slaMatrixProfileType":slaMatrixProfileType,"offsetBasis":offsetBasis,"affectOnlyNewFindings":affectOnlyNewFindings,"updateSLAIfVRRUpdates":updateSLAIfVRRUpdates},"outputType":"NO_OUTPUT","output":{},"input":inputdata} if type(csvdump)!=bool: print('Error in csvdump value,Please provide either true or false') exit() try: raw_response = self.request_handler.make_request(ApiRequestHandler.PUT, url, body=body) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: data={'uuid':jsonified_response[0]['uuid'],'name':jsonified_response[0]['name'],'description':jsonified_response[0]['description']} field_names = ['uuid','name','description'] try: with open('groupslaruleupdated.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow(data) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def del_group_sla_rule(self,playbookuuid:str,client_id:int=None)->bool: """ Deletes a particular group sla rule. Args: client_id: Client ID, if no client id provided , gets default client id playbookuuid: The playbookuuid that needs to be deleted Return: Returns json of deleted rule Example: Delete group sla rule >>> self.{risksenseobject}.sla.del_group_sla_rule('123-123') """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) +"/rule/{}".format(playbookuuid) try: raw_response = self.request_handler.make_request(ApiRequestHandler.DELETE, url) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) Success=True return Success
[docs] def change_order(self,slauuid:str,ruleuuids:list,csvdump:bool=False,client_id:int=None)->list: """ Changes the order of the sla rules Args: client_id: Client ID, if no client id provided , gets default client id slauuid: The sla where rules needs to be reordered ruleuuids: The order of rules reordering csvdump: dumps the data in csv Return: Returns the reordered rule json Example: >>> self.rs.sla.change_order('123-4567-f4cb-b7ab-06933745a4d6',["97abc-123-3bc0-b606-98aed43c944a"]) Note: You can also dump the data in csv using :obj:`csvdump=True` >>> self.rs.sla.change_order('123-4567-f4cb-b7ab-06933745a4d6',["97abc-123-3bc0-b606-98aed43c944a"],csvdump=True) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) +"/{}/rule-reorder".format(slauuid) body={"ruleUuids":ruleuuids} try: success=False raw_response = self.request_handler.make_request(ApiRequestHandler.PUT, url, body=body) if raw_response.status_code==200: success=True except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: field_names = ['uuid','name'] data=[] for i in jsonified_response: data.append({'uuid':i['uuid'],'name':i['name']}) try: with open('rule_reorder.csv', 'w',newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() for item in data: writer.writerow(item) except (RequestFailed,Exception) as e: print() print('There seems to be an exception') print(e) exit() return success
[docs] def sla_run(self,slauuid:str,csvdump:bool=False,client_id:int=None)->dict: """ Runs the sla Args: client_id: Client ID, if no client id provided , gets default client id slauuid: The sla rule that needs to be run csvdump: Dumps the data in csv Return: Returns the json of sla Example: >>> self.{risksenseobject}.sla.sla_run('12345-123ea-4f25-b7ab-06933745a4d6') """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) +"/{}/run".format(slauuid) try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET,url) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: field_names = [] for item in jsonified_response: field_names.append(item) try: with open('slarun.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow(jsonified_response) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def update_sla(self, sla_uuid:str,description:str,schedule_type:str='DAILY',hourofday:int=0,dayofmonth:str='4',dayofweek:str='5',name:str="Remediation SLAs",type:str='System',csvdump:bool=False,client_id=None,**kwargs): """ Updates an sla Args: sla_uuid: Sla UUID name: Name of the description: Provide description for sla schedule_type: Schedule Frequency (ScheduleFreq.DAILY, ScheduleFreq.WEEKLY, ScheduleFreq.MONTHLY, 'DISABLED') type: The type of sla , can be 'SYSTEM' or 'USER' hourofday: Hour of the day dayofmonth : Day of the month dayofweek: Day of the week csvdump: dumps the data in csv client_id: Client ID , if no client id provided , takes default client id Return: List containing dict of playbook details. Example: >>> self.{risksenseobject}.sla.update_sla('1234-1234abc-bfc0-b7ab-06933745a4d6','remediation sla','Testingviascript') Note: You can also dump the data in csv using :obj:`csvdump=True` >>> self.{risksenseobject}.sla.update_sla('1234-1234abc-bfc0-b7ab-06933745a4d6','remediation sla','Testingviascript',csvdump=True) """ if schedule_type.upper()=='DAILY': schedule={ 'type':'DAILY', "hourOfDay":hourofday } if schedule_type.upper()=='WEEKLY': schedule={ 'type':'WEEKLY', "hourOfDay":hourofday, 'daysOfWeek':dayofweek } if schedule_type.upper()=='MONTHLY': schedule={ 'type':'MONTHLY', "hourOfDay":hourofday, 'daysOfMonth':dayofmonth } if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) +"/{}".format(sla_uuid) body={ "name": name, "description": description, "schedule": schedule, "type": type } try: raw_response = self.request_handler.make_request(ApiRequestHandler.PUT, url, body=body) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) if csvdump==True: field_names = [] for item in jsonified_response: field_names.append(item) try: with open('slaupdate.csv', 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_names) writer.writeheader() writer.writerow(jsonified_response) except FileNotFoundError as fnfe: print("An exception has occurred while attempting to write the .csv file.") print() print(fnfe) return jsonified_response
[docs] def disablesla(self,playbookuuid:str,client_id:int=None)->bool: """ Disables a particular sla rule. Args: playbookuuid: The playbookuuids that needs to be disabled client_id: Client ID, if no client id provided , gets default client id Return: Returns json of disable sla Example: >>> self.{risksenseobject}.sla.disablesla(['11ed05cb-e7a0-4f25-b7ab-06933745a4d6']) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/toggle-enabled" body={"playbookUuids":playbookuuid,"enabled":False} try: raw_response = self.request_handler.make_request(ApiRequestHandler.PUT, url, body=body) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() Success=True jsonified_response = json.loads(raw_response.text) return Success
[docs] def enablesla(self,playbookuuid:list,client_id:int=None)->bool: """ Enables a particular sla rule. Args: playbookuuid: The playbookuuids that needs to be enabled client_id: Client ID, if no client id provided , gets default client id Return: success Example: >>> self.{risksenseobject}.sla.enablesla(['11ed05cb-e7a0-4f25-b7ab-06933745a4d6']) """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + "/toggle-enabled" body={"playbookUuids":playbookuuid,"enabled":True} print(url) print(body) try: success=False raw_response = self.request_handler.make_request(ApiRequestHandler.PUT, url, body=body) if raw_response.status_code==200: success=True except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() print(raw_response) jsonified_response = json.loads(raw_response.text) return success
[docs] def getplaybookslarules(self,slaid:int,client_id:int=None)->dict: """ Gets sla playbook rules for a particular sla. Args: slaid: Slaid to get the sla rules client_id: Client ID, if no client id provided , gets default client id Return: Returns json of playbook rules Example: To get sla playbook rule >>> self.{risksenseobject}.sla.getplaybookslarules('11ed13e6-a1aa-5c28-9fb0-02a87de7e1ee') """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) + f"/{slaid}" try: raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url) except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() jsonified_response = json.loads(raw_response.text) return jsonified_response
[docs] def delete_sla_rule(self,playbookrulepairinguuid:str,client_id:int=None)->bool: """ Deletes a particular sla rule Args: playbookrulepairinguuid: the playbook rule uuid client_id: Client ID, if no client id provided , gets default client id Return: Success Example: To delete an sla rule >>> self.{risksenseobject}.sla.delete_sla_rule('94945073-3d62-35b6-b176-6412bd324b84') """ if client_id is None: client_id = self._use_default_client_id()[0] url = self.api_base_url.format(str(client_id)) +"/rule/{}".format(playbookrulepairinguuid) try: success=False raw_response = self.request_handler.make_request(ApiRequestHandler.DELETE, url) if raw_response.status_code==200: success=True except RequestFailed as e: print() print('There seems to be an exception') print(e) exit() return success
""" Copyright 2022 RiskSense, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """