"""
|
|
Utilities for indexing transcript JSON documents into Elasticsearch.
|
|
|
|
Usage:
|
|
python -m python_app.ingest --source data/video_metadata --index corner
|
|
"""

from __future__ import annotations

import argparse
import json
import logging
from pathlib import Path
from typing import Dict, Iterable, Iterator, Optional

from .config import CONFIG, AppConfig

try:
    from elasticsearch import Elasticsearch, helpers  # type: ignore
except ImportError:  # pragma: no cover - dependency optional
    Elasticsearch = None
    helpers = None


LOGGER = logging.getLogger(__name__)


def _ensure_client(config: AppConfig) -> "Elasticsearch":
    """Build an Elasticsearch client from the application config."""
    if Elasticsearch is None:
        raise RuntimeError(
            "elasticsearch package not installed. "
            "Install elasticsearch>=8 to index documents."
        )
    kwargs = {}
    # Prefer API-key authentication; fall back to basic auth only when both
    # a username and a password are configured.
    if config.elastic.api_key:
        kwargs["api_key"] = config.elastic.api_key
    elif config.elastic.username and config.elastic.password:
        kwargs["basic_auth"] = (
            config.elastic.username,
            config.elastic.password,
        )
    if config.elastic.ca_cert:
        kwargs["ca_certs"] = str(config.elastic.ca_cert)
    kwargs["verify_certs"] = config.elastic.verify_certs
    return Elasticsearch(config.elastic.url, **kwargs)
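
# _ensure_client assumes config.elastic exposes url, index, api_key, username,
# password, ca_cert, and verify_certs, all of which are read in this module;
# see AppConfig in .config for the authoritative definition.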


def iter_json_documents(data_root: Path) -> Iterator[Dict]:
    """Yield JSON objects from the provided directory tree."""
    if not data_root.exists():
        raise FileNotFoundError(f"{data_root} does not exist")
    for path in sorted(data_root.rglob("*.json")):
        try:
            with path.open("r", encoding="utf-8") as handle:
                doc = json.load(handle)
            # Fall back to the file stem when the document lacks a video_id.
            doc.setdefault("video_id", path.stem)
            yield doc
        except Exception as exc:
            LOGGER.warning("Skipping %s: %s", path, exc)
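
# A sketch of the per-video JSON shape this module expects; the field names
# mirror build_bulk_actions and the index mapping below, and everything except
# video_id is optional (a missing video_id is filled from the file stem):
#
# {
#     "video_id": "abc123",
#     "channel_id": "UC...",
#     "channel_name": "...",
#     "title": "...",
#     "description": "...",
#     "date": "2024-01-01T00:00:00Z",
#     "url": "https://...",
#     "duration": 212.0,
#     "transcript_parts": [{"start": 0.0, "duration": 4.2, "text": "..."}]
# }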


def build_bulk_actions(
    docs: Iterable[Dict], *, index: Optional[str] = None
) -> Iterator[Dict]:
    """Translate raw JSON dictionaries into Elasticsearch bulk actions."""
    for doc in docs:
        video_id = doc.get("video_id")
        if not video_id:
            # Without an id the document cannot be addressed in the index.
            continue
        parts = doc.get("transcript_parts") or doc.get("transcript") or []
        transcript_full = doc.get("transcript_full")
        if not transcript_full and isinstance(parts, list):
            # Derive the full transcript text from the segment list when the
            # document does not carry a pre-joined version.
            transcript_full = " ".join(
                segment.get("text", "") if isinstance(segment, dict) else str(segment)
                for segment in parts
            ).strip()
        yield {
            "_id": video_id,
            "_index": index or CONFIG.elastic.index,
            "_op_type": "index",
            "_source": {
                "video_id": video_id,
                "channel_id": doc.get("channel_id"),
                "channel_name": doc.get("channel_name"),
                "title": doc.get("title"),
                "description": doc.get("description"),
                "date": doc.get("date") or doc.get("published_at"),
                "url": doc.get("url"),
                "duration": doc.get("duration"),
                "transcript_full": transcript_full,
                "transcript_secondary_full": doc.get("transcript_secondary_full"),
                "transcript_parts": parts,
            },
        }
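
# Example (with hypothetical data): list(build_bulk_actions(
#     [{"video_id": "abc123", "transcript_parts": [{"text": "hello"}]}],
#     index="corner",
# )) yields a single action with _id "abc123", _op_type "index", and a
# transcript_full of "hello" derived from the segment list. Because the
# op type is "index", re-ingesting the same files overwrites documents
# in place rather than duplicating them.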


def ensure_index(client: "Elasticsearch", index: str) -> None:
    """Create the target index with a minimal mapping if it is missing."""
    if client.indices.exists(index=index):
        return
    LOGGER.info("Creating index %s", index)
    client.indices.create(
        index=index,
        mappings={
            "properties": {
                "video_id": {"type": "keyword"},
                "channel_id": {"type": "keyword"},
                "channel_name": {"type": "keyword"},
                "title": {"type": "text"},
                "description": {"type": "text"},
                "date": {"type": "date", "format": "strict_date_optional_time"},
                "url": {"type": "keyword"},
                "duration": {"type": "float"},
                "transcript_full": {"type": "text"},
                "transcript_secondary_full": {"type": "text"},
                "transcript_parts": {
                    "type": "nested",
                    "properties": {
                        "start": {"type": "float"},
                        "duration": {"type": "float"},
                        "text": {"type": "text"},
                    },
                },
            }
        },
    )
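
# Note: ensure_index only creates missing indices. Elasticsearch does not
# allow changing the mapping of existing fields in place, so edits to the
# mapping above only take effect after deleting or reindexing an index that
# was already created.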


def ingest_directory(
    data_root: Path,
    *,
    config: AppConfig = CONFIG,
    index: Optional[str] = None,
    batch_size: int = 500,
    request_timeout: int = 120,
) -> None:
    """Bulk index every JSON file in the directory tree."""
    client = _ensure_client(config)
    target_index = index or config.elastic.index
    ensure_index(client, target_index)
    docs = iter_json_documents(data_root)
    actions = build_bulk_actions(docs, index=target_index)
    bulk_client = client.options(request_timeout=request_timeout)
    helpers.bulk(
        bulk_client,
        actions,
        chunk_size=batch_size,
    )
    LOGGER.info("Ingestion complete for %s", target_index)
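
# Programmatic usage, mirroring the CLI example in the module docstring:
#
#     from pathlib import Path
#     ingest_directory(Path("data/video_metadata"), index="corner")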


def _build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Ingest transcript JSON files into Elasticsearch."
    )
    parser.add_argument(
        "--source",
        type=Path,
        default=CONFIG.data.root,
        help="Directory containing per-video JSON files.",
    )
    parser.add_argument(
        "--index",
        help="Override the Elasticsearch index name.",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=500,
        help="Bulk ingest batch size.",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=120,
        help="Request timeout (seconds) for bulk operations.",
    )
    return parser


def main() -> None:
    """Parse CLI arguments and run the directory ingest."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    args = _build_parser().parse_args()
    ingest_directory(
        args.source,
        index=args.index,
        batch_size=args.batch_size,
        request_timeout=args.timeout,
    )


if __name__ == "__main__":  # pragma: no cover
    main()