Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 2 .github/workflows/test-build-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion 2 VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.6.1
2.7.0
2 changes: 1 addition & 1 deletion 2 domaintools/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@

"""

current = "2.6.1"
current = "2.7.0"
80 changes: 49 additions & 31 deletions 80 domaintools/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

import re
import ssl
import yaml

from domaintools.constants import (
Endpoint,
OutputFormat,
ENDPOINT_TO_SOURCE_MAP,
RTTF_PRODUCTS_LIST,
RTTF_PRODUCTS_CMD_MAPPING,
SPECS_MAPPING,
)
from domaintools._version import current as version
from domaintools.results import (
Expand All @@ -22,6 +24,7 @@
Results,
FeedsResults,
)
from domaintools.decorators import api_endpoint, auto_patch_docstrings
from domaintools.filters import (
filter_by_riskscore,
filter_by_expire_date,
Expand All @@ -40,6 +43,7 @@ def delimited(items, character="|"):
return character.join(items) if type(items) in (list, tuple, set) else items


@auto_patch_docstrings
class API(object):
"""Enables interacting with the DomainTools API via Python:

Expand Down Expand Up @@ -94,8 +98,10 @@ def __init__(
self.key_sign_hash = key_sign_hash
self.default_parameters["app_name"] = app_name
self.default_parameters["app_version"] = app_version
self.specs = {}

self._build_api_url(api_url, api_port)
self._initialize_specs()

if not https:
raise Exception(
Expand All @@ -104,8 +110,25 @@ def __init__(
if proxy_url and not isinstance(proxy_url, str):
raise Exception("Proxy URL must be a string. For example: '127.0.0.1:8888'")

def _initialize_specs(self):
for spec_name, file_path in SPECS_MAPPING.items():
try:
with open(file_path, "r", encoding="utf-8") as f:
spec_content = yaml.safe_load(f)
if not spec_content:
raise ValueError("Spec file is empty or invalid.")

self.specs[spec_name] = spec_content

except Exception as e:
print(f"Error loading {file_path}: {e}")

def _get_ssl_default_context(self, verify_ssl: Union[str, bool]):
return ssl.create_default_context(cafile=verify_ssl) if isinstance(verify_ssl, str) else verify_ssl
return (
ssl.create_default_context(cafile=verify_ssl)
if isinstance(verify_ssl, str)
else verify_ssl
)

def _build_api_url(self, api_url=None, api_port=None):
"""Build the API url based on the given url and port. Defaults to `https://api.domaintools.com`"""
Expand Down Expand Up @@ -133,11 +156,18 @@ def _rate_limit(self, product):
hours = limit_hours and 3600 / float(limit_hours)
minutes = limit_minutes and 60 / float(limit_minutes)

self.limits[product["id"]] = {"interval": timedelta(seconds=minutes or hours or default)}
self.limits[product["id"]] = {
"interval": timedelta(seconds=minutes or hours or default)
}

def _results(self, product, path, cls=Results, **kwargs):
"""Returns _results for the specified API path with the specified **kwargs parameters"""
if product != "account-information" and self.rate_limit and not self.limits_set and not self.limits:
if (
product != "account-information"
and self.rate_limit
and not self.limits_set
and not self.limits
):
always_sign_api_key_previous_value = self.always_sign_api_key
header_authentication_previous_value = self.header_authentication
self._rate_limit(product)
Expand Down Expand Up @@ -181,7 +211,9 @@ def handle_api_key(self, is_rttf_product, path, parameters):
else:
raise ValueError(
"Invalid value '{0}' for 'key_sign_hash'. "
"Values available are {1}".format(self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES))
"Values available are {1}".format(
self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES)
)
)

parameters["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
Expand All @@ -193,7 +225,9 @@ def handle_api_key(self, is_rttf_product, path, parameters):

def account_information(self, **kwargs):
"""Provides a snapshot of your accounts current API usage"""
return self._results("account-information", "/v1/account", items_path=("products",), **kwargs)
return self._results(
"account-information", "/v1/account", items_path=("products",), **kwargs
)

def available_api_calls(self):
"""Provides a list of api calls that you can use based on your account information."""
Expand Down Expand Up @@ -396,7 +430,9 @@ def reputation(self, query, include_reasons=False, **kwargs):

def reverse_ip(self, domain=None, limit=None, **kwargs):
"""Pass in a domain name."""
return self._results("reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs)
return self._results(
"reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs
)

def host_domains(self, ip=None, limit=None, **kwargs):
"""Pass in an IP address."""
Expand Down Expand Up @@ -570,8 +606,12 @@ def iris_enrich(self, *domains, **kwargs):
younger_than_date = kwargs.pop("younger_than_date", {}) or None
older_than_date = kwargs.pop("older_than_date", {}) or None
updated_after = kwargs.pop("updated_after", {}) or None
include_domains_with_missing_field = kwargs.pop("include_domains_with_missing_field", {}) or None
exclude_domains_with_missing_field = kwargs.pop("exclude_domains_with_missing_field", {}) or None
include_domains_with_missing_field = (
kwargs.pop("include_domains_with_missing_field", {}) or None
)
exclude_domains_with_missing_field = (
kwargs.pop("exclude_domains_with_missing_field", {}) or None
)

filtered_results = DTResultFilter(result_set=results).by(
[
Expand Down Expand Up @@ -624,6 +664,7 @@ def iris_enrich_cli(self, domains=None, **kwargs):
**kwargs,
)

@api_endpoint(spec_name="iris", path="/v1/iris-investigate/", methods="post")
def iris_investigate(
self,
domains=None,
Expand All @@ -641,29 +682,6 @@ def iris_investigate(
**kwargs,
):
"""Returns back a list of domains based on the provided filters.
The following filters are available beyond what is parameterized as kwargs:

- ip: Search for domains having this IP.
- email: Search for domains with this email in their data.
- email_domain: Search for domains where the email address uses this domain.
- nameserver_host: Search for domains with this nameserver.
- nameserver_domain: Search for domains with a nameserver that has this domain.
- nameserver_ip: Search for domains with a nameserver on this IP.
- registrar: Search for domains with this registrar.
- registrant: Search for domains with this registrant name.
- registrant_org: Search for domains with this registrant organization.
- mailserver_host: Search for domains with this mailserver.
- mailserver_domain: Search for domains with a mailserver that has this domain.
- mailserver_ip: Search for domains with a mailserver on this IP.
- redirect_domain: Search for domains which redirect to this domain.
- ssl_hash: Search for domains which have an SSL certificate with this hash.
- ssl_subject: Search for domains which have an SSL certificate with this subject string.
- ssl_email: Search for domains which have an SSL certificate with this email in it.
- ssl_org: Search for domains which have an SSL certificate with this organization in it.
- google_analytics: Search for domains which have this Google Analytics code.
- adsense: Search for domains which have this AdSense code.
- tld: Filter by TLD. Must be combined with another parameter.
- search_hash: Use search hash from Iris to bring back domains.

You can loop over results of your investigation as if it was a native Python list:

Expand Down
7 changes: 4 additions & 3 deletions 7 domaintools/base_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ def _get_session_params_and_headers(self):
headers["accept"] = HEADER_ACCEPT_KEY_CSV_FORMAT

if self.api.header_authentication:
header_key_for_api_key = "X-Api-Key" if is_rttf_product else "X-API-Key"
headers[header_key_for_api_key] = self.api.key
headers["X-Api-Key"] = self.api.key

session_param_and_headers = {"parameters": parameters, "headers": headers}
return session_param_and_headers
Expand Down Expand Up @@ -342,7 +341,9 @@ def html(self):
)

def as_list(self):
return "\n".join([json.dumps(item, indent=4, separators=(",", ": ")) for item in self._items()])
return "\n".join(
[json.dumps(item, indent=4, separators=(",", ": ")) for item in self._items()]
)

def __str__(self):
return str(
Expand Down
5 changes: 5 additions & 0 deletions 5 domaintools/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ class OutputFormat(Enum):
"real-time-domain-discovery-feed-(api)": "domaindiscovery",
"real-time-domain-discovery-feed-(s3)": "domaindiscovery",
}

SPECS_MAPPING = {
"iris": "domaintools/specs/iris-openapi.yaml",
# "rttf": "domaintools/specs/feeds-openapi.yaml",
}
105 changes: 105 additions & 0 deletions 105 domaintools/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import functools
import inspect

from typing import List, Union

from domaintools.docstring_patcher import DocstringPatcher
from domaintools.request_validator import RequestValidator


def api_endpoint(spec_name: str, path: str, methods: Union[str, List[str]]):
"""
Decorator to tag a method as an API endpoint AND validate inputs.

Args:
spec_name: The key for the spec in api_instance.specs
path: The API path (e.g., "/users")
methods: A single method ("get") or list of methods (["get", "post"])
"""

def decorator(func):
func._api_spec_name = spec_name
func._api_path = path

# Normalize methods to a list
normalized_methods = [methods] if isinstance(methods, str) else methods
func._api_methods = normalized_methods

# Get the signature of the original function ONCE
sig = inspect.signature(func)

@functools.wraps(func)
def wrapper(self, *args, **kwargs):

try:
bound_args = sig.bind(*args, **kwargs)
except TypeError:
# If arguments don't match signature, let the actual func raise the error
return func(*args, **kwargs)

arguments = bound_args.arguments

# Robustly find 'self' (it's usually the first argument in bound_args)
# We look for the first value in arguments, or try to get 'self' explicitly.
instance = arguments.pop("self", None)
if not instance and args:
instance = args[0]

# Retrieve the Spec from the instance
# We assume 'self' has a .specs attribute (like DocstringPatcher expects)
spec = getattr(self, "specs", {}).get(spec_name)

if "domains" in arguments.keys():
domains = arguments.pop("domains")
arguments["domain"] = (
",".join(domains) if isinstance(domains, (list, tuple)) else domains
)

if spec:
# Determine which HTTP method is currently being executed.
# If the function allows dynamic methods (e.g. method="POST"), use that.
# Otherwise, default to the first method defined in the decorator.
current_method = kwargs.get("method", normalized_methods[0])

# Run Validation
# This will raise a ValueError and stop execution if validation fails.
try:
RequestValidator.validate(
spec=spec,
path=path,
method=current_method,
parameters=arguments,
)
except ValueError as e:
print(f"[Validation Error] {e}")
raise e

# Proceed with the original function call
return func(*args, **kwargs)

# Copy tags to wrapper for the DocstringPatcher to find
wrapper._api_spec_name = func._api_spec_name
wrapper._api_path = func._api_path
wrapper._api_methods = func._api_methods

return wrapper

return decorator


def auto_patch_docstrings(cls):
original_init = cls.__init__

@functools.wraps(original_init)
def new_init(self, *args, **kwargs):
original_init(self, *args, **kwargs)
try:
# We instantiate our patcher and run it
patcher = DocstringPatcher()
patcher.patch(self)
except Exception as e:
print(f"Auto-patching failed: {e}")

cls.__init__ = new_init

return cls
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.