RAG Service

CSV Pipeline

Overview

Processes CSV files uploaded under the blob path conversionfiles/csv/raw/. The pipeline parses rows, produces normalized JSONL artifacts, optionally generates embeddings + uploads documents to an Azure AI Search index, and persists artifacts both to blob storage (parsed path) and a local repository artifacts/ mirror for offline inspection and testing.

Current Architecture

Core modules (post‑refactor):

  • processors/csv/core.py – High‑level run_csv_processing(blob_url, ...) used by triggers; handles raw blob fetch, status initialization, local/blob storage abstraction, and final persistence.
  • processors/csv/service.py – Trigger‑agnostic processing & orchestration (run_csv_pipeline, process_csv_embeddings_and_indexing, mapping detection helpers, CSV loading, document creation, embedding/index upload coordination).
  • processors/csv/eg.py – Event Grid trigger implementation (CSVProcessor) plus legacy compatibility exports; now also re‑exports run_csv_processing and embedding helpers.

Shared utilities leveraged:

  • shared/storage_io.py – local vs blob IO + directory uploads
  • shared/persist_utils.py – timestamped artifact copy to the repository under artifacts/csv
  • shared/status_utils.py – safe wrapper around shared.status_store for progress updates
  • shared/config_resolver.py – single global CONFIG object; remote or local config resolution
  • shared/embedding_upload.py – generic embed + upload batching with progress callbacks & retries

Backward compatibility: Tests and older code that monkeypatch processors.csv.eg.Retrieval still work because service.process_csv_embeddings_and_indexing now dynamically prefers eg.Retrieval if present.
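A minimal sketch of that dynamic-preference pattern (the fallback import path shared.retrieval is hypothetical; the actual logic lives in processors/csv/service.py and may differ in detail):

    import importlib

    def resolve_retrieval_class():
        # Prefer a (possibly monkeypatched) processors.csv.eg.Retrieval so that
        # older tests keep working; otherwise fall back to the shared class.
        eg_module = importlib.import_module("processors.csv.eg")
        retrieval_cls = getattr(eg_module, "Retrieval", None)
        if retrieval_cls is not None:
            return retrieval_cls
        from shared.retrieval import Retrieval  # hypothetical fallback location
        return Retrieval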

Triggers & Entry Points

  • Event Grid: Function CSVProcessor defined in processors/csv/eg.py (previous docs referenced an old path).
  • (Optional) HTTP trigger or direct invocation can call run_csv_processing or run_csv_pipeline directly for local/testing scenarios.
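For local or test scenarios, a direct invocation might look like the sketch below; the blob path sample.csv and the exact argument form accepted by run_csv_processing are illustrative assumptions, not a documented contract:

    import os

    # Environment values mirror the variables documented under
    # "Configuration & Environment Variables".
    os.environ.setdefault("USE_LOCAL_STORAGE", "true")
    os.environ.setdefault("AzureWebJobsStorage", "UseDevelopmentStorage=true")

    from processors.csv.core import run_csv_processing

    result = run_csv_processing("conversionfiles/csv/raw/sample.csv")
    print(result)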

Data Contracts

Input:

  • Blob: conversionfiles/csv/raw/<file>.csv

Primary Output Artifacts (in blob storage):

  • conversionfiles/csv/parsed/<file>/csv_documents.jsonl
  • conversionfiles/csv/parsed/<file>/csv_documents_index_ready.jsonl
  • conversionfiles/csv/parsed/<file>/csv_summary.json
  • conversionfiles/csv/parsed/<file>/csv_documents_index_ready_embedded.jsonl (if embeddings are enabled)
  • conversionfiles/csv/parsed/<file>/csv_index_upload_failures.json (on upload failures)

Local Persistence Mirror (repository):

  • artifacts/csv/<file>_<YYYYMMDD_HHMMSS>/* – Timestamped copy of the summary + JSONL artifacts above (facilitates test assertions & manual inspection without browsing storage).

Processing Stages

  1. Download / local fetch of raw CSV (supports local dev storage via USE_LOCAL_STORAGE).
  2. Parse CSV → row documents with metadata; produce csv_documents.jsonl + identical initial csv_documents_index_ready.jsonl.
  3. (Optional) Embeddings & AI Search indexing (batch embed & upload with progress + retries).
  4. Write summary + embedded artifact (if embeddings run) + failures file when upload errors occur.
  5. Persist artifacts to blob parsed path and local timestamped directory.
  6. Update the status store throughout the lifecycle (progress percentages roughly: parse 30–50, document creation 50–70, embedding 70–85, upload 85–95, completion 100).
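The ranges above can be read as a stage-local fraction mapped onto an overall percentage; the helper below is an illustrative approximation, not the actual status code (real reporting goes through shared.status_utils):

    STAGE_RANGES = {
        "parse": (30, 50),
        "document_creation": (50, 70),
        "embedding": (70, 85),
        "index_upload": (85, 95),
    }

    def overall_progress(stage, fraction):
        # Map a stage-local fraction (0.0-1.0) into that stage's overall range.
        low, high = STAGE_RANGES[stage]
        clamped = max(0.0, min(1.0, fraction))
        return int(low + (high - low) * clamped)

    # overall_progress("embedding", 0.5) -> 77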

Example Artifact Lines

csv_documents.jsonl: {"id":"-r12","original_id":"-r12","content":"name=Alice,score=95","row":12,"columns":["name","score"],"source_url":""}

csv_documents_index_ready.jsonl: {"id":"-r12","content":"name=Alice,score=95","row":12,"source_url":""}

csv_documents_index_ready_embedded.jsonl: produced after the embedding step; contains the same documents as the index‑ready file, with additional embedding metadata fields possibly present depending on the Retrieval implementation.
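To inspect these artifacts locally (e.g. from the artifacts/csv mirror described below), a minimal JSONL reader such as the following sketch is enough; the timestamped folder name shown is hypothetical:

    import json
    from pathlib import Path

    def load_jsonl(path):
        # One JSON object per line; blank lines are ignored.
        with Path(path).open(encoding="utf-8") as handle:
            return [json.loads(line) for line in handle if line.strip()]

    # Hypothetical timestamped folder under the local artifacts mirror.
    docs = load_jsonl("artifacts/csv/sample_20250101_120000/csv_documents.jsonl")
    print(len(docs), docs[0]["content"] if docs else None)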

Configuration & Environment Variables

Central configuration: loaded once via shared/config_resolver.CONFIG (may download remote YAML or use local fallback).

Key environment variables:

  • AzureWebJobsStorage – Connection string (or UseDevelopmentStorage=true).
  • USE_LOCAL_STORAGE / LOCAL_STORAGE_ROOT – Enable & root path for the local filesystem mirror during tests.
  • CSV_PERSIST_OUT_ROOT – Override destination root for persisted artifacts (default: artifacts/csv).
  • FORCE_REPARSE – Force re‑processing even if parsed artifacts exist.
  • Embedding/index tuning:
      • EMBED_PROGRESS_CHUNK – Doc batch size for progress increments during embedding (default 50).
      • INDEX_UPLOAD_PROGRESS_CHUNK – Doc batch size for upload progress (default 50).
      • CSV_UPLOAD_MAX_RETRIES – Upload batch retry attempts (default 2).
      • CSV_RETRY_BASE_DELAY – Base delay (seconds) for exponential retry backoff (default 1).
  • Progress/index routing:
      • CSV_TARGET_INDEX_ALIAS / per‑file metadata (mapping + target index hints).
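A hedged sketch of how the tuning variables might be resolved; defaults mirror the documented values, but the actual resolution happens inside the shared modules:

    import os

    EMBED_PROGRESS_CHUNK = int(os.getenv("EMBED_PROGRESS_CHUNK", "50"))
    INDEX_UPLOAD_PROGRESS_CHUNK = int(os.getenv("INDEX_UPLOAD_PROGRESS_CHUNK", "50"))
    CSV_UPLOAD_MAX_RETRIES = int(os.getenv("CSV_UPLOAD_MAX_RETRIES", "2"))
    CSV_RETRY_BASE_DELAY = float(os.getenv("CSV_RETRY_BASE_DELAY", "1"))
    USE_LOCAL_STORAGE = os.getenv("USE_LOCAL_STORAGE", "").lower() in ("1", "true", "yes")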

ID Policy

The row number is included in the document ID to ensure uniqueness; if a row is ever split into multiple chunks (a future extension), each chunk should append a -pN suffix. The embedded variant preserves the original IDs.
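A small sketch of that ID scheme (the stem handling is illustrative; the example lines above elide the stem):

    def row_id(stem, row, part=None):
        # stem: source file stem; part: only used if a row is split into chunks.
        base = f"{stem}-r{row}"
        return f"{base}-p{part}" if part is not None else base

    # row_id("scores", 12)          -> "scores-r12"
    # row_id("scores", 12, part=2)  -> "scores-r12-p2"  (future chunk-split case)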

Error Modes & Observability

  • Malformed CSV → parse errors logged; status updated with pipeline_error stage.
  • Missing index‑ready file before embeddings → logged and embeddings skipped.
  • Upload batch failures → retried with exponential backoff; permanent failures appended to csv_index_upload_failures.json, with counts recorded in the summary (see the retry sketch after this list).
  • Status tracking performed via shared.status_utils wrappers (defensive try/except around store operations).
  • Progress callback events (embedding/upload) mapped to percentage ranges (see stages above).
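A hedged sketch of the retry/backoff behavior described above; the real logic lives in shared/embedding_upload.py and may classify failures differently, and upload_batch here is a placeholder callable:

    import time

    def upload_with_retries(upload_batch, batch, max_retries=2, base_delay=1.0):
        # max_retries/base_delay correspond to CSV_UPLOAD_MAX_RETRIES and
        # CSV_RETRY_BASE_DELAY.
        attempt = 0
        while True:
            try:
                return upload_batch(batch)
            except Exception:
                if attempt >= max_retries:
                    raise  # caller records the permanent failure
                time.sleep(base_delay * (2 ** attempt))  # exponential backoff
                attempt += 1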

Persistence Strategy

Artifacts are always written to a temporary working directory, uploaded to blob storage, and then copied into a timestamped folder under artifacts/csv via persist_artifacts(..., timestamp=True). This allows multiple test runs without clobbering prior results.
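An illustrative call, assuming positional source/destination arguments (only timestamp=True is confirmed above; see shared/persist_utils.py for the actual signature):

    from shared.persist_utils import persist_artifacts

    persist_artifacts(
        "/tmp/csv_work/sample",   # temp working directory holding the artifacts
        "artifacts/csv",          # destination root (overridable via CSV_PERSIST_OUT_ROOT)
        timestamp=True,           # creates <file>_<YYYYMMDD_HHMMSS>/
    )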

Tests

Location: tests/unit/csv/

  • test_csv_core_processing.py – Exercises the run_csv_processing core flow (parsing + artifact generation) with local persistence.
  • test_csv_embedding_orchestration.py – Unit tests for embedding + indexing orchestration (success path, retry/failure classification, text dedup logic); monkeypatches processors.csv.eg.Retrieval to inject dummy retrieval behavior (see the sketch after this list).
  • test_csv_local_offline.py – Fully local offline test (no embeddings) verifying artifacts & console output, using the local storage abstraction.
  • test_csv_local_no_monkeypatch.py – Natural‑path run (optionally skipping embeddings if RAG components are absent) validating end‑to‑end pipeline outputs.
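Illustrative monkeypatch pattern (the DummyRetrieval method surface shown is hypothetical and must mirror whatever the real Retrieval class exposes):

    import processors.csv.eg as eg

    class DummyRetrieval:
        def embed_texts(self, texts):
            # Fixed-size fake vectors; hypothetical method name.
            return [[0.0, 0.0, 0.0] for _ in texts]

    def test_embedding_uses_dummy_retrieval(monkeypatch):
        monkeypatch.setattr(eg, "Retrieval", DummyRetrieval)
        assert eg.Retrieval is DummyRetrieval
        # ... then call process_csv_embeddings_and_indexing and assert on the
        # produced *_embedded.jsonl / summary artifacts.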

Running just the CSV unit suite:

pytest tests/unit/csv -q

Using marker selection (markers registered in root pytest.ini):

pytest -m unit tests/unit/csv

Embedding tests require RAG components; if those imports fail, the tests are skipped automatically (via pytest.mark.skipif), as in the sketch below.
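A typical guard (illustrative; the import probed here is an assumption, and the real tests guard on whichever RAG component they actually need):

    import pytest

    try:
        from shared.embedding_upload import embed_and_upload  # hypothetical import
        HAS_RAG = True
    except Exception:
        HAS_RAG = False

    pytestmark = pytest.mark.skipif(not HAS_RAG, reason="RAG components unavailable")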

Maintenance Notes

  • When modifying embedding behavior, ensure process_csv_embeddings_and_indexing continues to resolve eg.Retrieval dynamically so that existing test monkeypatches remain valid.
  • To add new processors, replicate the core/service/trigger layering and reuse shared modules for consistency.

Quick Verification Checklist

  1. Upload CSV blob to conversionfiles/csv/raw/ (or run local test) – status store shows parsing stage.
  2. Confirm parsed artifacts under blob + local artifacts/csv/<file>_<timestamp>/.
  3. If embeddings are enabled, ensure csv_documents_index_ready_embedded.jsonl is present and csv_summary.json records a documents_uploaded count (a quick check sketch follows this list).
  4. Review csv_index_upload_failures.json if failures > 0.
  5. Run unit tests before/after refactors to guarantee no regression in artifact schema.
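A quick local check for step 3, assuming the artifacts/csv mirror exists (documents_uploaded is the key named above; no other summary keys are assumed):

    import glob
    import json

    # Pick the most recent timestamped run under the local artifacts mirror.
    runs = sorted(glob.glob("artifacts/csv/*/csv_summary.json"))
    assert runs, "no persisted CSV runs found under artifacts/csv"
    with open(runs[-1], encoding="utf-8") as handle:
        summary = json.load(handle)
    print(runs[-1], summary.get("documents_uploaded"))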