RAG Service

JSON Pipeline

This document explains the JSON ingestion pipeline in detail: triggers, validation, processing, artifacts, embeddings, indexing, and troubleshooting.

Overview

The pipeline processes JSON and JSONL files uploaded under conversionfiles/json/raw/, produces parsed artifacts, and indexes the resulting chunks into Azure AI Search.

Triggers & Code

  • Event Grid: JSONProcessor (BlobCreated for conversionfiles/json/raw). Implemented in processors/json/processor.py (function json_processor).
  • HTTP: JSONProcessorHttp at /api/json/process?blob_url=<url>&force_reparse=1&mapping=<hint> using the Python v2 Functions blueprint (@bp.route). Implemented in processors/json/processor.py (function json_processor_http).
  • A GET request with blob_url triggers processing; force_reparse=1 bypasses the cache/short-circuit check. A sketch of the route shape follows.
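
A minimal sketch of the HTTP trigger under the Python v2 blueprint model. The route and query-parameter names come from this document; the process_json_file signature is an assumption, so a placeholder stub stands in for the real orchestrator:

    import json
    import azure.functions as func

    bp = func.Blueprint()

    def process_json_file(blob_url: str, force_reparse: bool = False) -> dict:
        # Placeholder for the documented orchestrator (download → parse → map →
        # artifacts → embeddings → upload); the signature is an assumption.
        ...

    @bp.route(route="json/process", methods=["GET"])
    def json_processor_http(req: func.HttpRequest) -> func.HttpResponse:
        blob_url = req.params.get("blob_url")
        if not blob_url:
            body = {"status": "error", "reason": "blob_url is required"}
            return func.HttpResponse(json.dumps(body), status_code=400,
                                     mimetype="application/json")
        force = req.params.get("force_reparse") == "1"
        result = process_json_file(blob_url, force_reparse=force)
        return func.HttpResponse(json.dumps(result), mimetype="application/json")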

Contracts

  • Input
    • Blob URL under https://<account>.blob.core.windows.net/conversionfiles/json/raw/<file>.json|.jsonl.
    • Content-Type: application/json or application/x-ndjson.
  • Output
    • Artifacts under conversionfiles/json/parsed/<filename_without_ext>/.
    • Index documents adhere to the allowed schema subset below.
  • Error
    • HTTP returns {"status": "error", "reason": ...} and logs diagnostics; Event Grid writes artifacts plus logs.

Input Validation

  • Accepts .json or .jsonl under conversionfiles/json/raw/.
  • Rejects other paths with a clear error.

upload_id and Status Tracking

  • Derives upload_id from blob metadata or a deterministic hash of the blob_url (see the sketch below).
  • Writes progress to Table Storage (table: ingestion), including stage and counts.
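
An illustrative derivation; the metadata key and the truncation length are assumptions, but the "metadata first, deterministic URL hash otherwise" order follows the description above:

    import hashlib

    def derive_upload_id(blob_url: str, metadata: dict) -> str:
        # Prefer an id stamped on the blob; otherwise hash the URL so the same
        # blob always maps to the same upload_id.
        if metadata.get("upload_id"):
            return metadata["upload_id"]
        return hashlib.sha256(blob_url.encode("utf-8")).hexdigest()[:16]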

Processing Stages (Technical)

  1. Download the source JSON/JSONL.
  2. Parse records; stream parsing is used for large files when available.
  3. Build document objects with standard fields: id, source_file, source_path, file_type=json, mime_type, ingestion_timestamp, chunk_index, text, extra_metadata, and section_title/subsection_title when available (see the mapping sketch after this list).
  4. Filter to the allowed schema fields.
  5. Write artifacts:
     • json_documents.jsonl (raw mapped docs)
     • json_documents_index_ready.jsonl (schema-filtered)
     • json_summary.json (counts, record-array stats, timing)
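
A sketch of the mapping in step 3, assuming one record per chunk; the field names come from the list above, while the id derivation and timestamp format are assumptions:

    import hashlib
    import json
    from datetime import datetime, timezone

    def build_document(record: dict, source_path: str, chunk_index: int) -> dict:
        # Serialize the record as the chunk text and derive a deterministic id
        # from the source path and position (illustrative scheme only).
        doc_id = hashlib.sha256(f"{source_path}:{chunk_index}".encode()).hexdigest()[:16]
        return {
            "id": doc_id,
            "source_file": source_path.rsplit("/", 1)[-1],
            "source_path": source_path,
            "file_type": "json",
            "mime_type": "application/json",
            "ingestion_timestamp": datetime.now(timezone.utc).isoformat(),
            "chunk_index": chunk_index,
            "text": json.dumps(record, ensure_ascii=False),
            "extra_metadata": "",
        }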

Field Schema (allowed)

Only a subset of fields is uploaded, enforced by an allow-list (a filtering sketch follows the list):

  • id, source_file, source_path, file_type, mime_type
  • ingestion_timestamp, chunk_index
  • section_title, subsection_title, keyword, type
  • text, ocr_text, extra_metadata
  • contentVector (added after embedding)
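
A minimal sketch of that filter; the contents mirror the list above, and ALLOWED_INDEX_FIELDS is the name this document gives the constant:

    ALLOWED_INDEX_FIELDS = {
        "id", "source_file", "source_path", "file_type", "mime_type",
        "ingestion_timestamp", "chunk_index", "section_title", "subsection_title",
        "keyword", "type", "text", "ocr_text", "extra_metadata", "contentVector",
    }

    def filter_to_schema(doc: dict) -> dict:
        # Drop any field not in the allow-list; extras never reach the index.
        return {k: v for k, v in doc.items() if k in ALLOWED_INDEX_FIELDS}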

Embeddings and Indexing

  • Lazy-loads Retrieval + Config from rag_shared when needed.
  • Pre-splitting: an optional max-character limit plus overlap bounds text length before embedding (see the sketch after this list).
  • Adaptive fallback on context-window errors:
    • Re-chunks with fallback sizes, then batches by token estimate.
    • Unique IDs per split part: the first part keeps id; subsequent parts use id-pN and include original_id.
  • Writes json_documents_index_ready_embedded.jsonl (adds contentVector).
  • Uploads to Azure AI Search with strict field filtering.
  • On failure, writes json_index_upload_failures.json with per-item diagnostics.
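
A sketch of pre-splitting with overlap, applying the documented id-pN suffix policy; the character-window strategy is an assumption about how the splitter works:

    def presplit(doc: dict, max_chars: int, overlap: int) -> list[dict]:
        # Split doc["text"] into overlapping windows. The first part keeps the
        # original id; later parts take the -pN suffix and carry original_id.
        text = doc["text"]
        if len(text) <= max_chars:
            return [doc]
        step = max(1, max_chars - overlap)  # guard against a non-advancing window
        parts = []
        for n, start in enumerate(range(0, len(text), step)):
            part = dict(doc, text=text[start:start + max_chars])
            if n > 0:
                part["id"] = f"{doc['id']}-p{n}"
                part["original_id"] = doc["id"]
            parts.append(part)
        return parts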

Environment knobs

  • JSON_EMBED_TEXT_MAX_CHARS / JSON_EMBED_CHUNK_OVERLAP: initial pre-split.
  • JSON_EMBED_FALLBACK_MAX_CHARS / JSON_EMBED_FALLBACK_OVERLAP / JSON_EMBED_FALLBACK_BATCH_TOKEN_TARGET: retry sizing and batching.
  • FORCE_FAKE_EMBEDDINGS: bypass real embeddings (testing).
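
A sketch of how these knobs might be read; the default values here are assumptions, not the pipeline's actual defaults:

    import os

    EMBED_MAX_CHARS = int(os.getenv("JSON_EMBED_TEXT_MAX_CHARS", "8000"))
    EMBED_OVERLAP = int(os.getenv("JSON_EMBED_CHUNK_OVERLAP", "200"))
    FALLBACK_MAX_CHARS = int(os.getenv("JSON_EMBED_FALLBACK_MAX_CHARS", "4000"))
    FALLBACK_OVERLAP = int(os.getenv("JSON_EMBED_FALLBACK_OVERLAP", "100"))
    FALLBACK_BATCH_TOKENS = int(os.getenv("JSON_EMBED_FALLBACK_BATCH_TOKEN_TARGET", "6000"))
    FAKE_EMBEDDINGS = os.getenv("FORCE_FAKE_EMBEDDINGS", "").lower() in ("1", "true")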

Error handling

  • Embedding 400 errors caused by context-window limits trigger the fallback (see the detection sketch below); other exceptions surface as embedding_error in the status record.
  • If embeddings are expected but the embedded file is absent, the pipeline marks the run as failed (earlier versions could leave a fallback placeholder; current logic retries to completion).
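
A hypothetical check for distinguishing context-limit 400s from other embedding errors; the status_code attribute and the matched message fragments vary by SDK and provider, so both are assumptions:

    def is_context_limit_error(exc: Exception) -> bool:
        # Treat a 400 whose message mentions context/token limits as retryable
        # via the fallback rechunking path; everything else is a hard failure.
        msg = str(exc).lower()
        return getattr(exc, "status_code", None) == 400 and (
            "context length" in msg or "maximum context" in msg or "too many tokens" in msg
        )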

Implementation Details

  • Orchestration
    • process_json_file coordinates download → parse → map → artifacts → embeddings → upload; both triggers use it.
    • The status store records granular stages (downloading_file, documents_created, generating_embeddings, uploading_to_index, indexing_completed).
  • Record discovery
    • Supports root arrays/objects, plus the JSON_RECORD_ARRAYS environment variable for nested arrays (e.g., logs,events); see the sketch after this list.
    • json_summary.json includes record_arrays counts.
  • ID policy & collisions
    • A deterministic base id per logical document; split parts take a -pN suffix and include original_id to avoid duplicate-key upload failures.
  • Upload filtering
    • Applies the allow-list (ALLOWED_INDEX_FIELDS) plus contentVector before calling Retrieval.upload_documents.
    • Writes json_index_upload_failures.json with per-result {id, status_code, error_message, text_len, vector_dim}.
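
A sketch of record discovery under the rules above; the fall-back to treating the whole object as a single record is an assumption:

    import os

    def discover_records(payload):
        # Root array: every element is a record.
        if isinstance(payload, list):
            return payload
        # Root object: pull nested arrays named in JSON_RECORD_ARRAYS
        # (comma-separated keys, e.g. "logs,events").
        if isinstance(payload, dict):
            records = []
            keys = [k for k in os.getenv("JSON_RECORD_ARRAYS", "").split(",") if k]
            for key in keys:
                value = payload.get(key)
                if isinstance(value, list):
                    records.extend(value)
            return records or [payload]  # assumed: whole object as one record
        return [payload]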

Testing

Local HTTP

  1. Start Functions host.
  2. Call /api/json/process?blob_url=<encoded> with optional force_reparse=1.
  3. Check parsed artifacts under conversionfiles/json/parsed/<file>/ and the ingestion table row for the upload_id.
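
A quick smoke test for step 2, assuming the Functions host is on its default port 7071:

    import requests

    blob_url = "https://<account>.blob.core.windows.net/conversionfiles/json/raw/sample.json"
    resp = requests.get(
        "http://localhost:7071/api/json/process",
        params={"blob_url": blob_url, "force_reparse": "1"},  # requests URL-encodes these
    )
    print(resp.status_code, resp.json())  # expect status, counts, and upload_id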

Event Grid

  1. Upload a JSON to conversionfiles/json/raw/.
  2. Verify function logs and the parsed folder for artifacts.
  3. If failures, open json_index_upload_failures.json for details.
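
One way to perform step 1 with azure-storage-blob; the connection string and local file name are placeholders, while the container and path come from this document:

    from azure.storage.blob import BlobServiceClient, ContentSettings

    service = BlobServiceClient.from_connection_string("<connection-string>")
    blob = service.get_blob_client(container="conversionfiles", blob="json/raw/sample.json")
    with open("sample.json", "rb") as fh:
        blob.upload_blob(fh, overwrite=True,
                         content_settings=ContentSettings(content_type="application/json"))
    # BlobCreated should now fire the JSONProcessor Event Grid trigger.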

Artifacts

  • Blob path: conversionfiles/json/parsed/<filename_without_ext>/
    • json_summary.json
    • json_documents.jsonl
    • json_documents_index_ready.jsonl
    • json_documents_index_ready_embedded.jsonl
    • json_index_upload_failures.json (only written on failures)

Schema examples

  • json_documents_index_ready.jsonl
    { "id": "abc123", "source_file": "sample.json", "file_type": "json", "chunk_index": 0, "text": "...", "ingestion_timestamp": "...Z" }
  • json_documents_index_ready_embedded.jsonl
    Adds contentVector: [float, ...].

HTTP vs Event Grid

  • HTTP returns a JSON body with status, counts, and upload_id; supports force_reparse.
  • Event Grid performs the same pipeline automatically upon blob creation; diagnostics in logs and artifacts.

Error Handling

  • Embedding context errors trigger automatic fallback rechunking and batching.
  • Missing embedding file (when embeddings expected) is treated as failure unless fallback succeeds.
  • Index upload failures are summarized and written to a diagnostic artifact.

Troubleshooting

  • If many uploads fail with empty error messages: check for duplicate ids and confirm the -pN part-suffixing is active.
  • If embeddings are missing: confirm rag_shared is importable and credentials/config are valid, then review logs for fallback activity.
  • Verify that only allowed fields are uploaded; extra fields are dropped before indexing.

Configuration

  • ai_search.* (endpoint, key or managed identity, index name); embeddings are configured globally under app.models.embeddings.
  • storage.blob_storage.file_mappings routes .json to json/raw and outputs to json/parsed.
  • JSON embedding fallback envs for large documents.

Observability

  • Status table row keyed by upload_id: state, progress, processing_stage, counts, error_message (a lookup sketch follows this list). Initializes StatusStore directly (the same pattern as the PDF pipeline) with STORAGE_ACCOUNT_NAME and optional AZURE_TABLES_CONNECTION_STRING/AzureWebJobsStorage.
  • Logs include counts (items_processed, documents_created), upload success/fail, and size metrics.
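
A hypothetical status lookup with azure-data-tables; the table name "ingestion" comes from this document, but the row's key scheme and property names are assumptions:

    from azure.data.tables import TableServiceClient

    upload_id = "<upload_id>"
    service = TableServiceClient.from_connection_string("<AZURE_TABLES_CONNECTION_STRING>")
    table = service.get_table_client("ingestion")
    # Assumed: rows carry an upload_id property that can be filtered on.
    for row in table.query_entities(f"upload_id eq '{upload_id}'"):
        print(row.get("processing_stage"), row.get("state"), row.get("error_message"))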