TLC-Search/ingest.py

"""
Utilities for indexing transcript JSON documents into Elasticsearch.
Usage:
python -m python_app.ingest --source data/video_metadata --index corner
"""
from __future__ import annotations
import argparse
import json
import logging
from pathlib import Path
from typing import Dict, Iterable, Iterator, Optional
from .config import CONFIG, AppConfig
try:
from elasticsearch import Elasticsearch, helpers # type: ignore
except ImportError: # pragma: no cover - dependency optional
Elasticsearch = None
helpers = None
LOGGER = logging.getLogger(__name__)


def _ensure_client(config: AppConfig) -> "Elasticsearch":
    if Elasticsearch is None:
        raise RuntimeError(
            "elasticsearch package not installed. "
            "Install elasticsearch>=8 to index documents."
        )
    kwargs = {}
    if config.elastic.api_key:
        kwargs["api_key"] = config.elastic.api_key
    elif config.elastic.username and config.elastic.password:
        kwargs["basic_auth"] = (
            config.elastic.username,
            config.elastic.password,
        )
    if config.elastic.ca_cert:
        kwargs["ca_certs"] = str(config.elastic.ca_cert)
    kwargs["verify_certs"] = config.elastic.verify_certs
    return Elasticsearch(config.elastic.url, **kwargs)
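
# The attribute accesses above assume a config object roughly shaped like the
# sketch below (illustrative only; the real definitions live in .config and
# may differ):
#
#     CONFIG.elastic.url           # e.g. "https://localhost:9200"
#     CONFIG.elastic.api_key       # optional API key, preferred over basic auth
#     CONFIG.elastic.username      # used with password when no API key is set
#     CONFIG.elastic.password
#     CONFIG.elastic.ca_cert       # optional path to a CA certificate
#     CONFIG.elastic.verify_certs  # bool
#     CONFIG.elastic.index         # default index name
#     CONFIG.data.root             # default --source directory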


def iter_json_documents(data_root: Path) -> Iterator[Dict]:
    """Yield JSON objects from the provided directory tree."""
    if not data_root.exists():
        raise FileNotFoundError(f"{data_root} does not exist")
    for path in sorted(data_root.rglob("*.json")):
        try:
            with path.open("r", encoding="utf-8") as handle:
                doc = json.load(handle)
            doc.setdefault("video_id", path.stem)
            yield doc
        except Exception as exc:
            LOGGER.warning("Skipping %s: %s", path, exc)
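
# Illustrative sketch of the per-video JSON shape this module expects; the
# fields below are the ones build_bulk_actions() and the index mapping read.
# Actual documents may carry additional keys, which are simply ignored.
#
#     {
#         "video_id": "abc123",
#         "channel_id": "UC...",
#         "channel_name": "...",
#         "title": "...",
#         "description": "...",
#         "date": "2024-01-01T00:00:00Z",
#         "url": "https://...",
#         "duration": 1234.5,
#         "transcript_full": "...",
#         "transcript_secondary_full": "...",
#         "transcript_parts": [
#             {"start": 0.0, "duration": 4.2, "text": "..."}
#         ]
#     }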


def build_bulk_actions(
    docs: Iterable[Dict], *, index: Optional[str] = None
) -> Iterator[Dict]:
    """Translate raw JSON dictionaries into Elasticsearch bulk actions."""
    for doc in docs:
        video_id = doc.get("video_id")
        if not video_id:
            continue
        parts = doc.get("transcript_parts") or doc.get("transcript") or []
        transcript_full = doc.get("transcript_full")
        if not transcript_full and isinstance(parts, list):
            transcript_full = " ".join(
                segment.get("text", "") if isinstance(segment, dict) else str(segment)
                for segment in parts
            ).strip()
        yield {
            "_id": video_id,
            "_index": index or CONFIG.elastic.index,
            "_op_type": "index",
            "_source": {
                "video_id": video_id,
                "channel_id": doc.get("channel_id"),
                "channel_name": doc.get("channel_name"),
                "title": doc.get("title"),
                "description": doc.get("description"),
                "date": doc.get("date") or doc.get("published_at"),
                "url": doc.get("url"),
                "duration": doc.get("duration"),
                "transcript_full": transcript_full,
                "transcript_secondary_full": doc.get("transcript_secondary_full"),
                "transcript_parts": parts,
            },
        }
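
# Because every action carries an explicit "_id" (the video_id) with
# "_op_type": "index", re-running the ingest over the same files overwrites
# the existing documents rather than creating duplicates.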


def ensure_index(client: "Elasticsearch", index: str) -> None:
    """Create the target index with a minimal mapping if it is missing."""
    if client.indices.exists(index=index):
        return
    LOGGER.info("Creating index %s", index)
    client.indices.create(
        index=index,
        mappings={
            "properties": {
                "video_id": {"type": "keyword"},
                "channel_id": {"type": "keyword"},
                "channel_name": {"type": "keyword"},
                "title": {"type": "text"},
                "description": {"type": "text"},
                "date": {"type": "date", "format": "strict_date_optional_time"},
                "url": {"type": "keyword"},
                "duration": {"type": "float"},
                "transcript_full": {"type": "text"},
                "transcript_secondary_full": {"type": "text"},
                "transcript_parts": {
                    "type": "nested",
                    "properties": {
                        "start": {"type": "float"},
                        "duration": {"type": "float"},
                        "text": {"type": "text"},
                    },
                },
            }
        },
    )
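
# Note: "transcript_parts" is mapped as a nested field, so searching inside
# individual segments requires a nested query. Illustrative sketch only; the
# index name and query text are placeholders, not part of this module:
#
#     client.search(
#         index="corner",
#         query={
#             "nested": {
#                 "path": "transcript_parts",
#                 "query": {"match": {"transcript_parts.text": "some phrase"}},
#                 "inner_hits": {},
#             }
#         },
#     )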


def ingest_directory(
    data_root: Path,
    *,
    config: AppConfig = CONFIG,
    index: Optional[str] = None,
    batch_size: int = 500,
    request_timeout: int = 120,
) -> None:
    """Bulk index every JSON file in the directory tree."""
    client = _ensure_client(config)
    target_index = index or config.elastic.index
    ensure_index(client, target_index)
    docs = iter_json_documents(data_root)
    actions = build_bulk_actions(docs, index=target_index)
    bulk_client = client.options(request_timeout=request_timeout)
    helpers.bulk(
        bulk_client,
        actions,
        chunk_size=batch_size,
    )
    LOGGER.info("Ingestion complete for %s", target_index)
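
# Usage sketch (illustrative): the same ingestion can be driven from Python
# instead of the CLI. The source directory and index name mirror the example
# in the module docstring; substitute your own values.
#
#     from pathlib import Path
#     ingest_directory(Path("data/video_metadata"), index="corner")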


def _build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Ingest transcript JSON files into Elasticsearch."
    )
    parser.add_argument(
        "--source",
        type=Path,
        default=CONFIG.data.root,
        help="Directory containing per-video JSON files.",
    )
    parser.add_argument(
        "--index",
        help="Override the Elasticsearch index name.",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=500,
        help="Bulk ingest batch size.",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=120,
        help="Request timeout (seconds) for bulk operations.",
    )
    return parser


def main() -> None:
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    args = _build_parser().parse_args()
    ingest_directory(
        args.source,
        index=args.index,
        batch_size=args.batch_size,
        request_timeout=args.timeout,
    )


if __name__ == "__main__":  # pragma: no cover
    main()