Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions 9 taskbadger/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,18 @@ def scrape_urls(self, urls):

def apply_async(self, *args, **kwargs):
headers = kwargs.setdefault("headers", {})
headers["taskbadger_track"] = True
tb_kwargs = self._get_tb_kwargs(kwargs)
if kwargs.get("kwargs"):
# extract taskbadger options from task kwargs when supplied as keyword argument
tb_kwargs.update(self._get_tb_kwargs(kwargs["kwargs"]))
elif len(args) > 1 and isinstance(args[1], dict):
# extract taskbadger options from task kwargs when supplied as positional argument
tb_kwargs.update(self._get_tb_kwargs(args[1]))
headers[TB_KWARGS_ARG] = tb_kwargs

if Badger.is_configured():
headers["taskbadger_track"] = True
headers[TB_KWARGS_ARG] = tb_kwargs

result = super().apply_async(*args, **kwargs)

tb_task_id = result.info.get(TB_TASK_ID) if result.info else None
Expand Down Expand Up @@ -150,13 +153,13 @@ def taskbadger_task(self):
@before_task_publish.connect
def task_publish_handler(sender=None, headers=None, body=None, **kwargs):
headers = headers if "task" in headers else body
header_kwargs = headers.pop(TB_KWARGS_ARG, {}) # always remove TB headers
if sender.startswith("celery.") or not Badger.is_configured():
return

celery_system = Badger.current.settings.get_system_by_id("celery")
auto_track = celery_system and celery_system.track_task(sender)
manual_track = headers.get("taskbadger_track")
header_kwargs = headers.pop(TB_KWARGS_ARG, {})
if not manual_track and not auto_track:
return

Expand Down
9 changes: 8 additions & 1 deletion 9 tests/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,14 @@ def add_no_tb(self, a, b):
with mock.patch("taskbadger.celery.create_task_safe") as create, mock.patch(
"taskbadger.celery.update_task_safe"
) as update:
result = add_no_tb.delay(2, 2)
result = add_no_tb.delay(
2,
2,
taskbadger_kwargs={
# add an action here to test serialization failure when Badger is not configured
"actions": [Action("stale", integration=EmailIntegration(to="test@test.com"))]
},
)
assert result.get(timeout=10, propagate=True) == 4

create.assert_not_called()
Expand Down
2 changes: 2 additions & 0 deletions 2 tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def test_session_multiple_threads():


class TestThread(threading.Thread):
__test__ = False

def __init__(self, name, barrier, clients):
threading.Thread.__init__(self, name=name)
self.barrier = barrier
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.