From b4e739bf8bc7815ab7b2c84a7bc4329aed0af63b Mon Sep 17 00:00:00 2001 From: shallman1 <105513486+shallman1@users.noreply.github.com> Date: Tue, 26 Dec 2023 23:31:35 -0500 Subject: [PATCH 1/2] Update api.py added batching and pagination functionality to enrich and investigate functions. --- domaintools/api.py | 204 ++++++++++++++++++++++++++------------------- 1 file changed, 118 insertions(+), 86 deletions(-) diff --git a/domaintools/api.py b/domaintools/api.py index b182b4e..299aec9 100644 --- a/domaintools/api.py +++ b/domaintools/api.py @@ -275,32 +275,7 @@ def risk_evidence(self, domain, **kwargs): return self._results('risk-evidence', '/v1/risk/evidence/', items_path=('components',), domain=domain, **kwargs) - def iris_enrich(self, *domains, **kwargs): - """Returns back enriched data related to the specified domains using our Iris Enrich service - each domain should be passed in as an un-named argument to the method: - iris_enrich('domaintools.com', 'google.com') - - api.iris_enrich(*DOMAIN_LIST)['results_count'] Returns the number of results - api.iris_enrich(*DOMAIN_LIST)['missing_domains'] Returns any domains that we were unable to - retrieve enrichment data for - api.iris_enrich(*DOMAIN_LIST)['limit_exceeded'] Returns True if you've exceeded your API usage - - for enrichment in api.iris_enrich(*DOMAIN_LIST): # Enables looping over all returned enriched domains - - for example: - enrich_domains = ['google.com', 'amazon.com'] - assert api.iris_enrich(*enrich_domains)['missing_domains'] == [] - """ - if not domains: - raise ValueError('One or more domains to enrich must be provided') - - domains = ','.join(domains) - data_updated_after = kwargs.get('data_updated_after', None) - if hasattr(data_updated_after, 'strftime'): - data_updated_after = data_updated_after.strftime('%Y-%m-%d') - return self._results('iris-enrich', '/v1/iris-enrich/', domain=domains, data_updated_after=data_updated_after, - items_path=('results',), **kwargs) def iris_enrich_cli(self, domains=None, **kwargs): """Returns back enriched data related to the specified domains using our Iris Enrich service. @@ -331,68 +306,7 @@ def iris_enrich_cli(self, domains=None, **kwargs): return self._results('iris-enrich', '/v1/iris-enrich/', domain=domains, data_updated_after=data_updated_after, items_path=('results',), **kwargs) - def iris_investigate(self, domains=None, data_updated_after=None, expiration_date=None, - create_date=None, active=None, search_hash=None, **kwargs): - """Returns back a list of domains based on the provided filters. - The following filters are available beyond what is parameterized as kwargs: - - - ip: Search for domains having this IP. - - email: Search for domains with this email in their data. - - email_domain: Search for domains where the email address uses this domain. - - nameserver_host: Search for domains with this nameserver. - - nameserver_domain: Search for domains with a nameserver that has this domain. - - nameserver_ip: Search for domains with a nameserver on this IP. - - registrar: Search for domains with this registrar. - - registrant: Search for domains with this registrant name. - - registrant_org: Search for domains with this registrant organization. - - mailserver_host: Search for domains with this mailserver. - - mailserver_domain: Search for domains with a mailserver that has this domain. - - mailserver_ip: Search for domains with a mailserver on this IP. - - redirect_domain: Search for domains which redirect to this domain. - - ssl_hash: Search for domains which have an SSL certificate with this hash. - - ssl_subject: Search for domains which have an SSL certificate with this subject string. - - ssl_email: Search for domains which have an SSL certificate with this email in it. - - ssl_org: Search for domains which have an SSL certificate with this organization in it. - - google_analytics: Search for domains which have this Google Analytics code. - - adsense: Search for domains which have this AdSense code. - - tld: Filter by TLD. Must be combined with another parameter. - - search_hash: Use search hash from Iris to bring back domains. - - You can loop over results of your investigation as if it was a native Python list: - - for result in api.iris_investigate(ip='199.30.228.112'): # Enables looping over all related results - - api.iris_investigate(QUERY)['results_count'] Returns the number of results returned with this request - api.iris_investigate(QUERY)['total_count'] Returns the number of results available within Iris - api.iris_investigate(QUERY)['missing_domains'] Returns any domains that we were unable to find - api.iris_investigate(QUERY)['limit_exceeded'] Returns True if you've exceeded your API usage - api.iris_investigate(QUERY)['position'] Returns the position key that can be used to retrieve the next page: - next_page = api.iris_investigate(QUERY, position=api.iris_investigate(QUERY)['position']) - - for enrichment in api.iris_enrich(i): # Enables looping over all returned enriched domains - - """ - # We put search_hash in the signature definition so the CLI can see it as a valid arg - if search_hash: - kwargs['search_hash'] = search_hash - if not (kwargs or domains): - raise ValueError('Need to define investigation using kwarg filters or domains') - - if isinstance(domains, (list, tuple)): - domains = ','.join(domains) - if hasattr(data_updated_after, 'strftime'): - data_updated_after = data_updated_after.strftime('%Y-%m-%d') - if hasattr(expiration_date, 'strftime'): - expiration_date = expiration_date.strftime('%Y-%m-%d') - if hasattr(create_date, 'strftime'): - create_date = create_date.strftime('%Y-%m-%d') - if isinstance(active, bool): - kwargs['active'] = str(active).lower() - - return self._results('iris-investigate', '/v1/iris-investigate/', domain=domains, - data_updated_after=data_updated_after, expiration_date=expiration_date, - create_date=create_date, items_path=('results',), **kwargs) def iris_detect_monitors(self, include_counts=False, datetime_counts_since=None, sort=None, order="desc", offset=0, limit=None, **kwargs): @@ -428,6 +342,124 @@ def iris_detect_monitors(self, include_counts=False, datetime_counts_since=None, return self._results('iris-detect-monitors', '/v1/iris-detect/monitors/', order=order, offset=offset, limit=limit, items_path=('monitors',), response_path=(), **kwargs) + def iris_investigate(self, domains=None, data_updated_after=None, expiration_date=None, + create_date=None, active=None, search_hash=None, **kwargs): + def convert_date(date_param): + return date_param.strftime('%Y-%m-%d') if hasattr(date_param, 'strftime') else date_param + + + aggregated_results = { + 'limit_exceeded': False, + 'has_more_results': True, + 'message': 'Enjoy your data.', + 'results_count': 0, + 'total_count': 0, + 'results': [], + 'missing_domains': [], + 'position': None + } + + # Convert parameters to string format as necessary + data_updated_after = convert_date(data_updated_after) + expiration_date = convert_date(expiration_date) + create_date = convert_date(create_date) + active_str = str(active).lower() if isinstance(active, bool) else active + if search_hash: + kwargs['search_hash'] = search_hash + + # Join given domains if list or tuple + if isinstance(domains, (list, tuple)): + domains = ','.join(domains) + + # Check if batch processing is needed + batch_size = 100 + if isinstance(domains, str) and len(domains.split(',')) > batch_size: + domains_list = domains.split(',') + for i in range(0, len(domains_list), batch_size): + batch_domains = domains_list[i:i + batch_size] + batch_result = self.iris_investigate( + batch_domains, data_updated_after, expiration_date, create_date, active_str, search_hash, **kwargs) + + # Aggregate the batch results + aggregated_results['results_count'] += batch_result['results_count'] + aggregated_results['total_count'] += batch_result['total_count'] + aggregated_results['limit_exceeded'] |= batch_result.get('limit_exceeded', False) + aggregated_results['results'].extend(batch_result['results']) + aggregated_results['missing_domains'].extend(batch_result.get('missing_domains', [])) + + aggregated_results['has_more_results'] = aggregated_results['results_count'] < aggregated_results['total_count'] + else: + # Handle non-batched requests or single domain requests + kwargs['active'] = active_str + while True: + response = self._results( + 'iris-investigate', '/v1/iris-investigate/', domain=domains, + data_updated_after=data_updated_after, expiration_date=expiration_date, + create_date=create_date, position=aggregated_results['position'], items_path=('results',), **kwargs) + + # Aggregate the results + aggregated_results['results_count'] += response['results_count'] + aggregated_results['total_count'] = response['total_count'] + aggregated_results['limit_exceeded'] |= response.get('limit_exceeded', False) + aggregated_results['results'].extend(response.get('results', [])) + aggregated_results['position'] = response.get('position', None) + aggregated_results['has_more_results'] = response.get('has_more_results', False) + aggregated_results['missing_domains'].extend(response.get('missing_domains', [])) + + if not response.get('has_more_results', False): + break + + return aggregated_results + + def iris_enrich(self, *domains, **kwargs): + # Flatten the possible nested list from *domains using a generator expression + all_domains = (d for domain in domains for d in (domain if isinstance(domain, list) else [domain])) + all_domains = list(all_domains) # Convert generator to list, ensuring only one iteration over input domains + + # Set batch size limit + batch_size = 100 + + # Check if batched processing is needed + batched = kwargs.get('batched', False) + if batched: + del kwargs['batched'] + + if len(all_domains) > batch_size and not batched: + # Handle batching by splitting domains into chunks and calling iris_enrich recursively for each batch + aggregated_results = { + 'limit_exceeded': False, + "message": "Enjoy your data.", + 'results_count': 0, + 'results': [], + 'missing_domains': [], + + } + + for i in range(0, len(all_domains), batch_size): + batch_domains = all_domains[i:i + batch_size] + batch_result = self.iris_enrich(*batch_domains, batched=True, **kwargs) + + aggregated_results['results_count'] += batch_result['results_count'] + aggregated_results['missing_domains'].extend(batch_result['missing_domains']) + if batch_result['limit_exceeded']: + aggregated_results['limit_exceeded'] = True + aggregated_results['results'].extend(batch_result['results']) + + return aggregated_results + else: + # Process a single domain or smaller batch of domains + if not all_domains: + raise ValueError('One or more domains to enrich must be provided') + + domains_str = ','.join(all_domains) + data_updated_after = kwargs.get('data_updated_after', None) + if hasattr(data_updated_after, 'strftime'): + data_updated_after = data_updated_after.strftime('%Y-%m-%d') + + # Make the API call for single or smaller batches and return the single result set + return self._results('iris-enrich', '/v1/iris-enrich/', domain=domains_str, + data_updated_after=data_updated_after, items_path=('results',), **kwargs) + def iris_detect_new_domains(self, monitor_id=None, tlds=None, risk_score_ranges=None, mx_exists=None, discovered_since=None, changed_since=None, search=None, sort=None, order=None, include_domain_data=False, offset=0, limit=None, preview=None, **kwargs): From 759e4fd0587b770d4e010ad57cd980329993c1b8 Mon Sep 17 00:00:00 2001 From: shallman1 <105513486+shallman1@users.noreply.github.com> Date: Wed, 27 Dec 2023 12:01:49 -0500 Subject: [PATCH 2/2] Update api.py added investigate cli function --- domaintools/api.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/domaintools/api.py b/domaintools/api.py index 299aec9..a1812e1 100644 --- a/domaintools/api.py +++ b/domaintools/api.py @@ -307,7 +307,34 @@ def iris_enrich_cli(self, domains=None, **kwargs): items_path=('results',), **kwargs) + def iris_investigate_cli(self, domains=None, **kwargs): + """ + Returns back investigation data related to the specified domains using our Iris Investigate service. + This is a CLI version of the iris_investigate method to help maintain backwards compatibility. + + For example: + investigate_domains = ['google.com', 'amazon.com'] + assert api.iris_investigate_cli(*investigate_domains)['missing_domains'] == [] + """ + if not domains: + raise ValueError('One or more domains to investigate must be provided') + # Handle the domains, whether passed as a list, tuple, or individual domains + if isinstance(domains, (list, tuple)): + domains = ','.join(domains) + elif isinstance(domains, str): + domains = domains + + # Format the additional parameters that require special formatting (e.g., dates) + data_updated_after = kwargs.get('data_updated_after', None) + if hasattr(data_updated_after, 'strftime'): + data_updated_after = data_updated_after.strftime('%Y-%m-%d') + + # Call the _results method similarly to iris_enrich_cli, adjusting the endpoint as needed + result = self._results('iris-investigate', '/v1/iris-investigate/', domain=domains, + data_updated_after=data_updated_after, **kwargs) + + return result def iris_detect_monitors(self, include_counts=False, datetime_counts_since=None, sort=None, order="desc", offset=0, limit=None, **kwargs): """Returns back a list of monitors in Iris Detect based on the provided filters.