diff --git a/README.md b/README.md index e6bead4..d05b195 100644 --- a/README.md +++ b/README.md @@ -12,45 +12,50 @@ Or you can install it from source: pip install git+https://github.com/cubicbyte/tempmail-python.git ``` -## Usage example +## Examples +Receive a message (e.g. activation code) ```python -import tempmail +from tempmail import EMail -# Create a new email address -email = tempmail.get_email() -print(email) +email = EMail() -# Wait for a new message -msg = tempmail.wait_for_message(email) -print(msg['body']) +# ... request some email ... +msg = email.wait_for_message() +print(msg.body) # Hello World!\n ``` -Output: + +Get all messages in the inbox ```python -# vhpeptbsne@1secmail.com -# Hello World! +from tempmail import EMail + +email = EMail(username='example', domain='1secmail.com') +inbox = email.get_inbox() + +for msg_info in inbox: + print(msg_info.subject, msg_info.message.body) ``` -Using message filters: +Download an attachment ```python -import tempmail +from tempmail import EMail -email = tempmail.get_email() -print(email) +email = EMail('example@1secmail.com') +msg = email.wait_for_message() -# Wait for a new message from a specific sender -msg = tempmail.wait_for_message(email, filter=lambda m: m['from'] == 'no-reply@example.com') -print(msg['body']) -``` +if msg.attachments: + attachment = msg.attachments[0] + data = attachment.download() -## API + # Print + print(data) # b'Hello World!\n' + print(data.decode('utf-8')) # Hello World!\n -- `tempmail.get_email(username=None, domain=None)`: Generate a new email address. -- `tempmail.get_inbox(email)`: Retrieve a list of message IDs for the specified email address. -- `tempmail.get_message(email, id)`: Retrieve the contents of a message with the specified ID. -- `tempmail.wait_for_message(email, timeout=None, filter=None)`: Wait for a new message to arrive at the specified email address. You can optionally provide a timeout (in seconds) and a filter function to check the contents of the message. -- `tempmail.DOMAINS`: List of available email domains. + # Save to file + with open(attachment.filename, 'wb') as f: + f.write(data) +``` ## License tempmail-python is licensed under the MIT License. See the [LICENSE](LICENSE) file for more information. diff --git a/requirements.txt b/requirements.txt index 663bd1f..1ea19ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -requests \ No newline at end of file +requests>=2.19.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 8bf2814..2fea7fa 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ def read(path: str) -> str: setup( name='tempmail-python', - version='1.0.0', + version='2.0.0', description='Python library for generating and managing temporary email addresses.', long_description=read('README.md'), long_description_content_type='text/markdown', @@ -17,7 +17,7 @@ def read(path: str) -> str: license='MIT', keywords='disposable-email temporary-email temp-email temp-mail email mail email-generator mail-generator', install_requires=[ - 'requests', + 'requests>=2.19.0', ], classifiers=[ 'Development Status :: 5 - Production/Stable', diff --git a/tempmail/__init__.py b/tempmail/__init__.py index 6a615ce..853d672 100644 --- a/tempmail/__init__.py +++ b/tempmail/__init__.py @@ -2,94 +2,10 @@ Example usage: ```python -import py_tempmail - -email = py_tempmail.get_email() -print(f"Your email address is {email}") - -print("Waiting for a message...") -message = py_tempmail.wait_for_message(email) -print(f"New message received: {message['subject']}")``` +TODO +``` """ -import time -import random -from functools import cache - -import requests - -from . import utils - -__all__ = ('get_email', 'get_inbox', 'get_message', 'wait_for_message', 'DOMAINS') - -DOMAINS = [ - '1secmail.com', - '1secmail.org', - '1secmail.net', - 'kzccv.com', - 'qiott.com', - 'wuuvo.com', - 'icznn.com', - 'ezztt.com', - ] -"""List of allowed email domains""" - - -@cache -def _check_message(email: str, id: int, filter: callable) -> tuple[bool, dict[str, any]]: - """Check if a message matches the filter""" - message = get_message(email, id) - return filter(message), message - - -def get_email(username: str | None = None, domain: str | None = None) -> str: - """Generate an email address""" - if username is None: - username = utils.random_string(10) - if domain is None: - domain = random.choice(DOMAINS) - return f'{username}@{domain}' - - -def get_inbox(email: str) -> list[dict[str, any]]: - """Get the inbox of the email address""" - username, domain = email.split('@') - - if domain not in DOMAINS: - raise ValueError(f'Invalid domain: {domain}') - - resp = requests.get(f'https://www.1secmail.com/api/v1/?action=getMessages&login={username}&domain={domain}') - return resp.json() - - -def get_message(email: str, id: int) -> dict[str, any]: - """Get a message from the inbox""" - username, domain = email.split('@') - - if domain not in DOMAINS: - raise ValueError(f'Invalid domain: {domain}') - - resp = requests.get(f'https://www.1secmail.com/api/v1/?action=readMessage&login={username}&domain={domain}&id={id}') - return resp.json() - - -def wait_for_message(email: str, timeout: int | None = 60, filter: callable = lambda _: True) -> dict[str, any]: - """Wait for a message to arrive in the inbox""" - username, domain = email.split('@') - - if domain not in DOMAINS: - raise ValueError(f'Invalid domain: {domain}') - - timeout_time = time.time() + timeout if timeout else None - while timeout is None or time.time() < timeout_time: - resp = requests.get(f'https://www.1secmail.com/api/v1/?action=getMessages&login={username}&domain={domain}').json() - - if resp: - for msg_info in resp: - status, msg = _check_message(email, msg_info['id'], filter) - if status: - return msg - - time.sleep(1) +from .providers import OneSecMail as EMail - raise TimeoutError('Timed out waiting for message') +__all__ = ('EMail',) diff --git a/tempmail/providers.py b/tempmail/providers.py new file mode 100644 index 0000000..51fc7ca --- /dev/null +++ b/tempmail/providers.py @@ -0,0 +1,147 @@ +import time +import random +from dataclasses import dataclass + +import requests + +from . import utils + +__all__ = ('OneSecMail',) + + +class OneSecMail: + """1secmail.com API wrapper""" + + def __init__(self, email: str | None = None, username: str | None = None, domain: str | None = None) -> None: + if email is not None: + username, domain = email.split('@') + + if domain is not None and domain not in self.get_domains(): + raise ValueError(f'Invalid domain: {domain}') + + self.session = requests.Session() + self.username = username or utils.random_string(10) + self.domain = domain or random.choice(self.get_domains()) + + def get_inbox(self) -> list['OneSecMail.MessageInfo']: + """Get the inbox of the email address""" + resp = self.session.get(f'https://www.1secmail.com/api/v1/?action=getMessages&login={self.username}&domain={self.domain}') + resp.raise_for_status() + return [OneSecMail.MessageInfo.from_dict(self, msg_info) for msg_info in resp.json()] + + @utils.cache + def get_message(self, id: int) -> 'OneSecMail.Message': + """Get a message from the inbox""" + resp = self.session.get(f'https://www.1secmail.com/api/v1/?action=readMessage&login={self.username}&domain={self.domain}&id={id}') + resp.raise_for_status() + return OneSecMail.Message.from_dict(self, resp.json()) + + @utils.cache + def download_attachment(self, id: int, file: str) -> bytes: + """Download an attachment from a message as bytes""" + resp = self.session.get(f'https://www.1secmail.com/api/v1/?action=download&login={self.username}&domain={self.domain}&id={id}&file={file}') + resp.raise_for_status() + return resp.content + + def wait_for_message(self, timeout: int | None = 60, filter: callable = lambda _: True) -> 'OneSecMail.Message': + """Wait for a message to arrive in the inbox + + :param timeout: How long to wait for a message to arrive, in seconds + :param filter: A message filter function that takes a message and returns a boolean + """ + + timeout_time = time.time() + timeout if timeout is not None else None + + while timeout is None or time.time() < timeout_time: + inbox = self.get_inbox() + for msg_info in inbox: + if filter(msg_info.message): + return msg_info.message + time.sleep(1) + + raise TimeoutError('Timed out waiting for message') + + @staticmethod + @utils.cache + def get_domains() -> tuple[str, ...]: + """List of allowed email domains""" + resp = requests.get('https://www.1secmail.com/api/v1/?action=getDomainList') + resp.raise_for_status() + return tuple(resp.json()) + + def __str__(self) -> str: + return f'{self.username}@{self.domain}' + + @dataclass + class MessageInfo: + id: int + from_addr: str + subject: str + date_str: str + _mail: 'OneSecMail' + + @property + def message(self) -> 'OneSecMail.Message': + return self._mail.get_message(self.id) + + @classmethod + def from_dict(cls, mail: 'OneSecMail', msg_info: dict[str, any]) -> 'OneSecMail.MessageInfo': + return cls( + _mail=mail, + id=msg_info['id'], + from_addr=msg_info['from'], + subject=msg_info['subject'], + date_str=msg_info['date'], + ) + + @dataclass + class Message: + id: int + from_addr: str + subject: str + date: str + body: str + test_body: str + html_body: str + _mail: 'OneSecMail' + _attachments: list[dict[str, any]] + + @classmethod + def from_dict(cls, mail: 'OneSecMail', msg: dict[str, any]) -> 'OneSecMail.Message': + return cls( + _mail=mail, + _attachments=msg['attachments'], + id=msg['id'], + from_addr=msg['from'], + subject=msg['subject'], + date=msg['date'], + body=msg['textBody'], + test_body=msg['textBody'], + html_body=msg['htmlBody'], + ) + + @property + def attachments(self) -> list['OneSecMail.Attachment']: + return [OneSecMail.Attachment.from_dict(self._mail, self.id, attachment) for attachment in self._attachments] + + @dataclass + class Attachment: + filename: str + content_type: str + size: int + _mail: 'OneSecMail' + _message_id: int + + def download(self) -> bytes: + """Download the attachment as bytes""" + return self._mail.download_attachment(self._message_id, self.filename) + + @classmethod + def from_dict(cls, mail: 'OneSecMail', message_id: int, attachment: dict[str, any]) -> 'OneSecMail.Attachment': + return cls( + _mail=mail, + _message_id=message_id, + filename=attachment['filename'], + content_type=attachment['contentType'], + size=attachment['size'], + ) diff --git a/tempmail/utils.py b/tempmail/utils.py index 73ea5fc..6dc265c 100644 --- a/tempmail/utils.py +++ b/tempmail/utils.py @@ -1,6 +1,18 @@ import random import string +import functools + +__all__ = ('random_string', 'cache') def random_string(length: int): return ''.join(random.choices(string.ascii_lowercase, k=length)) + + +def cache(func): + """Cache the result of a function with saved type hints""" + @functools.lru_cache + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + return wrapper