# CSV Pipeline

## Overview

Processes CSV files uploaded under the blob path `conversionfiles/csv/raw/`. The pipeline parses rows, produces normalized JSONL artifacts, optionally generates embeddings and uploads documents to an Azure AI Search index, and persists artifacts both to blob storage (the `parsed` path) and to a local repository `artifacts/` mirror for offline inspection and testing.
## Current Architecture
Core modules (post‑refactor):
- `processors/csv/core.py` – High-level `run_csv_processing(blob_url, ...)` used by triggers; handles raw blob fetch, status initialization, the local/blob storage abstraction, and final persistence.
- `processors/csv/service.py` – Trigger-agnostic processing & orchestration (`run_csv_pipeline`, `process_csv_embeddings_and_indexing`, mapping detection helpers, CSV loading, document creation, embedding/index upload coordination).
- `processors/csv/eg.py` – Event Grid trigger implementation (`CSVProcessor`) plus legacy compatibility exports; now also re-exports `run_csv_processing` and the embedding helpers.
- Shared utilities leveraged:
  - `shared/storage_io.py` – local vs blob IO + directory uploads.
  - `shared/persist_utils.py` – timestamped artifact copy into the repo under `artifacts/csv`.
  - `shared/status_utils.py` – safe wrapper around `shared.status_store` for progress updates.
  - `shared/config_resolver.py` – single global `CONFIG` object; remote or local config resolution.
  - `shared/embedding_upload.py` – generic embed + upload batching with progress callbacks & retries.
Backward compatibility: Tests and older code that monkeypatch `processors.csv.eg.Retrieval` still work because `service.process_csv_embeddings_and_indexing` now dynamically prefers `eg.Retrieval` if present.
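A minimal sketch of that dynamic preference, assuming the lookup works roughly as described above (the fallback import path is hypothetical):

```python
# Sketch only: prefer a (possibly monkeypatched) eg.Retrieval, else fall back.
# The fallback module path is a hypothetical placeholder.
import importlib


def _resolve_retrieval():
    eg = importlib.import_module("processors.csv.eg")
    retrieval_cls = getattr(eg, "Retrieval", None)
    if retrieval_cls is not None:
        return retrieval_cls  # honors test monkeypatches on eg.Retrieval
    from rag.retrieval import Retrieval  # hypothetical default location
    return Retrieval
```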
## Triggers & Entry Points

- Event Grid: Function `CSVProcessor` defined in `processors/csv/eg.py` (previous docs referenced an old path).
- (Optional) An HTTP trigger or direct invocation can call `run_csv_processing` or `run_csv_pipeline` directly for local/testing scenarios (see the sketch below).
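For local runs, a direct invocation might look like the following; only `blob_url` is documented above, so any other details (return value, extra kwargs) are assumptions:

```python
# Hypothetical local invocation for development/testing.
import os

from processors.csv.core import run_csv_processing

os.environ["USE_LOCAL_STORAGE"] = "1"  # route IO through the local mirror

run_csv_processing(blob_url="conversionfiles/csv/raw/sample.csv")
```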
## Data Contracts
Input:
- Blob: `conversionfiles/csv/raw/<file>.csv`

Primary Output Artifacts (in blob storage):

- `conversionfiles/csv/parsed/<file>/csv_documents.jsonl`
- `conversionfiles/csv/parsed/<file>/csv_documents_index_ready.jsonl`
- `conversionfiles/csv/parsed/<file>/csv_summary.json`
- (If embeddings enabled) `conversionfiles/csv/parsed/<file>/csv_documents_index_ready_embedded.jsonl`
- (On failures) `conversionfiles/csv/parsed/<file>/csv_index_upload_failures.json`
Local Persistence Mirror (repository):

- `artifacts/csv/<file>_<YYYYMMDD_HHMMSS>/*` – Timestamped copy of the above summary + JSONL artifacts (facilitates test assertions & manual inspection without browsing storage).
## Processing Stages
- Download / local fetch of raw CSV (supports local dev storage via `USE_LOCAL_STORAGE`).
- Parse CSV → row documents with metadata; produce `csv_documents.jsonl` + an identical initial `csv_documents_index_ready.jsonl`.
- (Optional) Embeddings & AI Search indexing (batch embed & upload with progress + retries).
- Write summary + embedded artifact (if embeddings run) + failures file when upload errors occur.
- Persist artifacts to the blob parsed path and the local timestamped directory.
- Update the status store through the lifecycle (approximate progress percentages: parse 30–50, doc creation 50–70, embedding 70–85, upload 85–95, completion 100; see the sketch after this list).
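A minimal sketch of how a progress callback could map per-stage batch counts onto those percentage bands (band boundaries taken from the list above; the callback shape itself is an assumption):

```python
# Hypothetical mapper from "done/total in a stage" to the overall percentage
# bands documented above; not the real status_utils implementation.
STAGE_BANDS = {
    "parse": (30, 50),
    "doc_creation": (50, 70),
    "embedding": (70, 85),
    "upload": (85, 95),
}


def stage_progress(stage: str, done: int, total: int) -> int:
    """Linear interpolation of stage completion into its percentage band."""
    lo, hi = STAGE_BANDS[stage]
    if total <= 0:
        return lo
    frac = min(max(done / total, 0.0), 1.0)
    return round(lo + frac * (hi - lo))


assert stage_progress("embedding", 25, 50) == 78  # halfway through 70-85
```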
## Example Artifact Lines

`csv_documents.jsonl`: `{"id":"…`

`csv_documents_index_ready.jsonl`: `{"id":"…`

`csv_documents_index_ready_embedded.jsonl` – example after embedding; additional embedding metadata fields may be present per the `Retrieval` implementation.
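For orientation, a sketch of the general shape of one JSONL document; every field name below is an illustrative assumption, not the pipeline's exact schema:

```python
# Illustrative only: all field names are assumptions about the row-document
# shape, not the pipeline's actual schema.
import json

doc = {
    "id": "sample_r0001",  # includes the row number (see ID Policy below)
    "content": "col_a=1; col_b=foo",  # normalized row text
    "metadata": {"source": "conversionfiles/csv/raw/sample.csv", "row": 1},
}
line = json.dumps(doc)  # one JSON object per line in csv_documents.jsonl
```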
## Configuration & Environment Variables

Central configuration is loaded once via `shared/config_resolver.CONFIG` (it may download remote YAML or use a local fallback).
Key environment variables:
- `AzureWebJobsStorage` – Connection string (or `UseDevelopmentStorage=true`).
- `USE_LOCAL_STORAGE` / `LOCAL_STORAGE_ROOT` – Enable, and set the root path for, the local filesystem mirror during tests.
- `CSV_PERSIST_OUT_ROOT` – Override destination root for persisted artifacts (default: `artifacts/csv`).
- `FORCE_REPARSE` – Force re-processing even if parsed artifacts exist.
- Embedding/index tuning (see the sketch after this list):
  - `EMBED_PROGRESS_CHUNK` – Doc batch size for progress increments during embedding (default 50).
  - `INDEX_UPLOAD_PROGRESS_CHUNK` – Doc batch size for upload progress (default 50).
  - `CSV_UPLOAD_MAX_RETRIES` – Upload batch retry attempts (default 2).
  - `CSV_RETRY_BASE_DELAY` – Base delay (seconds) for exponential retry backoff (default 1).
- Progress/index routing:
  - `CSV_TARGET_INDEX_ALIAS` / per-file metadata (mapping + target index hints).
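A small sketch of resolving those tuning knobs with their documented defaults (the helper name is hypothetical; the real code may read them elsewhere):

```python
# Hypothetical helper reading the tuning variables above with their defaults.
import os


def _env_int(name: str, default: int) -> int:
    try:
        return int(os.getenv(name, str(default)))
    except ValueError:
        return default


EMBED_PROGRESS_CHUNK = _env_int("EMBED_PROGRESS_CHUNK", 50)
INDEX_UPLOAD_PROGRESS_CHUNK = _env_int("INDEX_UPLOAD_PROGRESS_CHUNK", 50)
CSV_UPLOAD_MAX_RETRIES = _env_int("CSV_UPLOAD_MAX_RETRIES", 2)
CSV_RETRY_BASE_DELAY = _env_int("CSV_RETRY_BASE_DELAY", 1)
```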
## ID Policy

Each document ID includes the row number to guarantee uniqueness; if a row is ever split into multiple chunks (a future extension), the chunks should append a `-pN` suffix. The embedded variant preserves the original IDs. A sketch follows.
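The sketch below assumes a simple `<stem>_r<row>` format; the separator and zero-padding are illustrative, not the pipeline's actual convention:

```python
# Illustrative ID construction; exact stem handling/padding is an assumption.
def make_doc_id(file_stem: str, row: int, part: int | None = None) -> str:
    base = f"{file_stem}_r{row:04d}"
    # Future extension: a row split into chunks appends a -pN suffix.
    return f"{base}-p{part}" if part is not None else base


assert make_doc_id("sample", 7) == "sample_r0007"
assert make_doc_id("sample", 7, part=2) == "sample_r0007-p2"
```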
## Error Modes & Observability

- Malformed CSV → parse errors logged; status updated with a `pipeline_error` stage.
- Missing index-ready file before embeddings → logged and embeddings skipped.
- Upload batch failures → retried; permanent failures appended to `csv_index_upload_failures.json` with counts in the summary (a retry sketch follows this list).
- Status tracking performed via `shared.status_utils` wrappers (defensive try/except around store operations).
- Progress callback events (embedding/upload) mapped to percentage ranges (see the stages above).
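A sketch of the retry behavior those knobs describe (function and argument names are illustrative, not the real `shared/embedding_upload.py` API):

```python
# Hypothetical retry loop: up to max_retries re-attempts with exponential
# backoff; the final failure is returned for the failures artifact.
import time


def upload_batch_with_retries(upload_fn, batch, max_retries=2, base_delay=1.0):
    for attempt in range(max_retries + 1):
        try:
            upload_fn(batch)
            return None
        except Exception as exc:  # real code likely narrows the exception type
            if attempt == max_retries:
                return exc  # caller records batch in csv_index_upload_failures.json
            time.sleep(base_delay * (2 ** attempt))
```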
## Persistence Strategy

Artifacts are always written to a temporary working directory, uploaded to blob storage, and then copied into a timestamped folder under `artifacts/csv` using `persist_artifacts(..., timestamp=True)`. This enables multiple test runs without clobbering prior results.
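A sketch of that flow; `persist_artifacts`' full signature is not shown above, so the positional source argument is an assumption:

```python
# Hypothetical persistence flow mirroring the description above.
import tempfile
from pathlib import Path

from shared.persist_utils import persist_artifacts

with tempfile.TemporaryDirectory() as workdir:
    (Path(workdir) / "csv_summary.json").write_text('{"rows": 0}')
    # ... upload workdir contents to the blob parsed path here ...
    persist_artifacts(workdir, timestamp=True)  # copy to artifacts/csv/<file>_<ts>/
```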
## Tests

Location: `tests/unit/csv/`

| Test File | Purpose |
|---|---|
| `test_csv_core_processing.py` | Exercises the `run_csv_processing` core flow (parsing + artifact generation) with local persistence. |
| `test_csv_embedding_orchestration.py` | Unit tests for embedding + indexing orchestration (success path, retry/failure classification, text dedup logic). Monkeypatches `processors.csv.eg.Retrieval` to inject dummy retrieval behavior. |
| `test_csv_local_offline.py` | Fully local offline test (no embeddings) verifying artifacts & console output, using the local storage abstraction. |
| `test_csv_local_no_monkeypatch.py` | Natural-path run (optionally skipping embeddings if RAG components are absent) validating end-to-end pipeline outputs. |
Running just the CSV unit suite: `pytest tests/unit/csv -q`

Using marker selection (markers registered in the root `pytest.ini`): `pytest -m unit tests/unit/csv`

Embedding tests require the RAG components; if those imports fail, the tests are skipped automatically (via `pytest.mark.skipif`).
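For reference, a sketch of the monkeypatch pattern the orchestration tests rely on (the `DummyRetrieval` interface is an assumption about what the pipeline calls):

```python
# Sketch of monkeypatching processors.csv.eg.Retrieval in a test.
import processors.csv.eg as eg


class DummyRetrieval:
    def embed_texts(self, texts):  # hypothetical method name
        return [[0.0, 0.0, 0.0] for _ in texts]


def test_embedding_uses_patched_retrieval(monkeypatch):
    monkeypatch.setattr(eg, "Retrieval", DummyRetrieval)
    # ... run process_csv_embeddings_and_indexing and assert on its outputs ...
```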
## Maintenance Notes

- When modifying embedding behavior, ensure `process_csv_embeddings_and_indexing` continues to import `eg.Retrieval` dynamically so that existing test monkeypatches remain valid (see the sketch under Current Architecture).
- To add new processors, replicate the core/service/trigger layering and reuse the shared modules for consistency.
## Quick Verification Checklist

- Upload a CSV blob to `conversionfiles/csv/raw/` (or run a local test) – the status store shows the parsing stage.
- Confirm parsed artifacts under blob storage and locally under `artifacts/csv/<file>_<timestamp>/`.
- If embeddings are enabled, ensure `csv_documents_index_ready_embedded.jsonl` is present and `csv_summary.json` has a `documents_uploaded` count.
- Review `csv_index_upload_failures.json` if failures > 0.
- Run the unit tests before/after refactors to guarantee no regression in the artifact schema.