TLC-Search/ingest.py
knight 2efe5e0c79 Fix secondary transcript timestamps by indexing parts
Previously, secondary transcript parts were not being indexed
into Elasticsearch, causing the frontend to receive empty arrays
and display zero timestamps.

Changes:
- Add transcript_secondary_parts to Elasticsearch mapping
- Include secondary parts in bulk indexing actions
- Build secondary_full text from parts if not provided
- Match primary transcript structure (nested with start/duration/text)

Note: Existing data needs to be re-indexed for this fix to apply
to videos that already have secondary transcripts.
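
For reference, a re-index is just a re-run of the ingest CLI over the existing
JSON tree: bulk actions use video_id as the document _id, so re-ingesting
overwrites each document in place. The source path and index name below come
from the module's own usage string and are illustrative:

    python -m python_app.ingest --source data/video_metadata --index corner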

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 00:54:50 -05:00


"""
Utilities for indexing transcript JSON documents into Elasticsearch.
Usage:
python -m python_app.ingest --source data/video_metadata --index corner
"""
from __future__ import annotations

import argparse
import json
import logging
from pathlib import Path
from typing import Dict, Iterable, Iterator, Optional

from .config import CONFIG, AppConfig

try:
    from elasticsearch import Elasticsearch, helpers  # type: ignore
except ImportError:  # pragma: no cover - dependency optional
    Elasticsearch = None
    helpers = None

LOGGER = logging.getLogger(__name__)

def _ensure_client(config: AppConfig) -> "Elasticsearch":
    if Elasticsearch is None:
        raise RuntimeError(
            "elasticsearch package not installed. "
            "Install elasticsearch>=8 to index documents."
        )
    kwargs = {}
    # API key takes precedence over basic auth when both are configured.
    if config.elastic.api_key:
        kwargs["api_key"] = config.elastic.api_key
    elif config.elastic.username and config.elastic.password:
        kwargs["basic_auth"] = (
            config.elastic.username,
            config.elastic.password,
        )
    if config.elastic.ca_cert:
        kwargs["ca_certs"] = str(config.elastic.ca_cert)
    kwargs["verify_certs"] = config.elastic.verify_certs
    return Elasticsearch(config.elastic.url, **kwargs)

def iter_json_documents(data_root: Path) -> Iterator[Dict]:
    """Yield JSON objects from the provided directory tree."""
    if not data_root.exists():
        raise FileNotFoundError(f"{data_root} does not exist")
    for path in sorted(data_root.rglob("*.json")):
        try:
            with path.open("r", encoding="utf-8") as handle:
                doc = json.load(handle)
            doc.setdefault("video_id", path.stem)
            yield doc
        except Exception as exc:
            LOGGER.warning("Skipping %s: %s", path, exc)
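
# Illustrative layout the walker above accepts (rglob is recursive, so
# subdirectories are fine); the file stem becomes the fallback video_id:
#   data/video_metadata/
#       channel_a/dQw4w9WgXcQ.json
#       channel_b/abc123.json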

def build_bulk_actions(
    docs: Iterable[Dict], *, index: Optional[str] = None
) -> Iterator[Dict]:
    """Translate raw JSON dictionaries into Elasticsearch bulk actions."""
    for doc in docs:
        video_id = doc.get("video_id")
        if not video_id:
            continue
        parts = doc.get("transcript_parts") or doc.get("transcript") or []
        transcript_full = doc.get("transcript_full")
        if not transcript_full and isinstance(parts, list):
            transcript_full = " ".join(
                segment.get("text", "") if isinstance(segment, dict) else str(segment)
                for segment in parts
            ).strip()
        # Handle secondary transcript parts the same way as the primary ones.
        secondary_parts = doc.get("transcript_secondary_parts") or []
        transcript_secondary_full = doc.get("transcript_secondary_full")
        if not transcript_secondary_full and isinstance(secondary_parts, list):
            transcript_secondary_full = " ".join(
                segment.get("text", "") if isinstance(segment, dict) else str(segment)
                for segment in secondary_parts
            ).strip()
        yield {
            "_id": video_id,
            "_index": index or CONFIG.elastic.index,
            "_op_type": "index",
            "_source": {
                "video_id": video_id,
                "channel_id": doc.get("channel_id"),
                "channel_name": doc.get("channel_name"),
                "title": doc.get("title"),
                "description": doc.get("description"),
                "date": doc.get("date") or doc.get("published_at"),
                "url": doc.get("url"),
                "duration": doc.get("duration"),
                "transcript_full": transcript_full,
                "transcript_secondary_full": transcript_secondary_full,
                "transcript_parts": parts,
                "transcript_secondary_parts": secondary_parts,
            },
        }
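
# Example of the input shape build_bulk_actions expects (field values are
# illustrative; "transcript" is accepted as a legacy alias for
# "transcript_parts"):
# {
#     "video_id": "abc123",
#     "title": "Episode 1",
#     "transcript_parts": [
#         {"start": 0.0, "duration": 3.2, "text": "hello"},
#     ],
#     "transcript_secondary_parts": [
#         {"start": 0.0, "duration": 3.1, "text": "hola"},
#     ],
# }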

def ensure_index(client: "Elasticsearch", index: str) -> None:
    """Create the target index with a minimal mapping if it is missing."""
    if client.indices.exists(index=index):
        return
    LOGGER.info("Creating index %s", index)
    client.indices.create(
        index=index,
        mappings={
            "properties": {
                "video_id": {"type": "keyword"},
                "channel_id": {"type": "keyword"},
                "channel_name": {"type": "keyword"},
                "title": {"type": "text"},
                "description": {"type": "text"},
                "date": {"type": "date", "format": "strict_date_optional_time"},
                "url": {"type": "keyword"},
                "duration": {"type": "float"},
                "transcript_full": {"type": "text"},
                "transcript_secondary_full": {"type": "text"},
                "transcript_parts": {
                    "type": "nested",
                    "properties": {
                        "start": {"type": "float"},
                        "duration": {"type": "float"},
                        "text": {"type": "text"},
                    },
                },
                "transcript_secondary_parts": {
                    "type": "nested",
                    "properties": {
                        "start": {"type": "float"},
                        "duration": {"type": "float"},
                        "text": {"type": "text"},
                    },
                },
            }
        },
    )
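
# Sketch of a nested query the mapping above supports (illustrative only; the
# real frontend query lives outside this module). inner_hits returns the
# matching segments together with their start/duration timestamps:
#
#     client.search(
#         index="corner",
#         query={
#             "nested": {
#                 "path": "transcript_secondary_parts",
#                 "query": {"match": {"transcript_secondary_parts.text": "hello"}},
#                 "inner_hits": {},
#             }
#         },
#     )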

def ingest_directory(
    data_root: Path,
    *,
    config: AppConfig = CONFIG,
    index: Optional[str] = None,
    batch_size: int = 500,
    request_timeout: int = 120,
) -> None:
    """Bulk index every JSON file in the directory tree."""
    client = _ensure_client(config)
    target_index = index or config.elastic.index
    ensure_index(client, target_index)
    docs = iter_json_documents(data_root)
    actions = build_bulk_actions(docs, index=target_index)
    bulk_client = client.options(request_timeout=request_timeout)
    helpers.bulk(
        bulk_client,
        actions,
        chunk_size=batch_size,
    )
    LOGGER.info("Ingestion complete for %s", target_index)

def _build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Ingest transcript JSON files into Elasticsearch."
    )
    parser.add_argument(
        "--source",
        type=Path,
        default=CONFIG.data.root,
        help="Directory containing per-video JSON files.",
    )
    parser.add_argument(
        "--index",
        help="Override the Elasticsearch index name.",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=500,
        help="Bulk ingest batch size.",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=120,
        help="Request timeout (seconds) for bulk operations.",
    )
    return parser

def main() -> None:
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    args = _build_parser().parse_args()
    ingest_directory(
        args.source,
        index=args.index,
        batch_size=args.batch_size,
        request_timeout=args.timeout,
    )


if __name__ == "__main__":  # pragma: no cover
    main()
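
A minimal programmatic sketch of the same ingest flow, assuming the package is
importable as python_app and CONFIG points at a reachable cluster (the path,
index name, and batch size are illustrative):

    from pathlib import Path
    from python_app.ingest import ingest_directory

    ingest_directory(Path("data/video_metadata"), index="corner", batch_size=250)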