Source code for risksense_api.__subject.__vulnerabilities.__vulnerabilities

"""
Vulnerability module defining functions for the vulnerability-related API endpoints.
"""

""" *******************************************************************************************************************
|
|  Name        :  __vulnerabilities.py
|  Project     :  risksense_api
|  Copyright   :  2022 RiskSense, Inc.
|  License     :  Apache-2.0 (https://www.apache.org/licenses/LICENSE-2.0.txt)
|
******************************************************************************************************************* """

import json
from ...__subject import Subject
from ..__exports import ExportFileType
from ..__exports import ExportRowNumbers
from ..._params import *
from ..._api_request_handler import *
from ..__exports import Exports
import sys
import zipfile


class Vulnerabilities(Subject):
    """Class for vulnerability function definitions.

    Args:
        profile:     Profile Object

    To utilise vulnerability function:

    Usage:
        :obj:`self.{risksenseobjectname}.vulnerabilities.{function}`

    Examples:
        To get model for vulnerability using :meth:`get_model` function

        >>> self.{risksenseobjectname}.vulnerabilities.get_model()
    """

    def __init__(self, profile:object):
        """
        Initialization of Vulnerabilities object.

        Args:
            profile:     Profile Object
        """
        self.subject_name = "vulnerability"
        super().__init__(profile, self.subject_name)

    def downloadfilterinexport(self, filename, filters, client_id=None):
        """
        Exports the vulnerabilities matching the filters, polls the export job
        until it finishes and downloads the result to ``{filename}.csv``.

        Args:
            filename:    Name used for the export job. The downloaded file is
                         written to ``{filename}.csv``.
            filters:     A list of dictionaries containing filter parameters.
            client_id:   Client ID. If an ID isn't passed, will use the profile's default Client ID.

        Raises:
            SystemExit:  If the export job reports ``ERROR``, or if checking the
                         status or downloading the file fails.
        """
        if client_id is None:
            # _use_default_client_id() returns a tuple; element 0 is the id
            # the other methods of this class use for requests.
            client_id = self._use_default_client_id()[0]

        # The same client must be used for every step, otherwise the job is
        # created under one client and looked up under another.
        export_id = self.export(filters, file_name=filename, client_id=client_id)
        self.exports = Exports(self.profile)
        request_errors = (RequestFailed, MaxRetryError, StatusCodeError, ValueError)

        while True:
            try:
                export_status = self.exports.check_status(export_id, client_id=client_id)
            except request_errors as ex:
                print(ex)
                print()
                print("Unable to export the file.")
                sys.exit("Exiting")

            print(export_status)
            if export_status == 'COMPLETE':
                break
            if export_status == 'ERROR':
                print('error getting zip file please check ')
                # sys.exit instead of the site-provided exit(): always
                # available, and reports failure with a non-zero status.
                sys.exit(1)

        try:
            self.exports.download_export(export_id, f"{filename}.csv", client_id=client_id)
        except request_errors as ex:
            print(ex)
            print()
            print("Unable to retrieve file. Exiting.")
            sys.exit(1)

    def search(self, search_filters:list, projection=Projection.BASIC, page_size:int=150,
               sort_field:str=SortField.ID, sort_dir:str=SortDirection.ASC,
               csvdump:bool=False, client_id:int=None)->list:
        """
        Searches for and returns vulnerabilities based on the provided filter(s) and other parameters.  Rather
        than returning paginated results, this function cycles through all pages of results and returns
        them all in a single list.

        Args:
            search_filters:  A list of dictionaries containing filter parameters.
            projection:      Projection to be used in API request.  Projection.BASIC or Projection.DETAIL
            page_size:       The number of results per page to be returned.
            sort_field:      The field to be used for sorting results returned.
            sort_dir:        The direction of sorting to be used. SortDirection.ASC or SortDirection.DESC
            csvdump:         dumps the data in csv
            client_id:       Client ID.  If an ID isn't passed, will use the profile's default Client ID.

        Return:
            A list containing all vulnerability returned by the search using the filter provided.

        Raises:
            RequestFailed:   Propagated unchanged when a request to the platform fails.
            SystemExit:      If ``csvdump`` is not a bool, or if the CSV export fails.

        Examples:
            >>> apiobj = self.{risksenseobject}.vulnerabilities.search([{"field":"vrrGroup","exclusive":False,"operator":"IN","orWithPrevious":False,"implicitFilters":[],"value":"Critical"}])

        Note:
            You can also dump the data of the vulnerability search in a csv file. Just make csvdump as True:

            >>> self.{risksenseobject}.vulnerabilities.search([{"field":"vrrGroup","exclusive":False,"operator":"IN","orWithPrevious":False,"implicitFilters":[],"value":"Critical"}],csvdump=True)
        """
        # Validate before any request is sent.
        if not isinstance(csvdump, bool):
            print('Error in csvdump value,Please provide either true or false')
            sys.exit(1)

        if client_id is None:
            client_id, page_client_id = self._use_default_client_id()
        else:
            page_client_id = client_id

        # Keyword arguments forwarded to get_single_search_page for each page.
        # Built explicitly rather than from locals() so the forwarded set
        # cannot change when a local variable is added to this method.
        page_kwargs = {
            'search_filters': search_filters,
            'projection': projection,
            'page_size': page_size,
            'sort_field': sort_field,
            'sort_dir': sort_dir,
            'client_id': page_client_id,
        }

        page_info = self._get_page_info(self.subject_name, search_filters,
                                        page_size=page_size, client_id=client_id)
        num_pages = page_info[1]
        page_range = range(0, num_pages)

        all_results = self._search(self.subject_name, self.get_single_search_page,
                                   page_range, **page_kwargs)

        if csvdump:
            # Export under the same client the search ran against.
            self.downloadfilterinexport('vulnerabilitiessearch', search_filters,
                                        client_id=client_id)
        return all_results

    def list_vulnerability_filter_fields(self, client_id:int=None)->dict:
        """
        List filter endpoints.

        Args:
            client_id:   Client ID.  If an ID isn't passed, will use the profile's default Client ID.

        Return:
            The JSON output from the platform is returned, listing the available filters.

        Raises:
            SystemExit:  If the request to the platform raises any exception.

        Examples:
            >>> apiobj = self.{risksenseobject}.vulnerabilities.list_vulnerability_filter_fields(client_id=123)
        """
        if client_id is None:
            client_id = self._use_default_client_id()[0]

        url = self.api_base_url.format(str(client_id)) + '/filter'

        try:
            raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url)
        except Exception as e:
            print()
            print('There seems to be an exception')
            print(e)
            # sys.exit instead of the site-provided exit(): always available,
            # and reports the failure with a non-zero status.
            sys.exit(1)

        return json.loads(raw_response.text)

    def get_single_search_page(self, search_filters:list, projection:str=Projection.BASIC, page_num:int=0,
                               page_size:int=150, sort_field:str=SortField.ID,
                               sort_dir:str=SortDirection.ASC, client_id:int=None)->dict:
        """
        Searches for and returns vulnerabilities based on the provided filter(s) and other parameters.

        Args:
            search_filters:  A list of dictionaries containing filter parameters.
            projection:      Projection to be used in API request.  Projection.BASIC or Projection.DETAIL
            page_num:        The page number of results to be returned.
            page_size:       The number of results per page to be returned.
            sort_field:      The field to be used for sorting results returned.
            sort_dir:        The direction of sorting to be used. SortDirection.ASC or SortDirection.DESC
            client_id:       Client ID.  If an ID isn't passed, will use the profile's default Client ID.

        Return:
            The JSON response from the platform is returned.

        Raises:
            RequestFailed:   Propagated unchanged when the request to the platform fails.

        Examples:
            >>> apiobj = self.{risksenseobject}.vulnerabilities.get_single_search_page([{"field":"vrrGroup","exclusive":False,"operator":"IN","orWithPrevious":False,"implicitFilters":[],"value":"Critical"}])
        """
        if client_id is None:
            client_id = self._use_default_client_id()[0]

        url = self.api_base_url.format(str(client_id)) + "/search"

        body = {
            "filters": search_filters,
            "projection": projection,
            "sort": [
                {
                    "field": sort_field,
                    "direction": sort_dir
                }
            ],
            "page": page_num,
            "size": page_size
        }

        raw_response = self.request_handler.make_request(ApiRequestHandler.POST, url, body=body)
        return json.loads(raw_response.text)

    def get_model(self, client_id:int=None)->dict:
        """
        Get available projections and models for Vulnerabilities.

        Args:
            client_id:   Client ID.  If an ID isn't passed, will use the profile's default Client ID.

        Return:
            Vulnerability projections and models are returned.

        Raises:
            RequestFailed:   Propagated unchanged when the request to the platform fails.

        Examples:
            >>> apiobj = self.{risksenseobject}.vulnerabilities.get_model()
        """
        if client_id is None:
            client_id = self._use_default_client_id()[0]

        return self._model(self.subject_name, client_id)

    def suggest(self, search_filter:list, suggest_filter:dict, client_id:int=None)->dict:
        """
        Suggest values for filter fields.

        Args:
            search_filter:   Search Filter
            suggest_filter:  Suggest Filter
            client_id:       Client ID.  If an ID isn't passed, will use the profile's default Client ID.

        Return:
            Value suggestions

        Raises:
            RequestFailed:   Propagated unchanged when the request to the platform fails.

        Examples:
            >>> apiobj = self.{risksenseobject}.vulnerabilities.suggest([],{"field":"vrrGroup","exclusive":False,"operator":"WILDCARD","orWithPrevious":False,"implicitFilters":[],"value":"Critic*"})
        """
        if client_id is None:
            client_id = self._use_default_client_id()[0]

        return self._suggest(self.subject_name, search_filter, suggest_filter, client_id)

    def getexporttemplate(self, client_id:int=None)->list:
        """
        Gets configurable export template for Vulnerabilities.

        Every field in the returned template is marked as selected.

        Args:
            client_id:   Client ID.  If an ID isn't passed, will use the profile's default Client ID.

        Return:
            The Exportable fields

        Raises:
            RequestFailed:   Propagated unchanged when the request to the platform fails.

        Examples:
            >>> apiobj = self.{risksenseobject}.vulnerabilities.getexporttemplate()
        """
        if client_id is None:
            client_id = self._use_default_client_id()[0]

        url = self.api_base_url.format(str(client_id)) + "/export/template"

        raw_response = self.request_handler.make_request(ApiRequestHandler.GET, url)

        exportable_fields = json.loads(raw_response.text)['exportableFields']
        # Select every field of every group so the export contains all columns.
        for field_group in exportable_fields:
            for field in field_group['fields']:
                field['selected'] = True
        return exportable_fields

    def export(self, search_filters:list, file_name:str, row_count:str=ExportRowNumbers.ROW_ALL,
               file_type:str=ExportFileType.CSV, client_id:int=None)->int:
        """
        Initiates an export job on the platform for Vulnerabilities based on the provided filter(s).

        Args:
            search_filters:  A list of dictionaries containing filter parameters.
            file_name:       The name to be used for the exported file.
            row_count:       No of rows to be exported. Possible options : ExportRowNumbers.ROW_5000,
                             ExportRowNumbers.ROW_10000, ExportRowNumbers.ROW_25000, ExportRowNumbers.ROW_50000,
                             ExportRowNumbers.ROW_100000, ExportRowNumbers.ROW_ALL
            file_type:       File type to export. ExportFileType.CSV, ExportFileType.JSON, or ExportFileType.XLSX
            client_id:       Client ID.  If an ID isn't passed, will use the profile's default Client ID.

        Return:
            The job ID of the export in the platform is returned.

        Raises:
            RequestFailed:   Propagated unchanged when a request to the platform fails.

        Examples:
            >>> apiobj = self.{risksenseobject}.vulnerabilities.export([{"field":"vrrGroup","exclusive":False,"operator":"IN","orWithPrevious":False,"implicitFilters":[],"value":"Critical"}],'test')
        """
        # Fetch the template for the client the export is created for.
        # A client_id of None is resolved to the default by getexporttemplate.
        exportable_filter = self.getexporttemplate(client_id=client_id)

        if client_id is None:
            client_id = self._use_default_client_id()[1]

        # Built explicitly rather than from locals() so the forwarded set
        # cannot change when a local variable is added to this method.
        export_kwargs = {
            'search_filters': search_filters,
            'file_name': file_name,
            'row_count': row_count,
            'file_type': file_type,
            'client_id': client_id,
            'exportable_filter': exportable_filter,
        }

        return self._export(self.subject_name, **export_kwargs)
""" Copyright 2022 RiskSense, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """