A Flask-based webhook service that detects suspicious behavior in GitHub organizations by analyzing webhook events in real-time.
- Real-time anomaly detection from GitHub webhook events
- Extensible rule-based system for detecting various suspicious behaviors
- Multiple notification channels (currently console, easily extensible)
- Type-safe domain models with comprehensive type hints
- Clean architecture designed for multi-developer collaboration
The application follows a clean, layered architecture:
github-anomaly-detector/
├── app/ # Application layer (Flask routes, webhook parsing)
├── domain/ # Domain layer (events, anomalies, rules)
├── services/ # Business logic (anomaly engine, state management)
├── notifiers/ # Notification handlers
└── tests/ # Test suite
- Strategy Pattern: Rules and notifiers are implemented via abstract base classes, making it trivial to add new detection rules or notification methods
- Adapter Pattern:
github_event_parser.pyadapts GitHub webhook format to our domain model - Dependency Injection: Components are injected at initialization for easy testing and extensibility
- Push Time Anomaly: Detects pushes between 14:00-16:00 UTC (configurable)
- Hacker Team Anomaly: Detects teams created with suspicious naming patterns (e.g., "hacker-*")
- Repository Lifetime Anomaly: Detects repositories created and deleted within 10 minutes
- Python 3.10+
- pip
- ngrok (for local testing with GitHub webhooks)
- Clone the repository
- Create and activate virtual environment:
python3 -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate
- Install dependencies:
pip install -r requirements.txt
- Run the application:
Or use the startup script:
python run.py
./start.sh
The server will start on http://localhost:8000
To add a new anomaly detection rule:
- Create a new file in
domain/rules/(e.g.,new_rule.py) - Inherit from
BaseRuleand implement required methods:
from typing import Any, Iterable
from domain.rules.base import BaseRule
from domain.anomalies import Anomaly
from domain.events import YourEventType
class YourNewRule(BaseRule):
def name(self) -> str:
return "your_rule_name"
def evaluate(self, event: Any) -> Iterable[Anomaly]:
if not isinstance(event, YourEventType):
return []
# Your detection logic here
if suspicious_condition:
return [
Anomaly(
rule_name=self.name(),
description="Description of the anomaly",
# ... other fields
)
]
return []- Register the rule in
app/server.py:
engine = AnomalyEngine([
PushTimeRule(),
HackerTeamRule(),
RepoLifetimeRule(repo_store),
YourNewRule(), # Add here
])To add a new notification method:
- Create a new file in
notifiers/(e.g.,email_notifier.py) - Inherit from
Notifierand implementnotify():
from typing import List
from notifiers.base import Notifier
from domain.anomalies import Anomaly
class EmailNotifier(Notifier):
def notify(self, anomalies: List[Anomaly]) -> None:
# Send email notifications
pass- Replace or add the notifier in
app/server.py:
notifier = EmailNotifier() # Or use multiple notifiersRun the integration tests:
PYTHONPATH=. python tests/run_integration_tests.pyOr run individual test files:
PYTHONPATH=. python tests/test_webhook_integration.py
PYTHONPATH=. python tests/test_anomaly_engine_demo.py-
Start ngrok to expose your local server:
ngrok http 8000
-
Copy the HTTPS URL (e.g.,
https://xxxx-xxxx.ngrok.io) -
Configure GitHub webhook:
- Go to your organization/repository Settings → Webhooks
- Click "Add webhook"
- Payload URL:
https://your-ngrok-url.ngrok.io/webhook - Content type:
application/json - Select events: Push events, Team events, Repository events
- Click "Add webhook"
The codebase follows best practices:
- Type hints throughout for better IDE support and maintainability
- Comprehensive docstrings explaining design decisions and extensibility
- Constants instead of magic numbers for easy configuration
- Error handling with graceful degradation
- Clean separation of concerns between layers
github-anomaly-detector/
├── app/
│ ├── server.py # Flask application and routes
│ └── github_event_parser.py # GitHub webhook → domain event adapter
├── domain/
│ ├── events.py # Domain event models
│ ├── anomalies.py # Anomaly data model
│ └── rules/
│ ├── base.py # Abstract rule interface
│ ├── push_time_rule.py # Push time detection
│ ├── hacker_team_rule.py # Team name detection
│ └── repo_lifetime_rule.py # Repository lifetime detection
├── services/
│ ├── anomaly_engine.py # Core detection engine
│ └── repo_state_store.py # Repository state tracking
├── notifiers/
│ ├── base.py # Abstract notifier interface
│ └── console_notifier.py # Console output notifier
├── tests/
│ ├── test_anomaly_engine_demo.py # Engine demo tests
│ ├── test_webhook_integration.py # Integration tests
│ └── run_integration_tests.py # Test runner
├── requirements.txt
├── run.py
└── start.sh
MIT