UN-2860 [FIX] Fixed active file cache not preventing duplicate file processing#1570
UN-2860 [FIX] Fixed active file cache not preventing duplicate file processing#1570harini-venkataraman merged 6 commits intomainZipstack/unstract:mainfrom fix/UN-2860-FIX_active-file-cache-preventing-duplicate-processingZipstack/unstract:fix/UN-2860-FIX_active-file-cache-preventing-duplicate-processingCopy head branch name to clipboard
Conversation
…rocessing
Fixed critical bug where ActiveFileFilter cache checks were failing to detect
files already being processed, causing duplicate file processing in concurrent
workflow executions.
Key fixes:
- Fixed cache data access: Extract execution_id from nested cache structure
(cached_data["data"]["execution_id"] instead of cached_data["execution_id"])
- Changed cache status from "EXECUTING" to "PENDING" for queued files
- Increased MAX_ACTIVE_FILE_CACHE_TTL from 1hr to 2hrs for resource-constrained environments
- Added cache cleanup in finally blocks to prevent stale entries
- Fixed cache key format consistency (hash-based) between backend and workers
- Optimized DB queries to skip files already found in cache
- Removed ~370 lines of dead code (file_management_utils.py and unused methods)
Root cause: RedisCacheBackend wraps data in {data: {...}, cached_at, ttl} but
filter_pipeline was accessing execution_id directly instead of from nested data key.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Summary by CodeRabbit
WalkthroughRemoved per-file cache lookups in backend active-file checks and replaced them with a single bulk database query (path-aware and UUID-only branches); workers now perform cache-first filtering, submit only remaining composite identifiers to the backend, and always attempt cache cleanup after pre-create attempts; several utility modules and exports were removed and cache TTL/status semantics adjusted. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant W as Worker FilterPipeline
participant AFM as ActiveFileManager (cache)
participant API as Backend internal_views
participant DB as Database
rect rgb(245,248,255)
note over W,AFM: Cache-first filtering
W->>AFM: Create cache keys, read cache entries
AFM-->>W: Cached-active entries (data.execution_id)
W->>W: Compute remaining_identifiers (exclude cached-active)
end
alt no remaining_identifiers
W-->>W: Skip backend call
else remaining_identifiers present
rect rgb(248,245,255)
note over W,API: Path-aware bulk query
W->>API: Request with composite ids (uuid:path)
API->>DB: Bulk check (path-aware or UUID-only)
DB-->>API: Active results
API-->>W: active_identifiers (preferred) or legacy active_uuids
end
W->>W: Map response to original identifiers
end
note over W: Proceed with non-active files only
sequenceDiagram
autonumber
participant T as Worker Task (_pre_create_file_executions)
participant C as Cache
participant L as Logger
T->>L: Log using file history (includes execution_id)
rect rgb(245,255,247)
note over T: Attempt pre-create per file (try)
T->>T: Create WorkflowFileExecution
end
rect rgb(255,249,245)
note over T,C: Guaranteed cache cleanup (finally)
T->>C: _cleanup_file_cache_entry(file_hash, workflow_id, file_name)
C-->>T: Success or error
T->>L: Warn on cleanup failure (non-fatal)
end
sequenceDiagram
autonumber
participant API as Backend internal_views
participant DB as Database
note over API: Backend no longer reads cache for active-file checks
API->>DB: Bulk query active files (path-aware or UUID-only)
DB-->>API: Results
API->>API: Log db_queries only
API-->>Caller: { active_identifiers | active_uuids, db_queries }
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro Cache: Disabled due to Reviews > Disable Cache setting Knowledge base: Disabled due to 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/workflow_manager/internal_views.py (1)
756-767: db_queries count is always set to 2 (inaccurate metrics)Set db_queries based on whether path-aware and/or legacy queries will actually run.
- self._bulk_database_check( + # Pre-compute which query paths will run + has_path = any(f["path"] is not None for f in files_needing_db_check) + has_legacy = any(f["path"] is None for f in files_needing_db_check) + self._bulk_database_check( files_needing_db_check=files_needing_db_check, workflow_id=workflow_id, current_execution_id=current_execution_id, active_files=active_files, active_identifiers=active_identifiers, ) - db_queries = 2 # At most 2 bulk queries (path-aware + legacy) + db_queries = int(has_path) + int(has_legacy) # 0–2 depending on pathsAlso applies to: 771-772
🧹 Nitpick comments (6)
workers/sample.env (1)
260-262: Add units/bounds note for clarityACTIVE_FILE_CACHE_TTL is seconds and capped in code. Consider adding a brief comment (e.g., “seconds, 60–7200”) to avoid misconfiguration.
workers/shared/clients/file_client.py (1)
130-132: Logging additions look goodIncluding execution_id improves traceability. Consider structured logs (extra=…) for easier filtering downstream.
Also applies to: 223-223, 227-229
backend/workflow_manager/internal_views.py (1)
739-754: Avoid duplicate entries in files_needing_db_checkIf input contains repeated uuid/path, we’ll add duplicates and grow Q unnecessarily. Deduplicate by composite_id for performance.
- for file_data in files: + seen = set() + for file_data in files: provider_uuid = file_data["uuid"] file_path = file_data.get("path") composite_id = ( f"{provider_uuid}:{file_path}" if file_path else provider_uuid ) - - # All files need database check - files_needing_db_check.append( + if composite_id in seen: + continue + seen.add(composite_id) + files_needing_db_check.append( { "uuid": provider_uuid, "path": file_path, "composite_id": composite_id, } )workers/file_processing/tasks.py (1)
840-875: Tolerate cleanup errors, but silence BLE001Catching broad Exception is acceptable in a finally cleanup; annotate to satisfy Ruff BLE001.
- except Exception as cleanup_error: + except Exception as cleanup_error: # noqa: BLE001 logger.warning(f"Cache cleanup failed for {file_name}: {cleanup_error}") # Don't raise - cache will expire anywayPlease confirm this cleanup timing is intentional: removing the “active” cache right after pre-creating WorkflowFileExecution relies on DB-based active checks to prevent duplicates during processing.
workers/shared/processing/filter_pipeline.py (2)
639-646: Correct: read execution_id from cached_data['data'] and reuse manager key logicThis fixes the root cause and ensures key parity. LGTM.
Prefer a public helper (e.g., ActiveFileManager.create_cache_key) rather than calling a private method (_create_cache_key) across modules to reduce brittleness.
654-656: Consider lowering to debugThis cache-hit count log may be noisy at INFO in high volume pipelines.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (8)
backend/workflow_manager/internal_views.py(4 hunks)workers/file_processing/tasks.py(3 hunks)workers/sample.env(1 hunks)workers/shared/clients/file_client.py(2 hunks)workers/shared/processing/filter_pipeline.py(4 hunks)workers/shared/workflow/execution/__init__.py(0 hunks)workers/shared/workflow/execution/active_file_manager.py(3 hunks)workers/shared/workflow/execution/file_management_utils.py(0 hunks)
💤 Files with no reviewable changes (2)
- workers/shared/workflow/execution/init.py
- workers/shared/workflow/execution/file_management_utils.py
🧰 Additional context used
🪛 Ruff (0.13.3)
workers/file_processing/tasks.py
872-872: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (3)
workers/shared/workflow/execution/active_file_manager.py (1)
421-421: Status PENDING alignmentChanging cache entry status to PENDING is consistent with upstream use. LGTM.
Also applies to: 538-538
workers/file_processing/tasks.py (1)
976-979: Finally cleanup is good; ensure file_hash is always setfile_hash is created before the try, so this is safe. LGTM.
If any early-return paths are added later, ensure file_hash exists to avoid UnboundLocalError.
workers/shared/processing/filter_pipeline.py (1)
660-675: No staleexecution_idlookups detected
Repository-wide search found no directcached_data.get("execution_id")calls. LGTM.
…-duplicate-processing
…processing' of github.com:Zipstack/unstract into fix/UN-2860-FIX_active-file-cache-preventing-duplicate-processing
…-duplicate-processing
|
|
|
What
Why
RedisCacheBackendwraps cached data in{data: {...}, cached_at: "...", ttl: 300}butfilter_pipeline.py:643was accessingexecution_iddirectly instead of from the nesteddatakeyNoneforcached_exec_idHow
Critical fix in filter_pipeline.py (line 645):
cached_data.get("execution_id")→cached_data.get("data", {}).get("execution_id")Cache lifecycle improvements in active_file_manager.py:
Guaranteed cache cleanup in file_processing/tasks.py:
_cleanup_file_cache_entry()helper methodCache key consistency:
Database query optimization in filter_pipeline.py:
Code cleanup:
file_management_utils.py(~150 lines unused)filter_and_cache_files()method (~197 lines unused)_check_database_active_files()method (~23 lines unused)Can this PR break any existing features. If yes, please list possible items. If no, please explain why.
No, this PR will not break existing features:
get("data", {}).get("execution_id")pattern safely handles both old and new cache formats (returns None if structure differs)Risk mitigation:
Database Migrations
Env Config
ACTIVE_FILE_CACHE_TTLto customize cache TTL (default: 300s, max: 7200s)Relevant Docs
workers/shared/cache/cache_backends.pyworkers/shared/processing/filter_pipeline.pyworkers/shared/workflow/execution/active_file_manager.pyRelated Issues or PRs
Dependencies Versions
Notes on Testing
How to test:
file_active:*keys exist with correct execution IDsRedis verification:
Log verification:
Screenshots
N/A - Backend/worker cache logic fix
Checklist
✅ I have read and understood the Contribution Guidelines.
🤖 Generated with Claude Code