JSON Pipeline
This document explains the JSON ingestion pipeline in detail: triggers, validation, processing, artifacts, embeddings, indexing, and troubleshooting.
Overview
Processes JSON/JSONL files uploaded under `conversionfiles/json/raw/`. Produces parsed artifacts and indexes chunks into Azure AI Search.
Triggers & Code
- Event Grid: `JSONProcessor` (BlobCreated for `conversionfiles/json/raw`). Implemented in `processors/json/processor.py` (function `json_processor`).
- HTTP: `JSONProcessorHttp` at `/api/json/process?blob_url=<url>&force_reparse=1&mapping=<hint>` using the Python v2 Functions blueprint (`@bp.route`). Implemented in `processors/json/processor.py` (function `json_processor_http`); see the sketch after this list.
- GET with `blob_url` triggers processing; `force_reparse` overrides cache/short-circuit.
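For orientation, a minimal sketch of the HTTP trigger in the v2 blueprint style. The keyword names passed to `process_json_file` (`force_reparse`, `mapping`) are assumptions for illustration, not the confirmed signature:

```python
import json

import azure.functions as func

bp = func.Blueprint()

@bp.route(route="json/process", methods=["GET"])
def json_processor_http(req: func.HttpRequest) -> func.HttpResponse:
    # blob_url is required; force_reparse and mapping are optional hints.
    blob_url = req.params.get("blob_url")
    if not blob_url:
        return func.HttpResponse(
            json.dumps({"status": "error", "reason": "missing blob_url"}),
            status_code=400, mimetype="application/json",
        )
    result = process_json_file(  # orchestrator described under Implementation Details
        blob_url,
        force_reparse=req.params.get("force_reparse") == "1",
        mapping=req.params.get("mapping"),
    )
    return func.HttpResponse(json.dumps(result), mimetype="application/json")
```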
Contracts
- Input
  - Blob URL under `https://<account>.blob.core.windows.net/conversionfiles/json/raw/<file>.json|.jsonl`.
  - Content-Type: `application/json` or `application/x-ndjson`.
- Output
  - Artifacts under `conversionfiles/json/parsed/<filename_without_ext>/`.
  - Index docs adhere to the allowed schema subset below.
- Error
  - HTTP returns `{"status": "error", "reason": ...}` and logs diagnostics; Event Grid writes artifacts + logs.
Input Validation
- Accepts `.json` or `.jsonl` under `conversionfiles/json/raw/`.
- Rejects other paths with a clear error.
upload_id and Status Tracking
- Derives `upload_id` from blob metadata or a deterministic hash of the `blob_url` (a sketch follows this list).
- Writes progress to Table Storage (`ingestion`), including stage and counts.
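A minimal sketch of the derivation; SHA-256 and the 32-character truncation are illustrative choices, not confirmed by the source:

```python
import hashlib

def derive_upload_id(blob_url: str, metadata: dict) -> str:
    # Prefer an explicit upload_id carried in blob metadata; otherwise hash
    # the blob_url so retries of the same blob map to the same status row.
    explicit = metadata.get("upload_id")
    if explicit:
        return explicit
    return hashlib.sha256(blob_url.encode("utf-8")).hexdigest()[:32]
```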
Processing Stages (Technical)
- Download source JSON/JSONL.
- Parse records: stream for large files when available (a sketch follows this list).
- Build document objects with standard fields: `id`, `source_file`, `source_path`, `file_type=json`, `mime_type`, `ingestion_timestamp`, `chunk_index`, `text`, `extra_metadata`, and `section_title`/`subsection_title` (when available).
- Filter to allowed schema fields.
- Write artifacts:
  - `json_documents.jsonl` (raw mapped docs)
  - `json_documents_index_ready.jsonl` (schema-filtered)
  - `json_summary.json` (counts, record-array stats, timing)
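A sketch of the parse and map steps for JSONL input. The exact field mapping (serializing each record into `text`, the `base_id-chunk_index` id pattern) is assumed for illustration:

```python
import json
from datetime import datetime, timezone

def iter_jsonl_records(path: str):
    # Stream one record per line so large files stay bounded in memory.
    with open(path, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if line:
                yield json.loads(line)

def build_document(record: dict, source_file: str, source_path: str,
                   base_id: str, chunk_index: int) -> dict:
    return {
        "id": f"{base_id}-{chunk_index}",  # deterministic per logical doc
        "source_file": source_file,
        "source_path": source_path,
        "file_type": "json",
        "mime_type": "application/json",
        "ingestion_timestamp": datetime.now(timezone.utc).isoformat(),
        "chunk_index": chunk_index,
        "text": json.dumps(record, ensure_ascii=False),
        "extra_metadata": "{}",
    }
```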
Field Schema (allowed)
Only a subset is uploaded (enforced by an allow-list; see the sketch below):
- `id`, `source_file`, `source_path`, `file_type`, `mime_type`
- `ingestion_timestamp`, `chunk_index`
- `section_title`, `subsection_title`, `keyword`, `type`
- `text`, `ocr_text`, `extra_metadata`
- `contentVector` (added after embedding)
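A sketch of the allow-list enforcement; the set below simply transcribes the fields above into the `ALLOWED_INDEX_FIELDS` name used by the code:

```python
ALLOWED_INDEX_FIELDS = {
    "id", "source_file", "source_path", "file_type", "mime_type",
    "ingestion_timestamp", "chunk_index",
    "section_title", "subsection_title", "keyword", "type",
    "text", "ocr_text", "extra_metadata", "contentVector",
}

def filter_to_schema(doc: dict) -> dict:
    # Anything outside the allow-list is dropped before upload.
    return {k: v for k, v in doc.items() if k in ALLOWED_INDEX_FIELDS}
```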
Embeddings and Indexing
- Lazy-loads Retrieval + Config from `rag_shared` when needed.
- Pre-splitting: optional max chars + overlap to bound text length before embedding (a sketch follows this list).
- Adaptive fallback on context-window errors:
  - Re-chunk with fallback sizes, then batch by token estimate.
  - Unique IDs per split part: the first part keeps `id`; subsequent parts use `id-pN` and include `original_id`.
- Write `json_documents_index_ready_embedded.jsonl` (adds `contentVector`).
- Upload to Azure AI Search with strict field filtering.
- On failures, write `json_index_upload_failures.json` with per-item diagnostics.
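A sketch of the pre-split plus part-ID policy described above; the window arithmetic is illustrative and the real chunker may differ:

```python
def presplit(doc: dict, max_chars: int, overlap: int) -> list[dict]:
    # Bound text length before embedding. The first part keeps the original
    # id; later parts get -pN and carry original_id for traceability.
    text = doc["text"]
    if len(text) <= max_chars:
        return [doc]
    step = max_chars - overlap  # assumes overlap < max_chars
    parts = []
    for n, start in enumerate(range(0, len(text), step)):
        part = dict(doc)
        part["text"] = text[start:start + max_chars]
        if n > 0:
            part["id"] = f"{doc['id']}-p{n}"
            part["original_id"] = doc["id"]
        parts.append(part)
    return parts
```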
Environment knobs
- `JSON_EMBED_TEXT_MAX_CHARS` / `JSON_EMBED_CHUNK_OVERLAP`: initial pre-split (see the sketch after this list).
- `JSON_EMBED_FALLBACK_MAX_CHARS` / `JSON_EMBED_FALLBACK_OVERLAP` / `JSON_EMBED_FALLBACK_BATCH_TOKEN_TARGET`: retry sizing and batching.
- `FORCE_FAKE_EMBEDDINGS`: bypass real embeddings (testing).
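How the knobs might be read; the defaults below are placeholders for illustration, not the pipeline's actual values:

```python
import os

def _int_env(name: str, default: int) -> int:
    raw = os.getenv(name)
    return int(raw) if raw else default

EMBED_MAX_CHARS = _int_env("JSON_EMBED_TEXT_MAX_CHARS", 8000)         # placeholder default
EMBED_OVERLAP = _int_env("JSON_EMBED_CHUNK_OVERLAP", 200)             # placeholder default
FALLBACK_MAX_CHARS = _int_env("JSON_EMBED_FALLBACK_MAX_CHARS", 4000)  # placeholder default
FALLBACK_OVERLAP = _int_env("JSON_EMBED_FALLBACK_OVERLAP", 100)       # placeholder default
FALLBACK_BATCH_TOKENS = _int_env("JSON_EMBED_FALLBACK_BATCH_TOKEN_TARGET", 6000)
FAKE_EMBEDDINGS = os.getenv("FORCE_FAKE_EMBEDDINGS", "").lower() in {"1", "true", "yes"}
```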
Error handling
- Embedding 400 errors for context limits trigger the fallback (see the sketch below); other exceptions surface as `embedding_error` in status.
- If embeddings are expected but the embedded file is absent, the pipeline marks the run as failed (earlier versions could leave a fallback placeholder; current logic retries to completion).
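A sketch of the fallback trigger, reusing `presplit` and the fallback constants from the sketches above; matching on the error message text is an assumed heuristic, not the confirmed detection logic:

```python
def embed_with_fallback(batch: list[dict], embed_fn) -> list[dict]:
    try:
        return embed_fn(batch)
    except Exception as exc:
        message = str(exc).lower()
        # Heuristic: treat context-length 400s as "re-chunk smaller and retry".
        if "context" in message and ("length" in message or "window" in message):
            smaller = [part for doc in batch
                       for part in presplit(doc, FALLBACK_MAX_CHARS, FALLBACK_OVERLAP)]
            return embed_fn(smaller)
        raise  # surfaces as embedding_error in the status row
```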
Implementation Details
- Orchestration
  - `process_json_file` coordinates download → parse → map → artifacts → embeddings → upload; used by both triggers.
  - Status store updates granular stages (`downloading_file`, `documents_created`, `generating_embeddings`, `uploading_to_index`, `indexing_completed`).
- Record discovery (a sketch follows this list)
  - Supports root arrays/objects and the `JSON_RECORD_ARRAYS` env for nested arrays (e.g., `logs`, `events`). `json_summary.json` includes `record_arrays` counts.
- ID policy & collisions
  - Base deterministic id per logical doc; split parts get the `-pN` suffix and include `original_id` to avoid duplicate-key upload failures.
- Upload filtering
  - Applies the allow-list (`ALLOWED_INDEX_FIELDS`) + `contentVector` before calling `Retrieval.upload_documents`.
  - Writes `json_index_upload_failures.json` with per-result `{id, status_code, error_message, text_len, vector_dim}`.
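A sketch of record discovery, assuming `JSON_RECORD_ARRAYS` is a comma-separated list of top-level keys:

```python
import os

def discover_records(payload) -> dict[str, list]:
    # Root array: each element is a record. Root object: pull nested arrays
    # named in JSON_RECORD_ARRAYS; otherwise treat the object as one record.
    if isinstance(payload, list):
        return {"root": payload}
    arrays: dict[str, list] = {}
    if isinstance(payload, dict):
        wanted = [k.strip() for k in os.getenv("JSON_RECORD_ARRAYS", "").split(",") if k.strip()]
        for key in wanted:
            value = payload.get(key)
            if isinstance(value, list):
                arrays[key] = value
    if not arrays:
        arrays["root"] = [payload]
    return arrays

# json_summary.json can then report record_arrays as {name: count}:
# {name: len(records) for name, records in discover_records(payload).items()}
```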
Testing
Local HTTP
- Start the Functions host.
- Call `/api/json/process?blob_url=<encoded>` with optional `force_reparse=1` (example below).
- Check parsed artifacts under `conversionfiles/json/parsed/<file>/` and the Table `ingestion` row for the `upload_id`.
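A usage example against a local Functions host (default port 7071); the account and file names are placeholders:

```python
import requests

blob_url = ("https://<account>.blob.core.windows.net/"
            "conversionfiles/json/raw/sample.json")
resp = requests.get(
    "http://localhost:7071/api/json/process",
    params={"blob_url": blob_url, "force_reparse": "1"},  # params are URL-encoded for you
)
print(resp.status_code, resp.json())  # expect status, counts, upload_id
```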
Event Grid
- Upload a JSON file to `conversionfiles/json/raw/`.
- Verify function logs and the parsed folder for artifacts.
- On failures, open `json_index_upload_failures.json` for details.
Artifacts
- Blob path: `conversionfiles/json/parsed/<filename_without_ext>/`
  - `json_summary.json`
  - `json_documents.jsonl`
  - `json_documents_index_ready.jsonl`
  - `json_documents_index_ready_embedded.jsonl`
  - `json_index_upload_failures.json` (only if failures)
Schema examples
- `json_documents_index_ready.jsonl`:

```json
{ "id": "abc123", "source_file": "sample.json", "file_type": "json", "chunk_index": 0, "text": "...", "ingestion_timestamp": "...Z" }
```

- `json_documents_index_ready_embedded.jsonl`: adds `contentVector: [float, ...]`.
HTTP vs Event Grid
- HTTP returns a JSON body with status, counts, and `upload_id`; supports `force_reparse`.
- Event Grid performs the same pipeline automatically upon blob creation; diagnostics go to logs and artifacts.
Error Handling
- Embedding context errors trigger automatic fallback rechunking and batching.
- Missing embedding file (when embeddings expected) is treated as failure unless fallback succeeds.
- Index upload failures are summarized and written to a diagnostic artifact.
Troubleshooting
- If there are many upload failures with empty error messages: check for duplicate `id`s; ensure part suffixing is active.
- If embeddings are missing: confirm `rag_shared` and credentials/config, then review logs for fallback activity.
- Validate that only allowed fields are uploaded; extra fields are dropped before indexing.
Configuration
- `ai_search.*` (endpoint, key/MI, index name); embeddings configured globally under `app.models.embeddings`.
- `storage.blob_storage.file_mappings` routes `.json` to `json/raw` and outputs to `json/parsed`.
- JSON embedding fallback envs for large documents (see Environment knobs above).
Observability
- Status table row keyed by `upload_id`: `state`, `progress`, `processing_stage`, counts, `error_message`. Uses direct StatusStore initialization (same pattern as PDF) with `STORAGE_ACCOUNT_NAME` and optional `AZURE_TABLES_CONNECTION_STRING`/`AzureWebJobsStorage` (a sketch follows).
- Logs include counts (`items_processed`, `documents_created`), upload success/fail, and size metrics.
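A minimal sketch of a status upsert with `azure-data-tables`; the partition key is an assumption, and the real StatusStore additionally supports managed identity via `STORAGE_ACCOUNT_NAME`:

```python
import os
from azure.data.tables import TableServiceClient

def update_status(upload_id: str, stage: str, **fields) -> None:
    # Upsert one row per upload_id in the "ingestion" table.
    conn = os.getenv("AZURE_TABLES_CONNECTION_STRING") or os.environ["AzureWebJobsStorage"]
    table = TableServiceClient.from_connection_string(conn).get_table_client("ingestion")
    table.upsert_entity({
        "PartitionKey": "json",  # assumed partitioning scheme
        "RowKey": upload_id,
        "processing_stage": stage,
        **fields,
    })
```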