"""
|
|
Flask application exposing search, graph, and transcript endpoints for TLC.
|
|
|
|
Routes:
|
|
GET / -> static HTML search page.
|
|
GET /graph -> static reference graph UI.
|
|
GET /vector-search -> experimental Qdrant vector search UI.
|
|
GET /api/channels -> channels aggregation.
|
|
GET /api/search -> Elasticsearch keyword search.
|
|
POST /api/vector-search -> Qdrant vector similarity query.
|
|
GET /api/graph -> reference graph API.
|
|
GET /api/transcript -> transcript JSON payload.
|
|
"""
|
|
|
|
from __future__ import annotations

import copy
import json
import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple

from collections import Counter, deque
from datetime import datetime

from flask import Flask, jsonify, request, send_from_directory

import requests

try:
    from sentence_transformers import SentenceTransformer  # type: ignore
except ImportError:  # pragma: no cover - optional dependency
    SentenceTransformer = None

from .config import CONFIG, AppConfig

try:
    from elasticsearch import Elasticsearch  # type: ignore
    from elasticsearch import BadRequestError  # type: ignore
except ImportError:  # pragma: no cover - dependency optional
    Elasticsearch = None
    BadRequestError = Exception  # type: ignore

LOGGER = logging.getLogger(__name__)
_EMBED_MODEL = None
_EMBED_MODEL_NAME: Optional[str] = None

# Security constants
MAX_QUERY_SIZE = 100
MAX_OFFSET = 10000
ALLOWED_QDRANT_FILTER_FIELDS = {"channel_id", "date", "video_status", "external_reference"}


def sanitize_query_string(query: str) -> str:
    """
    Sanitize user input for Elasticsearch query_string queries.

    Removes dangerous field targeting and script injection patterns.
    """
    if not query:
        return "*"
    sanitized = query.strip()
    # Remove field targeting patterns like "_id:", "_source:", "script:"
    dangerous_field_patterns = [
        r'\b_[a-z_]+\s*:',  # Internal fields like _id:, _source:
        r'\bscript\s*:',    # Script injection
    ]
    for pattern in dangerous_field_patterns:
        sanitized = re.sub(pattern, '', sanitized, flags=re.IGNORECASE)
    # Remove excessive wildcards that could cause ReDoS
    sanitized = re.sub(r'\*{2,}', '*', sanitized)
    sanitized = re.sub(r'\?{2,}', '?', sanitized)
    return sanitized.strip() or "*"

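
# Illustrative behaviour (not an exhaustive spec): field-targeting prefixes are
# stripped and repeated wildcards collapsed, e.g.
#   sanitize_query_string("_id:123 AND script:alert**")  -> "123 AND alert*"
#   sanitize_query_string("")                            -> "*"
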

def validate_qdrant_filter(filters: Any) -> Dict[str, Any]:
    """
    Validate and sanitize Qdrant filter objects.

    Only allows whitelisted fields to prevent filter injection.
    """
    if not isinstance(filters, dict):
        return {}
    validated: Dict[str, Any] = {}
    for key, value in filters.items():
        if key in ALLOWED_QDRANT_FILTER_FIELDS:
            validated[key] = value
    return validated

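
# Illustrative behaviour: only whitelisted keys survive, anything else is dropped
# (the id below is made up):
#   validate_qdrant_filter({"channel_id": "UCabc", "script": "boom"}) -> {"channel_id": "UCabc"}
#   validate_qdrant_filter(["not", "a", "dict"])                      -> {}
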

def _ensure_embedder(model_name: str) -> "SentenceTransformer":
    global _EMBED_MODEL, _EMBED_MODEL_NAME
    if SentenceTransformer is None:  # pragma: no cover - optional dependency
        raise RuntimeError(
            "sentence-transformers is required for vector search. Install via pip install sentence-transformers."
        )
    if _EMBED_MODEL is None or _EMBED_MODEL_NAME != model_name:
        LOGGER.info("Loading embedding model: %s", model_name)
        _EMBED_MODEL = SentenceTransformer(model_name)
        _EMBED_MODEL_NAME = model_name
    return _EMBED_MODEL


def embed_query(text: str, *, model_name: str, expected_dim: int) -> List[float]:
    embedder = _ensure_embedder(model_name)
    vector = embedder.encode(
        [f"query: {text}"],
        show_progress_bar=False,
        normalize_embeddings=True,
    )[0].tolist()
    if len(vector) != expected_dim:
        raise RuntimeError(
            f"Embedding dimension mismatch (expected {expected_dim}, got {len(vector)})"
        )
    return vector

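
# Sketch of a direct call (model name and dimension are illustrative; the real values
# come from AppConfig.qdrant_embed_model / qdrant_vector_size, and the "query: " prefix
# assumes an E5-style embedding model):
#   vec = embed_query("supply chains", model_name="intfloat/multilingual-e5-base", expected_dim=768)
#   assert len(vec) == 768
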

def _ensure_client(config: AppConfig) -> "Elasticsearch":
    if Elasticsearch is None:
        raise RuntimeError(
            "elasticsearch package not installed. "
            "Install elasticsearch>=7 to run the Flask search app."
        )
    kwargs = {}
    if config.elastic.api_key:
        kwargs["api_key"] = config.elastic.api_key
    elif config.elastic.username and config.elastic.password:
        kwargs["basic_auth"] = (
            config.elastic.username,
            config.elastic.password,
        )
    if config.elastic.ca_cert:
        kwargs["ca_certs"] = str(config.elastic.ca_cert)
    kwargs["verify_certs"] = config.elastic.verify_certs
    return Elasticsearch(config.elastic.url, **kwargs)


def metrics_payload(data_root: Path, include_external: bool = True) -> Dict[str, Any]:
    total_items = 0
    channel_counter: Counter = Counter()
    channel_name_map: Dict[str, str] = {}
    year_counter: Counter = Counter()
    month_counter: Counter = Counter()

    if not data_root.exists():
        LOGGER.warning("Data directory %s not found; metrics will be empty.", data_root)
        return {
            "totalItems": 0,
            "totalChannels": 0,
            "itemsPerChannel": [],
            "yearHistogram": [],
            "recentMonths": [],
        }

    for path in data_root.rglob("*.json"):
        try:
            with path.open("r", encoding="utf-8") as handle:
                doc = json.load(handle)
        except Exception:
            continue
        if not include_external and doc.get("external_reference"):
            continue
        total_items += 1

        channel_id = doc.get("channel_id")
        channel_name = doc.get("channel_name") or channel_id
        if channel_id:
            channel_counter[channel_id] += 1
            if channel_name and channel_id not in channel_name_map:
                channel_name_map[channel_id] = channel_name

        date_value = doc.get("date") or doc.get("published_at")
        dt: Optional[datetime] = None
        if isinstance(date_value, str):
            for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ"):
                try:
                    dt = datetime.strptime(date_value[: len(fmt)], fmt)
                    break
                except Exception:
                    continue
        elif isinstance(date_value, (int, float)):
            try:
                dt = datetime.fromtimestamp(date_value)
            except Exception:
                dt = None

        if dt:
            year_counter[str(dt.year)] += 1
            month_counter[dt.strftime("%Y-%m")] += 1

    items_per_channel = [
        {
            "label": channel_name_map.get(cid, cid),
            "count": count,
        }
        for cid, count in channel_counter.most_common()
    ]

    year_histogram = [
        {"bucket": year, "count": year_counter[year]}
        for year in sorted(year_counter.keys())
    ]

    recent_months = sorted(month_counter.keys())
    recent_months = recent_months[-12:]
    recent_months_payload = [
        {"bucket": month, "count": month_counter[month]} for month in recent_months
    ]

    return {
        "totalItems": total_items,
        "totalChannels": len(channel_counter),
        "itemsPerChannel": items_per_channel,
        "yearHistogram": year_histogram,
        "recentMonths": recent_months_payload,
    }

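
# metrics_payload and the Elasticsearch-backed variant below return the same
# JSON-friendly shape, e.g. (values illustrative):
#   {
#       "totalItems": 1234,
#       "totalChannels": 7,
#       "itemsPerChannel": [{"label": "Some Channel", "count": 321}, ...],
#       "yearHistogram": [{"bucket": "2021", "count": 456}, ...],
#       "recentMonths": [{"bucket": "2024-01", "count": 12}, ...],
#   }
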

def elastic_metrics_payload(
    client: "Elasticsearch",
    index: str,
    *,
    channel_field_candidates: Optional[List[str]] = None,
    debug: bool = False,
    include_external: bool = True,
) -> Dict[str, Any]:
    if channel_field_candidates is None:
        channel_field_candidates = ["channel_id.keyword", "channel_id"]

    base_body: Dict[str, Any] = {
        "size": 0,
        "track_total_hits": True,
        "aggs": {
            "channels": {
                "terms": {
                    "field": "channel_id.keyword",
                    "size": 500,
                    "order": {"_count": "desc"},
                },
                "aggs": {
                    "name": {
                        "top_hits": {
                            "size": 1,
                            "_source": {"includes": ["channel_name"]},
                            "sort": [
                                {
                                    "channel_name.keyword": {
                                        "order": "asc",
                                        "missing": "_last",
                                        "unmapped_type": "keyword",
                                    }
                                }
                            ],
                        }
                    }
                },
            },
            "year_histogram": {
                "date_histogram": {
                    "field": "date",
                    "calendar_interval": "year",
                    "format": "yyyy",
                }
            },
            "month_histogram": {
                "date_histogram": {
                    "field": "date",
                    "calendar_interval": "month",
                    "format": "yyyy-MM",
                    "order": {"_key": "asc"},
                }
            },
        },
    }
    if not include_external:
        base_body["query"] = {"bool": {"must_not": [{"term": {"external_reference": True}}]}}

    last_error: Optional[Exception] = None
    response: Optional[Dict[str, Any]] = None
    for candidate_field in channel_field_candidates:
        body = json.loads(json.dumps(base_body))
        body["aggs"]["channels"]["terms"]["field"] = candidate_field
        try:
            if debug:
                LOGGER.info(
                    "Elasticsearch metrics request: %s",
                    json.dumps({"index": index, "body": body}, indent=2),
                )
            response = client.search(index=index, body=body)
            break
        except BadRequestError as exc:
            last_error = exc
            if debug:
                LOGGER.warning(
                    "Metrics aggregation failed for field %s: %s",
                    candidate_field,
                    exc,
                )
    if response is None:
        raise last_error or RuntimeError("Unable to compute metrics from Elasticsearch.")

    hits = response.get("hits", {})
    total_items = hits.get("total", {}).get("value", 0)

    if debug:
        LOGGER.info(
            "Elasticsearch metrics response: %s",
            json.dumps(response, indent=2, default=str),
        )

    aggregations = response.get("aggregations", {})
    channel_buckets = aggregations.get("channels", {}).get("buckets", [])
    items_per_channel = []
    for bucket in channel_buckets:
        key = bucket.get("key")
        channel_name = key
        top_hits = (
            bucket.get("name", {})
            .get("hits", {})
            .get("hits", [])
        )
        if top_hits:
            channel_name = (
                top_hits[0]
                .get("_source", {})
                .get("channel_name", channel_name)
            )
        items_per_channel.append(
            {"label": channel_name or key, "count": bucket.get("doc_count", 0)}
        )

    year_buckets = aggregations.get("year_histogram", {}).get("buckets", [])
    year_histogram = [
        {
            "bucket": bucket.get("key_as_string") or str(bucket.get("key")),
            "count": bucket.get("doc_count", 0),
        }
        for bucket in year_buckets
    ]

    month_buckets = aggregations.get("month_histogram", {}).get("buckets", [])
    recent_months_entries = [
        {
            "bucket": bucket.get("key_as_string") or str(bucket.get("key")),
            "count": bucket.get("doc_count", 0),
            "_key": bucket.get("key"),
        }
        for bucket in month_buckets
    ]
    recent_months_entries.sort(key=lambda item: item.get("_key", 0))
    recent_months_payload = [
        {"bucket": entry["bucket"], "count": entry["count"]}
        for entry in recent_months_entries[-12:]
    ]

    return {
        "totalItems": total_items,
        "totalChannels": len(items_per_channel),
        "itemsPerChannel": items_per_channel,
        "yearHistogram": year_histogram,
        "recentMonths": recent_months_payload,
    }


def parse_channel_params(values: Iterable[Optional[str]]) -> List[str]:
    seen: Set[str] = set()
    channels: List[str] = []
    for value in values:
        if not value:
            continue
        for part in str(value).split(","):
            cleaned = part.strip()
            if not cleaned or cleaned.lower() == "all":
                continue
            if cleaned not in seen:
                seen.add(cleaned)
                channels.append(cleaned)
    return channels

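
# Illustrative behaviour: values are split on commas, de-duplicated in order, and the
# pseudo-channel "all" plus empty entries are ignored:
#   parse_channel_params(["abc,def", "all", None, "abc"]) -> ["abc", "def"]
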

def build_year_filter(year: Optional[str]) -> Optional[Dict]:
    if not year:
        return None
    try:
        year_int = int(year)
        return {
            "range": {
                "date": {
                    "gte": f"{year_int}-01-01",
                    "lt": f"{year_int + 1}-01-01",
                    "format": "yyyy-MM-dd",
                }
            }
        }
    except (ValueError, TypeError):
        return None

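
# Illustrative output for build_year_filter("2021") (non-numeric input returns None):
#   {"range": {"date": {"gte": "2021-01-01", "lt": "2022-01-01", "format": "yyyy-MM-dd"}}}
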

def build_channel_filter(channels: Optional[Sequence[str]]) -> Optional[Dict]:
    if not channels:
        return None
    per_channel_clauses: List[Dict[str, Any]] = []
    for value in channels:
        if not value:
            continue
        per_channel_clauses.append(
            {
                "bool": {
                    "should": [
                        {"term": {"channel_id.keyword": value}},
                        {"term": {"channel_id": value}},
                    ],
                    "minimum_should_match": 1,
                }
            }
        )
    if not per_channel_clauses:
        return None
    if len(per_channel_clauses) == 1:
        return per_channel_clauses[0]
    return {
        "bool": {
            "should": per_channel_clauses,
            "minimum_should_match": 1,
        }
    }

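
# Each channel matches either the keyword sub-field or the raw field. For a single
# (made-up) id the clause looks like:
#   build_channel_filter(["UCabc"]) ->
#   {"bool": {"should": [{"term": {"channel_id.keyword": "UCabc"}},
#                        {"term": {"channel_id": "UCabc"}}],
#             "minimum_should_match": 1}}
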

def build_query_payload(
    query: str,
    *,
    channels: Optional[Sequence[str]] = None,
    year: Optional[str] = None,
    sort: str = "relevant",
    use_exact: bool = True,
    use_fuzzy: bool = True,
    use_phrase: bool = True,
    use_query_string: bool = False,
    include_external: bool = True,
) -> Dict:
    filters: List[Dict] = []
    should: List[Dict] = []

    channel_filter = build_channel_filter(channels)
    if channel_filter:
        filters.append(channel_filter)

    year_filter = build_year_filter(year)
    if year_filter:
        filters.append(year_filter)

    if not include_external:
        filters.append({"bool": {"must_not": [{"term": {"external_reference": True}}]}})

    if use_query_string:
        base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
        qs_query = sanitize_query_string(query or "")
        query_body: Dict[str, Any] = {
            "query_string": {
                "query": qs_query,
                "default_operator": "AND",
                "fields": base_fields,
            }
        }
        if filters:
            query_body = {"bool": {"must": query_body, "filter": filters}}
        body: Dict = {
            "query": query_body,
            "highlight": {
                "fields": {
                    "transcript_full": {
                        "fragment_size": 160,
                        "number_of_fragments": 5,
                        "fragmenter": "span",
                    },
                    "transcript_secondary_full": {
                        "fragment_size": 160,
                        "number_of_fragments": 5,
                        "fragmenter": "span",
                    },
                    "title": {"number_of_fragments": 0},
                    "description": {
                        "fragment_size": 160,
                        "number_of_fragments": 1,
                    },
                },
                "require_field_match": False,
                "pre_tags": ["<mark>"],
                "post_tags": ["</mark>"],
                "encoder": "html",
                "max_analyzed_offset": 900000,
            },
        }
        if sort == "newer":
            body["sort"] = [{"date": {"order": "desc"}}]
        elif sort == "older":
            body["sort"] = [{"date": {"order": "asc"}}]
        elif sort == "referenced":
            body["sort"] = [{"referenced_by_count": {"order": "desc", "unmapped_type": "long"}}]
        return body

    if query:
        base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
        if use_phrase:
            should.append(
                {
                    "match_phrase": {
                        "transcript_full": {
                            "query": query,
                            "slop": 2,
                            "boost": 10.0,
                        }
                    }
                }
            )
            should.append(
                {
                    "match_phrase": {
                        "transcript_secondary_full": {
                            "query": query,
                            "slop": 2,
                            "boost": 10.0,
                        }
                    }
                }
            )
            should.append(
                {
                    "match_phrase": {
                        "title": {
                            "query": query,
                            "slop": 0,
                            "boost": 50.0,
                        }
                    }
                }
            )
        if use_fuzzy:
            should.append(
                {
                    "multi_match": {
                        "query": query,
                        "fields": base_fields,
                        "type": "best_fields",
                        "operator": "and",
                        "fuzziness": "AUTO",
                        "prefix_length": 1,
                        "max_expansions": 50,
                        "boost": 1.5,
                    }
                }
            )
        if use_exact:
            should.append(
                {
                    "multi_match": {
                        "query": query,
                        "fields": base_fields,
                        "type": "best_fields",
                        "operator": "and",
                        "boost": 3.0,
                    }
                }
            )

    if should:
        query_body: Dict = {
            "bool": {
                "should": should,
                "minimum_should_match": 1,
            }
        }
        if filters:
            query_body["bool"]["filter"] = filters
    elif filters:
        query_body = {"bool": {"filter": filters}}
    else:
        query_body = {"match_all": {}}

    body: Dict = {
        "query": query_body,
        "highlight": {
            "fields": {
                "transcript_full": {
                    "fragment_size": 160,
                    "number_of_fragments": 5,
                    "fragmenter": "span",
                },
                "transcript_secondary_full": {
                    "fragment_size": 160,
                    "number_of_fragments": 5,
                    "fragmenter": "span",
                },
                "title": {"number_of_fragments": 0},
                "description": {
                    "fragment_size": 160,
                    "number_of_fragments": 1,
                },
            },
            "require_field_match": False,
            "pre_tags": ["<mark>"],
            "post_tags": ["</mark>"],
            "encoder": "html",
            "max_analyzed_offset": 900000,
        },
    }
    if query_body.get("match_all") is None:
        body["highlight"]["highlight_query"] = copy.deepcopy(query_body)

    if sort == "newer":
        body["sort"] = [{"date": {"order": "desc"}}]
    elif sort == "older":
        body["sort"] = [{"date": {"order": "asc"}}]
    elif sort == "referenced":
        body["sort"] = [{"referenced_by_count": {"order": "desc", "unmapped_type": "long"}}]
    return body

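
# Typical use (illustrative call): the /api/search route builds a body like
#   build_query_payload("rocket engines", channels=["UCabc"], year="2021", sort="newer")
# which yields a dict with "query", "highlight" and optional "sort" keys that is passed
# verbatim to Elasticsearch as the request body.
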

def normalize_reference_list(values: Any) -> List[str]:
    """Coerce internal_references / referenced_by values into a clean list of ids.

    Accepts a single value, a list of strings, or a list of dicts carrying a
    "video_id"/"id" key; blanks and "none"/"null" placeholders are dropped.
    Defined at module level so both graph builders below can use it.
    """
    if values is None:
        return []
    if isinstance(values, (list, tuple, set)):
        iterable = values
    else:
        iterable = [values]
    normalized: List[str] = []
    for item in iterable:
        candidate: Optional[str]
        if isinstance(item, dict):
            candidate = item.get("video_id") or item.get("id")  # type: ignore[assignment]
        else:
            candidate = item  # type: ignore[assignment]
        if candidate is None:
            continue
        text = str(candidate).strip()
        if not text:
            continue
        if text.lower() in {"none", "null"}:
            continue
        normalized.append(text)
    return normalized


def build_graph_payload(
    client: "Elasticsearch",
    index: str,
    root_id: str,
    depth: int,
    max_nodes: int,
    *,
    include_external: bool = True,
) -> Dict[str, Any]:
    root_id = root_id.strip()
    if not root_id:
        return {"nodes": [], "links": [], "root": root_id, "depth": depth, "meta": {}}

    doc_cache: Dict[str, Optional[Dict[str, Any]]] = {}

    def fetch_document(video_id: str) -> Optional[Dict[str, Any]]:
        if video_id in doc_cache:
            return doc_cache[video_id]
        try:
            result = client.get(index=index, id=video_id)
            doc_cache[video_id] = result.get("_source")
        except Exception as exc:  # pragma: no cover - elasticsearch handles errors
            LOGGER.debug("Graph: failed to load %s: %s", video_id, exc)
            doc_cache[video_id] = None
        doc = doc_cache[video_id]
        if doc is not None and not include_external and doc.get("external_reference"):
            doc_cache[video_id] = None
            return None
        return doc_cache[video_id]

    nodes: Dict[str, Dict[str, Any]] = {}
    links: List[Dict[str, Any]] = []
    link_seen: Set[Tuple[str, str, str]] = set()
    queue: deque[Tuple[str, int]] = deque([(root_id, 0)])
    queued: Set[str] = {root_id}
    visited: Set[str] = set()

    while queue and len(nodes) < max_nodes:
        current_id, level = queue.popleft()
        queued.discard(current_id)
        if current_id in visited:
            continue
        doc = fetch_document(current_id)
        if doc is None:
            if current_id == root_id:
                break
            visited.add(current_id)
            continue

        visited.add(current_id)
        nodes[current_id] = {
            "id": current_id,
            "title": doc.get("title") or current_id,
            "channel_id": doc.get("channel_id"),
            "channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
            "url": doc.get("url"),
            "date": doc.get("date"),
            "is_root": current_id == root_id,
            "external_reference": bool(doc.get("external_reference")),
        }

        if level >= depth:
            continue

        neighbor_ids: List[str] = []

        for ref_id in normalize_reference_list(doc.get("internal_references")):
            if ref_id == current_id:
                continue
            key = (current_id, ref_id, "references")
            if key not in link_seen:
                links.append(
                    {"source": current_id, "target": ref_id, "relation": "references"}
                )
                link_seen.add(key)
            neighbor_ids.append(ref_id)

        for ref_id in normalize_reference_list(doc.get("referenced_by")):
            if ref_id == current_id:
                continue
            key = (ref_id, current_id, "referenced_by")
            if key not in link_seen:
                links.append(
                    {"source": ref_id, "target": current_id, "relation": "referenced_by"}
                )
                link_seen.add(key)
            neighbor_ids.append(ref_id)

        for neighbor in neighbor_ids:
            if neighbor in visited or neighbor in queued:
                continue
            if len(nodes) + len(queue) >= max_nodes:
                break
            queue.append((neighbor, level + 1))
            queued.add(neighbor)

    # Ensure nodes referenced by links exist in the payload.
    for link in links:
        for key in ("source", "target"):
            node_id = link[key]
            if node_id in nodes:
                continue
            doc = fetch_document(node_id)
            if doc is None:
                if include_external:
                    nodes[node_id] = {
                        "id": node_id,
                        "title": node_id,
                        "channel_id": None,
                        "channel_name": "Unknown",
                        "url": None,
                        "date": None,
                        "is_root": node_id == root_id,
                        "external_reference": False,
                    }
                continue
            nodes[node_id] = {
                "id": node_id,
                "title": doc.get("title") or node_id,
                "channel_id": doc.get("channel_id"),
                "channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
                "url": doc.get("url"),
                "date": doc.get("date"),
                "is_root": node_id == root_id,
                "external_reference": bool(doc.get("external_reference")),
            }

    links = [
        link
        for link in links
        if link.get("source") in nodes and link.get("target") in nodes
    ]

    return {
        "root": root_id,
        "depth": depth,
        "nodes": list(nodes.values()),
        "links": links,
        "meta": {
            "node_count": len(nodes),
            "link_count": len(links),
        },
    }

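
# build_graph_payload (and the full-graph variant below) return a payload shaped like
# the following (values illustrative; the full variant also adds "mode" and "truncated"
# to "meta"):
#   {
#       "root": "abc123",
#       "depth": 1,
#       "nodes": [{"id": "abc123", "title": "...", "channel_name": "...", "is_root": True, ...}],
#       "links": [{"source": "abc123", "target": "def456", "relation": "references"}],
#       "meta": {"node_count": 2, "link_count": 1},
#   }
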

def build_full_graph_payload(
    client: "Elasticsearch",
    index: str,
    max_nodes: Optional[int],
    *,
    highlight_id: Optional[str] = None,
    include_external: bool = True,
) -> Dict[str, Any]:
    """
    Attempt to render the entire reference graph by gathering every video that
    references another (or is referenced).
    """

    query = {
        "bool": {
            "should": [
                {"range": {"internal_references_count": {"gt": 0}}},
                {"range": {"referenced_by_count": {"gt": 0}}},
                {"exists": {"field": "internal_references"}},
                {"exists": {"field": "referenced_by"}},
            ],
            "minimum_should_match": 1,
        }
    }
    source_fields = [
        "video_id",
        "title",
        "channel_id",
        "channel_name",
        "url",
        "date",
        "internal_references",
        "referenced_by",
    ]
    nodes: Dict[str, Dict[str, Any]] = {}
    links: List[Dict[str, Any]] = []
    link_seen: Set[Tuple[str, str, str]] = set()
    node_limit = max_nodes if max_nodes and max_nodes > 0 else None
    batch_size = 500 if node_limit is None else min(500, max(50, node_limit * 2))
    truncated = False

    def ensure_node(node_id: Optional[str], doc: Optional[Dict[str, Any]] = None) -> bool:
        if not node_id:
            return False
        if node_id in nodes:
            if doc:
                if not include_external and doc.get("external_reference"):
                    nodes.pop(node_id, None)
                    return False
                existing = nodes[node_id]
                if (not existing.get("title") or existing["title"] == node_id) and doc.get("title"):
                    existing["title"] = doc["title"]
                if not existing.get("channel_id") and doc.get("channel_id"):
                    existing["channel_id"] = doc.get("channel_id")
                if (
                    existing.get("channel_name") in {"Unknown", node_id, None}
                    and (doc.get("channel_name") or doc.get("channel_id"))
                ):
                    existing["channel_name"] = doc.get("channel_name") or doc.get("channel_id")
                if not existing.get("url") and doc.get("url"):
                    existing["url"] = doc.get("url")
                if not existing.get("date") and doc.get("date"):
                    existing["date"] = doc.get("date")
            return True
        if node_limit is not None and len(nodes) >= node_limit:
            return False
        channel_name = None
        channel_id = None
        url = None
        date_val = None
        title = node_id
        if doc:
            if not include_external and doc.get("external_reference"):
                return False
            title = doc.get("title") or title
            channel_id = doc.get("channel_id")
            channel_name = doc.get("channel_name") or channel_id
            url = doc.get("url")
            date_val = doc.get("date")
        nodes[node_id] = {
            "id": node_id,
            "title": title,
            "channel_id": channel_id,
            "channel_name": channel_name or "Unknown",
            "url": url,
            "date": date_val,
            "is_root": False,
            "external_reference": bool(doc.get("external_reference")) if doc else False,
        }
        return True

    scroll_id: Optional[str] = None
    try:
        body = {"query": query, "_source": source_fields, "sort": ["_doc"]}
        response = client.search(index=index, body=body, size=batch_size, scroll="1m")
        scroll_id = response.get("_scroll_id")
        stop_fetch = False
        while not stop_fetch:
            hits = response.get("hits", {}).get("hits", [])
            if not hits:
                break
            for hit in hits:
                if node_limit is not None and len(nodes) >= node_limit:
                    truncated = True
                    stop_fetch = True
                    break
                source = hit.get("_source", {}) or {}
                video_id = source.get("video_id")
                if not video_id:
                    continue
                if not include_external and source.get("external_reference"):
                    nodes.pop(video_id, None)
                    continue
                if not ensure_node(video_id, source):
                    continue
                for target in normalize_reference_list(source.get("internal_references")):
                    if target == video_id:
                        continue
                    if not ensure_node(target):
                        continue
                    key = (video_id, target, "references")
                    if key not in link_seen:
                        links.append(
                            {"source": video_id, "target": target, "relation": "references"}
                        )
                        link_seen.add(key)
                for origin in normalize_reference_list(source.get("referenced_by")):
                    if origin == video_id:
                        continue
                    if not ensure_node(origin):
                        continue
                    key = (origin, video_id, "referenced_by")
                    if key not in link_seen:
                        links.append(
                            {"source": origin, "target": video_id, "relation": "referenced_by"}
                        )
                        link_seen.add(key)
            if stop_fetch or not scroll_id:
                break
            response = client.scroll(scroll_id=scroll_id, scroll="1m")
            scroll_id = response.get("_scroll_id")
            if not scroll_id:
                break
    finally:
        if scroll_id:
            try:
                client.clear_scroll(scroll_id=scroll_id)
            except Exception:
                pass

    if highlight_id and highlight_id in nodes:
        nodes[highlight_id]["is_root"] = True

    links = [
        link
        for link in links
        if link.get("source") in nodes and link.get("target") in nodes
    ]

    return {
        "root": highlight_id or "",
        "depth": 0,
        "nodes": list(nodes.values()),
        "links": links,
        "meta": {
            "node_count": len(nodes),
            "link_count": len(links),
            "mode": "full",
            "truncated": truncated,
        },
    }


def create_app(config: AppConfig = CONFIG) -> Flask:
    app = Flask(__name__, static_folder=str(Path(__file__).parent / "static"))
    app.config['MAX_CONTENT_LENGTH'] = 1 * 1024 * 1024

    @app.after_request
    def add_security_headers(response):
        response.headers['X-Frame-Options'] = 'DENY'
        response.headers['X-Content-Type-Options'] = 'nosniff'
        response.headers['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()'
        response.headers['Content-Security-Policy'] = (
            "default-src 'self'; "
            "script-src 'self' https://cdn.jsdelivr.net https://unpkg.com; "
            "style-src 'self' 'unsafe-inline' https://unpkg.com; "
            "img-src 'self' data: https:; "
            "font-src 'self' https://unpkg.com; "
            "connect-src 'self'"
        )
        return response

    client = _ensure_client(config)
    index = config.elastic.index
    qdrant_url = config.qdrant_url
    qdrant_collection = config.qdrant_collection
    qdrant_vector_name = config.qdrant_vector_name
    qdrant_vector_size = config.qdrant_vector_size
    qdrant_embed_model = config.qdrant_embed_model

    @app.route("/")
    def index_page():
        return send_from_directory(app.static_folder, "index.html")

    @app.route("/graph")
    def graph_page():
        return send_from_directory(app.static_folder, "graph.html")

    @app.route("/vector-search")
    def vector_search_page():
        return send_from_directory(app.static_folder, "vector.html")

    @app.route("/static/<path:filename>")
    def static_files(filename: str):
        return send_from_directory(app.static_folder, filename)

@app.route("/api/channels")
|
|
def channels():
|
|
include_external = request.args.get("external", default="0", type=str)
|
|
include_external = include_external.lower() not in {"0", "false", "no"}
|
|
base_channels_body = {
|
|
"size": 0,
|
|
"aggs": {
|
|
"channels": {
|
|
"terms": {"field": "channel_id", "size": 200},
|
|
"aggs": {
|
|
"name": {
|
|
"top_hits": {
|
|
"size": 1,
|
|
"_source": {"includes": ["channel_name"]},
|
|
"sort": [
|
|
{
|
|
"channel_name.keyword": {
|
|
"order": "asc",
|
|
"missing": "_last",
|
|
"unmapped_type": "keyword",
|
|
}
|
|
}
|
|
],
|
|
}
|
|
}
|
|
},
|
|
}
|
|
},
|
|
}
|
|
if not include_external:
|
|
base_channels_body["query"] = {
|
|
"bool": {"must_not": [{"term": {"external_reference": True}}]}
|
|
}
|
|
|
|
def run_channels_request(field_name: str):
|
|
body = json.loads(json.dumps(base_channels_body)) # deep copy
|
|
body["aggs"]["channels"]["terms"]["field"] = field_name
|
|
if config.elastic.debug:
|
|
LOGGER.info(
|
|
"Elasticsearch channels request: %s",
|
|
json.dumps({"index": index, "body": body}, indent=2),
|
|
)
|
|
return client.search(index=index, body=body)
|
|
|
|
response = None
|
|
last_error = None
|
|
for candidate_field in ("channel_id.keyword", "channel_id"):
|
|
try:
|
|
response = run_channels_request(candidate_field)
|
|
if config.elastic.debug:
|
|
LOGGER.info("Channels aggregation used field: %s", candidate_field)
|
|
break
|
|
except BadRequestError as exc:
|
|
last_error = exc
|
|
if config.elastic.debug:
|
|
LOGGER.warning(
|
|
"Channels aggregation failed for field %s: %s",
|
|
candidate_field,
|
|
exc,
|
|
)
|
|
if response is None:
|
|
raise last_error or RuntimeError("Unable to aggregate channels.")
|
|
|
|
if config.elastic.debug:
|
|
LOGGER.info(
|
|
"Elasticsearch channels response: %s",
|
|
json.dumps(response, indent=2, default=str),
|
|
)
|
|
buckets = (
|
|
response.get("aggregations", {})
|
|
.get("channels", {})
|
|
.get("buckets", [])
|
|
)
|
|
data = []
|
|
for bucket in buckets:
|
|
key = bucket.get("key")
|
|
name_hit = (
|
|
bucket.get("name", {})
|
|
.get("hits", {})
|
|
.get("hits", [{}])[0]
|
|
.get("_source", {})
|
|
.get("channel_name")
|
|
)
|
|
display_name = name_hit or key or "Unknown"
|
|
data.append(
|
|
{
|
|
"Id": key,
|
|
"Name": display_name,
|
|
"Count": bucket.get("doc_count", 0),
|
|
}
|
|
)
|
|
data.sort(key=lambda item: item["Name"].lower())
|
|
return jsonify(data)
|
|
|
|
@app.route("/api/graph")
|
|
def graph_api():
|
|
video_id = (request.args.get("video_id") or "").strip()
|
|
full_graph = request.args.get("full_graph", default="0", type=str)
|
|
full_graph = full_graph.lower() in {"1", "true", "yes"}
|
|
if not full_graph and not video_id:
|
|
return jsonify({"error": "video_id is required"}), 400
|
|
include_external = request.args.get("external", default="1", type=str)
|
|
include_external = include_external.lower() not in {"0", "false", "no"}
|
|
|
|
try:
|
|
depth = int(request.args.get("depth", "1"))
|
|
except ValueError:
|
|
depth = 1
|
|
depth = max(0, min(depth, 3))
|
|
|
|
if full_graph:
|
|
max_nodes = None
|
|
else:
|
|
try:
|
|
max_nodes = int(request.args.get("max_nodes", "200"))
|
|
except ValueError:
|
|
max_nodes = 200
|
|
max_nodes = max(10, min(max_nodes, 400))
|
|
|
|
if full_graph:
|
|
payload = build_full_graph_payload(
|
|
client,
|
|
index,
|
|
None,
|
|
highlight_id=video_id or None,
|
|
include_external=include_external,
|
|
)
|
|
else:
|
|
payload = build_graph_payload(
|
|
client,
|
|
index,
|
|
video_id,
|
|
depth,
|
|
max_nodes,
|
|
include_external=include_external,
|
|
)
|
|
if not payload["nodes"]:
|
|
return (
|
|
jsonify({"error": f"Video '{video_id}' was not found in the index."}),
|
|
404,
|
|
)
|
|
payload["meta"]["max_nodes"] = (
|
|
len(payload["nodes"]) if full_graph else max_nodes
|
|
)
|
|
return jsonify(payload)
|
|
|
|
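    # Example requests (illustrative ids):
    #   GET /api/graph?video_id=abc123&depth=2&max_nodes=150
    #   GET /api/graph?full_graph=1&external=0
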
@app.route("/api/years")
|
|
def years():
|
|
body = {
|
|
"size": 0,
|
|
"aggs": {
|
|
"years": {
|
|
"date_histogram": {
|
|
"field": "date",
|
|
"calendar_interval": "year",
|
|
"format": "yyyy",
|
|
"order": {"_key": "desc"}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if config.elastic.debug:
|
|
LOGGER.info(
|
|
"Elasticsearch years request: %s",
|
|
json.dumps({"index": index, "body": body}, indent=2),
|
|
)
|
|
|
|
response = client.search(index=index, body=body)
|
|
|
|
if config.elastic.debug:
|
|
LOGGER.info(
|
|
"Elasticsearch years response: %s",
|
|
json.dumps(response, indent=2, default=str),
|
|
)
|
|
|
|
buckets = (
|
|
response.get("aggregations", {})
|
|
.get("years", {})
|
|
.get("buckets", [])
|
|
)
|
|
|
|
data = [
|
|
{
|
|
"Year": bucket.get("key_as_string"),
|
|
"Count": bucket.get("doc_count", 0),
|
|
}
|
|
for bucket in buckets
|
|
if bucket.get("doc_count", 0) > 0
|
|
]
|
|
|
|
return jsonify(data)
|
|
|
|
@app.route("/api/search")
|
|
def search():
|
|
query = request.args.get("q", "", type=str)
|
|
raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
|
|
legacy_channel = request.args.get("channel", type=str)
|
|
if legacy_channel:
|
|
raw_channels.append(legacy_channel)
|
|
channels = parse_channel_params(raw_channels)
|
|
year = request.args.get("year", "", type=str) or None
|
|
sort = request.args.get("sort", "relevant", type=str)
|
|
page = max(request.args.get("page", 0, type=int), 0)
|
|
size = max(request.args.get("size", 10, type=int), 1)
|
|
|
|
def parse_flag(name: str, default: bool = True) -> bool:
|
|
value = request.args.get(name)
|
|
if value is None:
|
|
return default
|
|
return value.lower() not in {"0", "false", "no"}
|
|
|
|
use_exact = parse_flag("exact", True)
|
|
use_fuzzy = parse_flag("fuzzy", True)
|
|
use_phrase = parse_flag("phrase", True)
|
|
use_query_string = parse_flag("query_string", False)
|
|
include_external = parse_flag("external", False)
|
|
if use_query_string:
|
|
use_exact = use_fuzzy = use_phrase = False
|
|
|
|
payload = build_query_payload(
|
|
query,
|
|
channels=channels,
|
|
year=year,
|
|
sort=sort,
|
|
use_exact=use_exact,
|
|
use_fuzzy=use_fuzzy,
|
|
use_phrase=use_phrase,
|
|
use_query_string=use_query_string,
|
|
include_external=include_external,
|
|
)
|
|
start = page * size
|
|
if config.elastic.debug:
|
|
LOGGER.info(
|
|
"Elasticsearch search request: %s",
|
|
json.dumps(
|
|
{
|
|
"index": index,
|
|
"from": start,
|
|
"size": size,
|
|
"body": payload,
|
|
"channels": channels,
|
|
"toggles": {
|
|
"exact": use_exact,
|
|
"fuzzy": use_fuzzy,
|
|
"phrase": use_phrase,
|
|
},
|
|
},
|
|
indent=2,
|
|
),
|
|
)
|
|
response = client.search(
|
|
index=index,
|
|
from_=start,
|
|
size=size,
|
|
body=payload,
|
|
)
|
|
if config.elastic.debug:
|
|
LOGGER.info(
|
|
"Elasticsearch search response: %s",
|
|
json.dumps(response, indent=2, default=str),
|
|
)
|
|
|
|
hits = response.get("hits", {})
|
|
total = hits.get("total", {}).get("value", 0)
|
|
documents = []
|
|
for hit in hits.get("hits", []):
|
|
source = hit.get("_source", {})
|
|
highlight_map = hit.get("highlight", {})
|
|
transcript_highlight = [
|
|
{"html": value, "source": "primary"}
|
|
for value in (highlight_map.get("transcript_full", []) or [])
|
|
] + [
|
|
{"html": value, "source": "secondary"}
|
|
for value in (highlight_map.get("transcript_secondary_full", []) or [])
|
|
]
|
|
|
|
title_html = (
|
|
highlight_map.get("title")
|
|
or [source.get("title") or "Untitled"]
|
|
)[0]
|
|
description_html = (
|
|
highlight_map.get("description")
|
|
or [source.get("description") or ""]
|
|
)[0]
|
|
documents.append(
|
|
{
|
|
"video_id": source.get("video_id"),
|
|
"channel_id": source.get("channel_id"),
|
|
"channel_name": source.get("channel_name"),
|
|
"title": source.get("title"),
|
|
"titleHtml": title_html,
|
|
"description": source.get("description"),
|
|
"descriptionHtml": description_html,
|
|
"date": source.get("date"),
|
|
"duration": source.get("duration"),
|
|
"url": source.get("url"),
|
|
"toHighlight": transcript_highlight,
|
|
"highlightSource": {
|
|
"primary": bool(highlight_map.get("transcript_full")),
|
|
"secondary": bool(highlight_map.get("transcript_secondary_full")),
|
|
},
|
|
"internal_references_count": source.get("internal_references_count", 0),
|
|
"internal_references": source.get("internal_references", []),
|
|
"referenced_by_count": source.get("referenced_by_count", 0),
|
|
"referenced_by": source.get("referenced_by", []),
|
|
"video_status": source.get("video_status"),
|
|
"external_reference": source.get("external_reference"),
|
|
}
|
|
)
|
|
|
|
return jsonify(
|
|
{
|
|
"items": documents,
|
|
"totalResults": total,
|
|
"totalPages": (total + size - 1) // size,
|
|
"currentPage": page,
|
|
}
|
|
)
|
|
|
|
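    # Example request (illustrative values):
    #   GET /api/search?q=moon+landing&channel_id=UCabc&year=2021&sort=newer&page=0&size=10
    # Boolean toggles (exact, fuzzy, phrase, query_string, external) accept 0/1,
    # true/false or yes/no.
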
@app.route("/api/metrics")
|
|
def metrics():
|
|
include_external = request.args.get("external", default="1", type=str)
|
|
include_external = include_external.lower() not in {"0", "false", "no"}
|
|
try:
|
|
data = elastic_metrics_payload(
|
|
client,
|
|
index,
|
|
channel_field_candidates=["channel_id.keyword", "channel_id"],
|
|
debug=config.elastic.debug,
|
|
include_external=include_external,
|
|
)
|
|
except Exception:
|
|
LOGGER.exception(
|
|
"Falling back to local metrics payload due to Elasticsearch error.",
|
|
exc_info=True,
|
|
)
|
|
data = metrics_payload(config.data.root, include_external=include_external)
|
|
return jsonify(data)
|
|
|
|
@app.route("/api/frequency")
|
|
def frequency():
|
|
raw_term = request.args.get("term", type=str) or ""
|
|
use_query_string = request.args.get("query_string", default="0", type=str)
|
|
use_query_string = (use_query_string or "").lower() in {"1", "true", "yes"}
|
|
term = raw_term.strip()
|
|
if not term and not use_query_string:
|
|
return ("term parameter is required", 400)
|
|
if use_query_string and not term:
|
|
term = "*"
|
|
|
|
raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
|
|
legacy_channel = request.args.get("channel", type=str)
|
|
if legacy_channel:
|
|
raw_channels.append(legacy_channel)
|
|
channels = parse_channel_params(raw_channels)
|
|
year = request.args.get("year", "", type=str) or None
|
|
interval = (request.args.get("interval", "month") or "month").lower()
|
|
allowed_intervals = {"day", "week", "month", "quarter", "year"}
|
|
if interval not in allowed_intervals:
|
|
interval = "month"
|
|
start = request.args.get("start", type=str)
|
|
end = request.args.get("end", type=str)
|
|
|
|
def parse_flag(name: str, default: bool = True) -> bool:
|
|
value = request.args.get(name)
|
|
if value is None:
|
|
return default
|
|
lowered = value.lower()
|
|
return lowered not in {"0", "false", "no"}
|
|
|
|
use_exact = parse_flag("exact", True)
|
|
use_fuzzy = parse_flag("fuzzy", True)
|
|
use_phrase = parse_flag("phrase", True)
|
|
include_external = parse_flag("external", False)
|
|
if use_query_string:
|
|
use_exact = use_fuzzy = use_phrase = False
|
|
|
|
search_payload = build_query_payload(
|
|
term,
|
|
channels=channels,
|
|
year=year,
|
|
sort="relevant",
|
|
use_exact=use_exact,
|
|
use_fuzzy=use_fuzzy,
|
|
use_phrase=use_phrase,
|
|
use_query_string=use_query_string,
|
|
include_external=include_external,
|
|
)
|
|
query = search_payload.get("query", {"match_all": {}})
|
|
|
|
if start or end:
|
|
range_filter: Dict[str, Dict[str, Dict[str, str]]] = {"range": {"date": {}}}
|
|
if start:
|
|
range_filter["range"]["date"]["gte"] = start
|
|
if end:
|
|
range_filter["range"]["date"]["lte"] = end
|
|
if "bool" in query:
|
|
bool_clause = query.setdefault("bool", {})
|
|
existing_filter = bool_clause.get("filter")
|
|
if existing_filter is None:
|
|
bool_clause["filter"] = [range_filter]
|
|
elif isinstance(existing_filter, list):
|
|
bool_clause["filter"].append(range_filter)
|
|
else:
|
|
bool_clause["filter"] = [existing_filter, range_filter]
|
|
elif query.get("match_all") is not None:
|
|
query = {"bool": {"filter": [range_filter]}}
|
|
else:
|
|
query = {"bool": {"must": [query], "filter": [range_filter]}}
|
|
|
|
histogram: Dict[str, Any] = {
|
|
"field": "date",
|
|
"calendar_interval": interval,
|
|
"min_doc_count": 0,
|
|
}
|
|
if start or end:
|
|
bounds: Dict[str, str] = {}
|
|
if start:
|
|
bounds["min"] = start
|
|
if end:
|
|
bounds["max"] = end
|
|
if bounds:
|
|
histogram["extended_bounds"] = bounds
|
|
|
|
channel_terms_size = max(6, len(channels)) if channels else 6
|
|
|
|
body = {
|
|
"size": 0,
|
|
"query": query,
|
|
"aggs": {
|
|
"over_time": {
|
|
"date_histogram": histogram,
|
|
"aggs": {
|
|
"by_channel": {
|
|
"terms": {
|
|
"field": "channel_id.keyword",
|
|
"size": channel_terms_size,
|
|
"order": {"_count": "desc"},
|
|
},
|
|
"aggs": {
|
|
"channel_name_hit": {
|
|
"top_hits": {
|
|
"size": 1,
|
|
"_source": {"includes": ["channel_name"]},
|
|
"sort": [
|
|
{
|
|
"channel_name.keyword": {
|
|
"order": "asc",
|
|
"missing": "_last",
|
|
"unmapped_type": "keyword",
|
|
}
|
|
}
|
|
],
|
|
}
|
|
}
|
|
},
|
|
}
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
if config.elastic.debug:
|
|
LOGGER.info(
|
|
"Elasticsearch frequency request: %s",
|
|
json.dumps(
|
|
{
|
|
"index": index,
|
|
"body": body,
|
|
"term": term,
|
|
"interval": interval,
|
|
"channels": channels,
|
|
"start": start,
|
|
"end": end,
|
|
"query_string": use_query_string,
|
|
},
|
|
indent=2,
|
|
),
|
|
)
|
|
|
|
response = client.search(index=index, body=body)
|
|
|
|
if config.elastic.debug:
|
|
LOGGER.info(
|
|
"Elasticsearch frequency response: %s",
|
|
json.dumps(response, indent=2, default=str),
|
|
)
|
|
|
|
raw_buckets = (
|
|
response.get("aggregations", {})
|
|
.get("over_time", {})
|
|
.get("buckets", [])
|
|
)
|
|
|
|
channel_totals: Dict[str, Dict[str, Any]] = {}
|
|
buckets: List[Dict[str, Any]] = []
|
|
for bucket in raw_buckets:
|
|
date_str = bucket.get("key_as_string")
|
|
total = bucket.get("doc_count", 0)
|
|
channel_entries: List[Dict[str, Any]] = []
|
|
for ch_bucket in bucket.get("by_channel", {}).get("buckets", []):
|
|
cid = ch_bucket.get("key")
|
|
count = ch_bucket.get("doc_count", 0)
|
|
if cid:
|
|
hit_source = (
|
|
ch_bucket.get("channel_name_hit", {})
|
|
.get("hits", {})
|
|
.get("hits", [{}])[0]
|
|
.get("_source", {})
|
|
)
|
|
channel_name = hit_source.get("channel_name") if isinstance(hit_source, dict) else None
|
|
channel_entries.append({"id": cid, "count": count, "name": channel_name})
|
|
if cid not in channel_totals:
|
|
channel_totals[cid] = {"total": 0, "name": channel_name}
|
|
channel_totals[cid]["total"] += count
|
|
if channel_name and not channel_totals[cid].get("name"):
|
|
channel_totals[cid]["name"] = channel_name
|
|
buckets.append(
|
|
{"date": date_str, "total": total, "channels": channel_entries}
|
|
)
|
|
|
|
ranked_channels = sorted(
|
|
[
|
|
{"id": cid, "total": info.get("total", 0), "name": info.get("name")}
|
|
for cid, info in channel_totals.items()
|
|
],
|
|
key=lambda item: item["total"],
|
|
reverse=True,
|
|
)
|
|
|
|
payload = {
|
|
"term": raw_term if not use_query_string else term,
|
|
"interval": interval,
|
|
"buckets": buckets,
|
|
"channels": ranked_channels,
|
|
"totalResults": response.get("hits", {})
|
|
.get("total", {})
|
|
.get("value", 0),
|
|
}
|
|
return jsonify(payload)
|
|
|
|
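    # Example request (illustrative values): monthly counts for a term, split per channel:
    #   GET /api/frequency?term=mars&interval=month&start=2020-01-01&end=2022-12-31&channel_id=UCabc
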
@app.route("/frequency")
|
|
def frequency_page():
|
|
return send_from_directory(app.static_folder, "frequency.html")
|
|
|
|
@app.route("/api/vector-search", methods=["POST"])
|
|
def api_vector_search():
|
|
payload = request.get_json(silent=True) or {}
|
|
query_text = (payload.get("query") or "").strip()
|
|
filters = validate_qdrant_filter(payload.get("filters"))
|
|
limit = min(max(int(payload.get("size", 10)), 1), MAX_QUERY_SIZE)
|
|
offset = min(max(int(payload.get("offset", 0)), 0), MAX_OFFSET)
|
|
|
|
if not query_text:
|
|
return jsonify(
|
|
{"items": [], "totalResults": 0, "offset": offset, "error": "empty_query"}
|
|
)
|
|
|
|
try:
|
|
query_vector = embed_query(
|
|
query_text, model_name=qdrant_embed_model, expected_dim=qdrant_vector_size
|
|
)
|
|
except Exception as exc: # pragma: no cover - runtime dependency
|
|
LOGGER.error("Embedding failed: %s", exc, exc_info=config.elastic.debug)
|
|
return jsonify({"error": "embedding_unavailable"}), 500
|
|
|
|
qdrant_vector_payload: Any
|
|
if qdrant_vector_name:
|
|
qdrant_vector_payload = {qdrant_vector_name: query_vector}
|
|
else:
|
|
qdrant_vector_payload = query_vector
|
|
|
|
qdrant_body: Dict[str, Any] = {
|
|
"vector": qdrant_vector_payload,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
"with_payload": True,
|
|
"with_vectors": False,
|
|
}
|
|
if filters:
|
|
qdrant_body["filter"] = filters
|
|
|
|
try:
|
|
response = requests.post(
|
|
f"{qdrant_url}/collections/{qdrant_collection}/points/search",
|
|
json=qdrant_body,
|
|
timeout=20,
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
except Exception as exc:
|
|
LOGGER.error("Vector search failed: %s", exc, exc_info=config.elastic.debug)
|
|
return jsonify({"error": "vector_search_unavailable"}), 502
|
|
|
|
points = data.get("result", []) if isinstance(data, dict) else []
|
|
items: List[Dict[str, Any]] = []
|
|
missing_channel_ids: Set[str] = set()
|
|
for point in points:
|
|
payload = point.get("payload", {}) or {}
|
|
raw_highlights = payload.get("highlights") or []
|
|
highlight_entries: List[Dict[str, str]] = []
|
|
for entry in raw_highlights:
|
|
if isinstance(entry, dict):
|
|
html_value = entry.get("html") or entry.get("text")
|
|
else:
|
|
html_value = str(entry)
|
|
if not html_value:
|
|
continue
|
|
highlight_entries.append({"html": html_value, "source": "primary"})
|
|
|
|
channel_label = (
|
|
payload.get("channel_name")
|
|
or payload.get("channel_title")
|
|
or payload.get("channel_id")
|
|
)
|
|
items.append(
|
|
{
|
|
"video_id": payload.get("video_id"),
|
|
"channel_id": payload.get("channel_id"),
|
|
"channel_name": channel_label,
|
|
"title": payload.get("title"),
|
|
"titleHtml": payload.get("title"),
|
|
"description": payload.get("description"),
|
|
"descriptionHtml": payload.get("description"),
|
|
"date": payload.get("date"),
|
|
"url": payload.get("url"),
|
|
"chunkText": payload.get("text")
|
|
or payload.get("chunk_text")
|
|
or payload.get("chunk")
|
|
or payload.get("content"),
|
|
"chunkTimestamp": payload.get("timestamp")
|
|
or payload.get("start_seconds")
|
|
or payload.get("start"),
|
|
"toHighlight": highlight_entries,
|
|
"highlightSource": {
|
|
"primary": bool(highlight_entries),
|
|
"secondary": False,
|
|
},
|
|
"distance": point.get("score"),
|
|
"internal_references_count": payload.get("internal_references_count", 0),
|
|
"internal_references": payload.get("internal_references", []),
|
|
"referenced_by_count": payload.get("referenced_by_count", 0),
|
|
"referenced_by": payload.get("referenced_by", []),
|
|
"video_status": payload.get("video_status"),
|
|
"duration": payload.get("duration"),
|
|
}
|
|
)
|
|
if (not channel_label) and payload.get("channel_id"):
|
|
missing_channel_ids.add(str(payload.get("channel_id")))
|
|
|
|
if missing_channel_ids:
|
|
try:
|
|
es_lookup = client.search(
|
|
index=index,
|
|
body={
|
|
"size": len(missing_channel_ids) * 2,
|
|
"_source": ["channel_id", "channel_name"],
|
|
"query": {"terms": {"channel_id.keyword": list(missing_channel_ids)}},
|
|
},
|
|
)
|
|
hits = es_lookup.get("hits", {}).get("hits", [])
|
|
channel_lookup = {}
|
|
for hit in hits:
|
|
src = hit.get("_source", {}) or {}
|
|
cid = src.get("channel_id")
|
|
cname = src.get("channel_name")
|
|
if cid and cname and cid not in channel_lookup:
|
|
channel_lookup[cid] = cname
|
|
for item in items:
|
|
if not item.get("channel_name"):
|
|
cid = item.get("channel_id")
|
|
if cid and cid in channel_lookup:
|
|
item["channel_name"] = channel_lookup[cid]
|
|
except Exception as exc:
|
|
LOGGER.debug("Vector channel lookup failed: %s", exc)
|
|
|
|
return jsonify(
|
|
{
|
|
"items": items,
|
|
"totalResults": len(items),
|
|
"offset": offset,
|
|
}
|
|
)
|
|
|
|
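    # Example request body (illustrative values); only whitelisted filter fields are honoured:
    #   POST /api/vector-search
    #   {"query": "reusable rockets", "size": 10, "offset": 0, "filters": {"channel_id": "UCabc"}}
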
@app.route("/api/transcript")
|
|
def transcript():
|
|
video_id = request.args.get("video_id", type=str)
|
|
if not video_id:
|
|
return ("video_id not set", 400)
|
|
response = client.get(index=index, id=video_id, ignore=[404])
|
|
if config.elastic.debug:
|
|
LOGGER.info(
|
|
"Elasticsearch transcript request: index=%s id=%s", index, video_id
|
|
)
|
|
LOGGER.info(
|
|
"Elasticsearch transcript response: %s",
|
|
json.dumps(response, indent=2, default=str)
|
|
if response
|
|
else "None",
|
|
)
|
|
if not response or not response.get("found"):
|
|
return ("not found", 404)
|
|
source = response["_source"]
|
|
return jsonify(
|
|
{
|
|
"video_id": source.get("video_id"),
|
|
"title": source.get("title"),
|
|
"transcript_parts": source.get("transcript_parts", []),
|
|
"transcript_full": source.get("transcript_full"),
|
|
"transcript_secondary_parts": source.get("transcript_secondary_parts", []),
|
|
"transcript_secondary_full": source.get("transcript_secondary_full"),
|
|
}
|
|
)
|
|
|
|
return app
|
|
|
|
|
|

def main() -> None:  # pragma: no cover
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    app = create_app()
    debug_mode = os.environ.get("FLASK_DEBUG", "0").lower() in ("1", "true")
    app.run(host="0.0.0.0", port=8080, debug=debug_mode)


if __name__ == "__main__":  # pragma: no cover
    main()
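
# Running locally (sketch; "yourpackage" is a placeholder for wherever this module lives,
# since the relative import of .config requires package execution):
#   FLASK_DEBUG=1 python -m yourpackage.app    # serves on 0.0.0.0:8080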
|