"""Utilities for indexing transcript JSON documents into Elasticsearch.

Usage:
    python -m python_app.ingest --source data/video_metadata --index corner
"""

from __future__ import annotations

import argparse
import json
import logging
from pathlib import Path
from typing import Dict, Iterable, Iterator, Optional

from .config import CONFIG, AppConfig

try:
    from elasticsearch import Elasticsearch, helpers  # type: ignore
except ImportError:  # pragma: no cover - dependency optional
    Elasticsearch = None
    helpers = None

LOGGER = logging.getLogger(__name__)


def _ensure_client(config: AppConfig) -> "Elasticsearch":
    """Build an Elasticsearch client from ``config.elastic``.

    Authentication uses ``api_key`` when it is set; otherwise basic auth is
    used only if both ``username`` and ``password`` are set. ``ca_cert`` is
    forwarded as ``ca_certs`` when present, and ``verify_certs`` is always
    forwarded.

    Raises:
        RuntimeError: if the ``elasticsearch`` package is not installed.
    """
    if Elasticsearch is None:
        # ``basic_auth`` below and ``client.options(...)`` in
        # ingest_directory only exist in the 8.x client.
        raise RuntimeError(
            "elasticsearch package not installed. "
            "Install elasticsearch>=8 to index documents."
        )
    kwargs = {}
    if config.elastic.api_key:
        kwargs["api_key"] = config.elastic.api_key
    elif config.elastic.username and config.elastic.password:
        kwargs["basic_auth"] = (
            config.elastic.username,
            config.elastic.password,
        )
    if config.elastic.ca_cert:
        kwargs["ca_certs"] = str(config.elastic.ca_cert)
    kwargs["verify_certs"] = config.elastic.verify_certs
    return Elasticsearch(config.elastic.url, **kwargs)


def iter_json_documents(data_root: Path) -> Iterator[Dict]:
    """Return an iterator over the JSON objects in ``data_root``'s tree.

    Every ``*.json`` file below ``data_root`` is visited in sorted path
    order. Each document gets ``video_id`` defaulted to the file stem.
    Files that cannot be read or parsed, or whose top-level value is not a
    JSON object, are skipped with a warning.

    Raises:
        FileNotFoundError: immediately (not on first iteration) if
            ``data_root`` does not exist.
    """
    # Checked eagerly: if this lived inside the generator, the error would
    # only surface once a consumer started iterating (e.g. mid-ingest).
    if not data_root.exists():
        raise FileNotFoundError(f"{data_root} does not exist")
    return _iter_json_documents(data_root)


def _iter_json_documents(data_root: Path) -> Iterator[Dict]:
    """Lazily load and yield JSON objects from an existing directory tree."""
    for path in sorted(data_root.rglob("*.json")):
        try:
            with path.open("r", encoding="utf-8") as handle:
                doc = json.load(handle)
        except Exception as exc:
            # Best effort: one unreadable/corrupt file must not abort the
            # whole ingest run.
            LOGGER.warning("Skipping %s: %s", path, exc)
            continue
        if not isinstance(doc, dict):
            LOGGER.warning(
                "Skipping %s: top-level JSON value is not an object", path
            )
            continue
        doc.setdefault("video_id", path.stem)
        yield doc


def _join_segments(parts: Iterable) -> str:
    """Join transcript segment texts with spaces and strip the result.

    Dict segments contribute their ``text`` value (missing or null counts
    as empty); any other segment is converted with ``str``.
    """
    texts = []
    for segment in parts:
        if isinstance(segment, dict):
            text = segment.get("text")
            # A null "text" used to reach str.join and raise TypeError.
            texts.append("" if text is None else str(text))
        else:
            texts.append(str(segment))
    return " ".join(texts).strip()


def build_bulk_actions(
    docs: Iterable[Dict], *, index: Optional[str] = None
) -> Iterator[Dict]:
    """Translate raw JSON dictionaries into Elasticsearch bulk actions.

    Documents without a truthy ``video_id`` are skipped. Segments come from
    ``transcript_parts``, falling back to ``transcript``. When
    ``transcript_full`` is absent, it is rebuilt from the segment texts.

    Args:
        docs: Raw document dictionaries, e.g. from ``iter_json_documents``.
        index: Target index; defaults to ``CONFIG.elastic.index``.

    Yields:
        ``index`` bulk actions keyed by ``video_id``.
    """
    target_index = index or CONFIG.elastic.index
    for doc in docs:
        video_id = doc.get("video_id")
        if not video_id:
            continue
        parts = doc.get("transcript_parts") or doc.get("transcript") or []
        transcript_full = doc.get("transcript_full")
        if isinstance(parts, str):
            # A whole-text transcript. ensure_index maps transcript_parts as
            # ``nested`` objects, so sending a bare string there would make
            # Elasticsearch reject the document; use it as the full text.
            transcript_full = transcript_full or parts.strip()
            parts = []
        elif not transcript_full and isinstance(parts, list):
            transcript_full = _join_segments(parts)
        yield {
            "_id": video_id,
            "_index": target_index,
            "_op_type": "index",
            "_source": {
                "video_id": video_id,
                "channel_id": doc.get("channel_id"),
                "channel_name": doc.get("channel_name"),
                "title": doc.get("title"),
                "description": doc.get("description"),
                "date": doc.get("date") or doc.get("published_at"),
                "url": doc.get("url"),
                "duration": doc.get("duration"),
                "transcript_full": transcript_full,
                "transcript_secondary_full": doc.get("transcript_secondary_full"),
                "transcript_parts": parts,
                "internal_references": doc.get("internal_references", []),
                "internal_references_count": doc.get("internal_references_count", 0),
                "referenced_by": doc.get("referenced_by", []),
                "referenced_by_count": doc.get("referenced_by_count", 0),
            },
        }


def ensure_index(client: "Elasticsearch", index: str) -> None:
    """Create ``index`` with an explicit mapping if it does not exist.

    An existing index is left untouched, even if its mapping differs. The
    existence check and the create are separate requests, so this is not
    safe against another process creating the same index concurrently.
    """
    if client.indices.exists(index=index):
        return
    LOGGER.info("Creating index %s", index)
    client.indices.create(
        index=index,
        mappings={
            "properties": {
                "video_id": {"type": "keyword"},
                "channel_id": {"type": "keyword"},
                "channel_name": {"type": "keyword"},
                "title": {"type": "text"},
                "description": {"type": "text"},
                "date": {"type": "date", "format": "strict_date_optional_time"},
                "url": {"type": "keyword"},
                "duration": {"type": "float"},
                "transcript_full": {"type": "text"},
                "transcript_secondary_full": {"type": "text"},
                "transcript_parts": {
                    "type": "nested",
                    "properties": {
                        "start": {"type": "float"},
                        "duration": {"type": "float"},
                        "text": {"type": "text"},
                    },
                },
                "internal_references": {"type": "keyword"},
                "internal_references_count": {"type": "integer"},
                "referenced_by": {"type": "keyword"},
                "referenced_by_count": {"type": "integer"},
            }
        },
    )


def ingest_directory(
    data_root: Path,
    *,
    config: AppConfig = CONFIG,
    index: Optional[str] = None,
    batch_size: int = 500,
    request_timeout: int = 120,
) -> None:
    """Bulk index every JSON file in the directory tree.

    Args:
        data_root: Directory searched recursively for ``*.json`` files.
        config: Application config providing connection settings.
        index: Target index; defaults to ``config.elastic.index``.
        batch_size: Number of actions per bulk request (``chunk_size``).
        request_timeout: Per-request timeout, in seconds, for bulk calls.

    Raises:
        FileNotFoundError: if ``data_root`` does not exist; raised before
            any client is built or index created.
        RuntimeError: if the ``elasticsearch`` package is not installed.
    """
    # Validate the source first so a mistyped path never creates an empty
    # index as a side effect.
    docs = iter_json_documents(data_root)
    client = _ensure_client(config)
    target_index = index or config.elastic.index
    ensure_index(client, target_index)
    actions = build_bulk_actions(docs, index=target_index)
    bulk_client = client.options(request_timeout=request_timeout)
    indexed, _ = helpers.bulk(
        bulk_client,
        actions,
        chunk_size=batch_size,
    )
    LOGGER.info("Indexed %d documents into %s", indexed, target_index)
    LOGGER.info("Ingestion complete for %s", target_index)


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the ingest entry point."""
    parser = argparse.ArgumentParser(
        description="Ingest transcript JSON files into Elasticsearch."
    )
    parser.add_argument(
        "--source",
        type=Path,
        default=CONFIG.data.root,
        help="Directory containing per-video JSON files.",
    )
    parser.add_argument(
        "--index",
        help="Override the Elasticsearch index name.",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=500,
        help="Bulk ingest batch size.",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=120,
        help="Request timeout (seconds) for bulk operations.",
    )
    return parser


def main() -> None:
    """Configure logging, parse CLI arguments and run the ingest."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    args = _build_parser().parse_args()
    ingest_directory(
        args.source,
        index=args.index,
        batch_size=args.batch_size,
        request_timeout=args.timeout,
    )


if __name__ == "__main__":  # pragma: no cover
    main()