"""
Flask application exposing search, graph, and transcript endpoints for TLC.
Routes:
GET / -> static HTML search page.
GET /graph -> static reference graph UI.
GET /api/channels -> channels aggregation.
GET /api/channel-list -> canonical channel list + feed URL.
GET /channels.txt -> raw channel URLs list.
GET /api/search -> Elasticsearch keyword search.
GET /api/graph -> reference graph API.
GET /api/transcript -> transcript JSON payload.
"""
from __future__ import annotations
import copy
import json
import logging
import os
import re
import urllib.parse
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set, Tuple
from collections import Counter, defaultdict, deque
from datetime import datetime
from threading import Lock
from time import monotonic
import requests
from flask import Flask, jsonify, request, send_from_directory
from .config import CONFIG, AppConfig
from .channel_config import load_channel_entries
try:
    from elasticsearch import Elasticsearch  # type: ignore
except ImportError:  # pragma: no cover - dependency optional
    Elasticsearch = None
try:
    # Older elasticsearch clients do not export BadRequestError; import it
    # separately so a missing symbol does not disable the client import above.
    from elasticsearch import BadRequestError  # type: ignore
except ImportError:  # pragma: no cover - dependency optional
    BadRequestError = Exception  # type: ignore
LOGGER = logging.getLogger(__name__)
# Security constants
MAX_QUERY_SIZE = 100
MAX_OFFSET = 10000
DEFAULT_ELASTIC_TIMEOUT = int(os.environ.get("ELASTIC_TIMEOUT_SECONDS", "30"))
_RATE_LIMIT_BUCKETS: Dict[str, Deque[float]] = defaultdict(deque)
_RATE_LIMIT_LOCK = Lock()
_RSS_AUTHOR_CACHE: Dict[str, Tuple[str, float]] = {}
_RSS_AUTHOR_LOCK = Lock()
_RSS_AUTHOR_TTL_SECONDS = 60 * 60 * 24
_RSS_OEMBED_LIMIT = 12
def _client_rate_key() -> str:
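    """Return the client key used for rate limiting, preferring proxy headers."""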
forwarded = request.headers.get("X-Forwarded-For", "")
if forwarded:
return forwarded.split(",")[0].strip()
return request.headers.get("X-Real-IP") or request.remote_addr or "unknown"
def _rate_limited_response(retry_after: int):
response = jsonify({"error": "rate_limited", "retryAfter": retry_after})
response.status_code = 429
response.headers["Retry-After"] = str(retry_after)
return response
def sanitize_query_string(query: str) -> str:
"""
Sanitize user input for Elasticsearch query_string queries.
Removes dangerous field targeting and script injection patterns.
"""
if not query:
return "*"
sanitized = query.strip()
# Remove field targeting patterns like "_id:", "_source:", "script:"
dangerous_field_patterns = [
r'\b_[a-z_]+\s*:', # Internal fields like _id:, _source:
r'\bscript\s*:', # Script injection
]
for pattern in dangerous_field_patterns:
sanitized = re.sub(pattern, '', sanitized, flags=re.IGNORECASE)
# Remove excessive wildcards that could cause ReDoS
sanitized = re.sub(r'\*{2,}', '*', sanitized)
sanitized = re.sub(r'\?{2,}', '?', sanitized)
return sanitized.strip() or "*"
def _ensure_client(config: AppConfig) -> "Elasticsearch":
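    """Build an Elasticsearch client from config; raise if the package is not installed."""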
if Elasticsearch is None:
raise RuntimeError(
"elasticsearch package not installed. "
"Install elasticsearch>=7 to run the Flask search app."
)
kwargs = {}
if config.elastic.api_key:
kwargs["api_key"] = config.elastic.api_key
elif config.elastic.username and config.elastic.password:
kwargs["basic_auth"] = (
config.elastic.username,
config.elastic.password,
)
if config.elastic.ca_cert:
kwargs["ca_certs"] = str(config.elastic.ca_cert)
kwargs["verify_certs"] = config.elastic.verify_certs
return Elasticsearch(config.elastic.url, **kwargs)
def _extract_video_id(url: str) -> Optional[str]:
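    """Extract a YouTube video ID from watch, youtu.be, or shorts URLs."""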
if not url:
return None
try:
parsed = urllib.parse.urlparse(url.strip())
except Exception:
return None
host = (parsed.netloc or "").lower()
path = parsed.path or ""
if host in {"youtu.be", "www.youtu.be"}:
return path.lstrip("/") or None
if host.endswith("youtube.com"):
if path == "/watch":
params = urllib.parse.parse_qs(parsed.query)
return (params.get("v") or [None])[0]
if path.startswith("/shorts/"):
return path.split("/", 2)[2] if len(path.split("/", 2)) > 2 else None
return None
def _lookup_channel_names(
client: "Elasticsearch",
index: str,
video_ids: Iterable[str],
) -> Dict[str, str]:
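    """
    Resolve video IDs to channel names.

    Checks the in-process TTL cache first, then the Elasticsearch index, and
    finally falls back to the YouTube oEmbed endpoint for a limited number of
    remaining IDs.
    """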
ids = [vid for vid in video_ids if vid]
if not ids:
return {}
now = monotonic()
mapping: Dict[str, str] = {}
cached_hits = 0
elastic_hits = 0
remaining = []
with _RSS_AUTHOR_LOCK:
for vid in ids:
cached = _RSS_AUTHOR_CACHE.get(vid)
if cached and (now - cached[1]) < _RSS_AUTHOR_TTL_SECONDS:
mapping[vid] = cached[0]
cached_hits += 1
else:
remaining.append(vid)
if remaining:
try:
response = client.mget(index=index, body={"ids": remaining})
except Exception as exc: # pragma: no cover - elasticsearch handles errors
LOGGER.warning("RSS title lookup failed: %s", exc)
response = {}
for doc in response.get("docs", []):
if not doc.get("found"):
continue
source = doc.get("_source") or {}
name = source.get("channel_name") or source.get("channel_id")
if name:
vid = doc.get("_id", "")
mapping[vid] = str(name)
elastic_hits += 1
with _RSS_AUTHOR_LOCK:
_RSS_AUTHOR_CACHE[vid] = (mapping[vid], now)
missing = [vid for vid in remaining if vid not in mapping]
oembed_hits = 0
oembed_attempts = 0
if missing:
for vid in missing[:_RSS_OEMBED_LIMIT]:
oembed_attempts += 1
video_url = f"https://www.youtube.com/watch?v={vid}"
oembed_url = (
"https://www.youtube.com/oembed?format=json&url="
+ urllib.parse.quote(video_url, safe="")
)
try:
response = requests.get(oembed_url, timeout=10)
if response.status_code != 200:
continue
data = response.json()
except Exception:
continue
author = data.get("author_name")
if not author:
continue
mapping[vid] = str(author)
oembed_hits += 1
with _RSS_AUTHOR_LOCK:
_RSS_AUTHOR_CACHE[vid] = (mapping[vid], now)
missing_count = max(len(ids) - cached_hits - elastic_hits - oembed_hits, 0)
if missing_count or oembed_attempts:
LOGGER.info(
"RSS title lookup: total=%d cached=%d elastic=%d oembed=%d missing=%d",
len(ids),
cached_hits,
elastic_hits,
oembed_hits,
missing_count,
)
else:
LOGGER.debug(
"RSS title lookup: total=%d cached=%d elastic=%d",
len(ids),
cached_hits,
elastic_hits,
)
return mapping
def _rewrite_rss_payload(
content: bytes,
client: "Elasticsearch",
index: str,
feed_name: str,
) -> bytes:
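    """
    Rewrite an upstream RSS payload: drop "Bridge returned error" items and
    prefix item titles with the resolved channel name. Returns the original
    bytes unchanged if the XML cannot be parsed.
    """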
try:
root = ET.fromstring(content)
except ET.ParseError:
LOGGER.warning("RSS rewrite skipped (invalid XML) for %s", feed_name)
return content
channel = root.find("channel")
if channel is None:
LOGGER.warning("RSS rewrite skipped (missing channel) for %s", feed_name)
return content
items = channel.findall("item")
total_items = len(items)
removed_errors = 0
video_ids: Set[str] = set()
for item in list(items):
title_el = item.find("title")
title_text = (title_el.text or "").strip() if title_el is not None else ""
if "Bridge returned error" in title_text:
channel.remove(item)
removed_errors += 1
continue
link_el = item.find("link")
guid_el = item.find("guid")
video_id = _extract_video_id((link_el.text or "") if link_el is not None else "")
if not video_id:
video_id = _extract_video_id((guid_el.text or "") if guid_el is not None else "")
if video_id:
video_ids.add(video_id)
channel_name_map = _lookup_channel_names(client, index, video_ids)
if not channel_name_map:
LOGGER.info(
"RSS rewrite: feed=%s items=%d removed_errors=%d resolved=0",
feed_name,
total_items,
removed_errors,
)
return ET.tostring(root, encoding="utf-8", xml_declaration=True)
prefixed = 0
for item in channel.findall("item"):
title_el = item.find("title")
if title_el is None or not title_el.text:
continue
link_el = item.find("link")
guid_el = item.find("guid")
video_id = _extract_video_id((link_el.text or "") if link_el is not None else "")
if not video_id:
video_id = _extract_video_id((guid_el.text or "") if guid_el is not None else "")
if not video_id:
continue
channel_name = channel_name_map.get(video_id)
if not channel_name:
continue
prefix = f"{channel_name} - "
if title_el.text.startswith(prefix):
continue
title_el.text = f"{channel_name} - {title_el.text}"
prefixed += 1
LOGGER.info(
"RSS rewrite: feed=%s items=%d removed_errors=%d prefixed=%d resolved=%d",
feed_name,
total_items,
removed_errors,
prefixed,
len(channel_name_map),
)
return ET.tostring(root, encoding="utf-8", xml_declaration=True)
def metrics_payload(data_root: Path, include_external: bool = True) -> Dict[str, Any]:
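    """
    Compute corpus metrics by scanning JSON documents under data_root; used as
    a fallback when the Elasticsearch metrics aggregation fails.
    """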
total_items = 0
channel_counter: Counter = Counter()
channel_name_map: Dict[str, str] = {}
year_counter: Counter = Counter()
month_counter: Counter = Counter()
if not data_root.exists():
LOGGER.warning("Data directory %s not found; metrics will be empty.", data_root)
return {
"totalItems": 0,
"totalChannels": 0,
"itemsPerChannel": [],
"yearHistogram": [],
"recentMonths": [],
}
for path in data_root.rglob("*.json"):
try:
with path.open("r", encoding="utf-8") as handle:
doc = json.load(handle)
except Exception:
continue
if not include_external and doc.get("external_reference"):
continue
total_items += 1
channel_id = doc.get("channel_id")
channel_name = doc.get("channel_name") or channel_id
if channel_id:
channel_counter[channel_id] += 1
if channel_name and channel_id not in channel_name_map:
channel_name_map[channel_id] = channel_name
date_value = doc.get("date") or doc.get("published_at")
dt: Optional[datetime] = None
        if isinstance(date_value, str):
            # Parse against the rendered width of each format; slicing to
            # len(fmt) truncates the value and would never match.
            for fmt, width in (
                ("%Y-%m-%dT%H:%M:%SZ", 20),
                ("%Y-%m-%dT%H:%M:%S", 19),
                ("%Y-%m-%d", 10),
            ):
                try:
                    dt = datetime.strptime(date_value[:width], fmt)
                    break
                except ValueError:
                    continue
elif isinstance(date_value, (int, float)):
try:
dt = datetime.fromtimestamp(date_value)
except Exception:
dt = None
if dt:
year_counter[str(dt.year)] += 1
month_counter[dt.strftime("%Y-%m")] += 1
items_per_channel = [
{
"label": channel_name_map.get(cid, cid),
"count": count,
}
for cid, count in channel_counter.most_common()
]
year_histogram = [
{"bucket": year, "count": year_counter[year]}
for year in sorted(year_counter.keys())
]
recent_months = sorted(month_counter.keys())
recent_months = recent_months[-12:]
recent_months_payload = [
{"bucket": month, "count": month_counter[month]} for month in recent_months
]
return {
"totalItems": total_items,
"totalChannels": len(channel_counter),
"itemsPerChannel": items_per_channel,
"yearHistogram": year_histogram,
"recentMonths": recent_months_payload,
}
def elastic_metrics_payload(
client: "Elasticsearch",
index: str,
*,
channel_field_candidates: Optional[List[str]] = None,
debug: bool = False,
include_external: bool = True,
) -> Dict[str, Any]:
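    """
    Compute item, channel, and date-histogram metrics via Elasticsearch
    aggregations, retrying the channel terms aggregation across the candidate
    field names until one succeeds.
    """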
if channel_field_candidates is None:
channel_field_candidates = ["channel_id.keyword", "channel_id"]
base_body: Dict[str, Any] = {
"size": 0,
"track_total_hits": True,
"aggs": {
"channels": {
"terms": {
"field": "channel_id.keyword",
"size": 500,
"order": {"_count": "desc"},
},
"aggs": {
"name": {
"top_hits": {
"size": 1,
"_source": {"includes": ["channel_name"]},
"sort": [
{
"channel_name.keyword": {
"order": "asc",
"missing": "_last",
"unmapped_type": "keyword",
}
}
],
}
}
},
},
"year_histogram": {
"date_histogram": {
"field": "date",
"calendar_interval": "year",
"format": "yyyy",
}
},
"month_histogram": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM",
"order": {"_key": "asc"},
}
},
},
}
if not include_external:
base_body["query"] = {"bool": {"must_not": [{"term": {"external_reference": True}}]}}
last_error: Optional[Exception] = None
response: Optional[Dict[str, Any]] = None
for candidate_field in channel_field_candidates:
body = json.loads(json.dumps(base_body))
body["aggs"]["channels"]["terms"]["field"] = candidate_field
try:
if debug:
LOGGER.info(
"Elasticsearch metrics request: %s",
json.dumps({"index": index, "body": body}, indent=2),
)
response = client.search(index=index, body=body, request_timeout=30)
break
except BadRequestError as exc:
last_error = exc
if debug:
LOGGER.warning(
"Metrics aggregation failed for field %s: %s",
candidate_field,
exc,
)
if response is None:
raise last_error or RuntimeError("Unable to compute metrics from Elasticsearch.")
hits = response.get("hits", {})
total_items = hits.get("total", {}).get("value", 0)
if debug:
LOGGER.info(
"Elasticsearch metrics response: %s",
json.dumps(response, indent=2, default=str),
)
aggregations = response.get("aggregations", {})
channel_buckets = aggregations.get("channels", {}).get("buckets", [])
items_per_channel = []
for bucket in channel_buckets:
key = bucket.get("key")
channel_name = key
top_hits = (
bucket.get("name", {})
.get("hits", {})
.get("hits", [])
)
if top_hits:
channel_name = (
top_hits[0]
.get("_source", {})
.get("channel_name", channel_name)
)
items_per_channel.append(
{"label": channel_name or key, "count": bucket.get("doc_count", 0)}
)
year_buckets = aggregations.get("year_histogram", {}).get("buckets", [])
year_histogram = [
{
"bucket": bucket.get("key_as_string")
or str(bucket.get("key")),
"count": bucket.get("doc_count", 0),
}
for bucket in year_buckets
]
month_buckets = aggregations.get("month_histogram", {}).get("buckets", [])
recent_months_entries = [
{
"bucket": bucket.get("key_as_string")
or str(bucket.get("key")),
"count": bucket.get("doc_count", 0),
"_key": bucket.get("key"),
}
for bucket in month_buckets
]
recent_months_entries.sort(key=lambda item: item.get("_key", 0))
recent_months_payload = [
{"bucket": entry["bucket"], "count": entry["count"]}
for entry in recent_months_entries[-12:]
]
return {
"totalItems": total_items,
"totalChannels": len(items_per_channel),
"itemsPerChannel": items_per_channel,
"yearHistogram": year_histogram,
"recentMonths": recent_months_payload,
}
def parse_channel_params(values: Iterable[Optional[str]]) -> List[str]:
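    """Split, trim, and de-duplicate channel query parameters, ignoring "all"."""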
seen: Set[str] = set()
channels: List[str] = []
for value in values:
if not value:
continue
for part in str(value).split(","):
cleaned = part.strip()
if not cleaned or cleaned.lower() == "all":
continue
if cleaned not in seen:
seen.add(cleaned)
channels.append(cleaned)
return channels
def build_year_filter(year: Optional[str]) -> Optional[Dict]:
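    """Return a date range filter covering the given calendar year, or None."""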
if not year:
return None
try:
year_int = int(year)
return {
"range": {
"date": {
"gte": f"{year_int}-01-01",
"lt": f"{year_int + 1}-01-01",
"format": "yyyy-MM-dd"
}
}
}
except (ValueError, TypeError):
return None
def build_channel_filter(channels: Optional[Sequence[str]]) -> Optional[Dict]:
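    """Return a bool/should filter matching any of the given channel IDs, or None."""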
if not channels:
return None
per_channel_clauses: List[Dict[str, Any]] = []
for value in channels:
if not value:
continue
per_channel_clauses.append(
{
"bool": {
"should": [
{"term": {"channel_id.keyword": value}},
{"term": {"channel_id": value}},
],
"minimum_should_match": 1,
}
}
)
if not per_channel_clauses:
return None
if len(per_channel_clauses) == 1:
return per_channel_clauses[0]
return {
"bool": {
"should": per_channel_clauses,
"minimum_should_match": 1,
}
}
def build_query_payload(
query: str,
*,
channels: Optional[Sequence[str]] = None,
year: Optional[str] = None,
sort: str = "relevant",
use_exact: bool = True,
use_fuzzy: bool = True,
use_phrase: bool = True,
use_query_string: bool = False,
include_external: bool = True,
) -> Dict:
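    """
    Build the Elasticsearch request body for a search: either a sanitized
    query_string query or a bool/should combination of phrase, fuzzy, and
    exact clauses, plus channel/year/external filters, highlighting, and sort.
    """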
filters: List[Dict] = []
should: List[Dict] = []
channel_filter = build_channel_filter(channels)
if channel_filter:
filters.append(channel_filter)
year_filter = build_year_filter(year)
if year_filter:
filters.append(year_filter)
if not include_external:
filters.append({"bool": {"must_not": [{"term": {"external_reference": True}}]}})
if use_query_string:
base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
qs_query = sanitize_query_string(query or "")
query_body: Dict[str, Any] = {
"query_string": {
"query": qs_query,
"default_operator": "AND",
"fields": base_fields,
}
}
if filters:
query_body = {"bool": {"must": query_body, "filter": filters}}
body: Dict = {
"query": query_body,
"highlight": {
"fields": {
"transcript_full": {
"fragment_size": 160,
"number_of_fragments": 5,
"fragmenter": "span",
},
"transcript_secondary_full": {
"fragment_size": 160,
"number_of_fragments": 5,
"fragmenter": "span",
},
"title": {"number_of_fragments": 0},
"description": {
"fragment_size": 160,
"number_of_fragments": 1,
},
},
"require_field_match": False,
"pre_tags": ["<mark>"],
"post_tags": ["</mark>"],
"encoder": "html",
"max_analyzed_offset": 900000,
},
}
if sort == "newer":
body["sort"] = [{"date": {"order": "desc"}}]
elif sort == "older":
body["sort"] = [{"date": {"order": "asc"}}]
elif sort == "referenced":
body["sort"] = [{"referenced_by_count": {"order": "desc", "unmapped_type": "long"}}]
return body
if query:
base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
if use_phrase:
should.append(
{
"match_phrase": {
"transcript_full": {
"query": query,
"slop": 2,
"boost": 10.0,
}
}
}
)
should.append(
{
"match_phrase": {
"transcript_secondary_full": {
"query": query,
"slop": 2,
"boost": 10.0,
}
}
}
)
should.append(
{
"match_phrase": {
"title": {
"query": query,
"slop": 0,
"boost": 50.0,
}
}
}
)
if use_fuzzy:
should.append(
{
"multi_match": {
"query": query,
"fields": base_fields,
"type": "best_fields",
"operator": "and",
"fuzziness": "AUTO",
"prefix_length": 1,
"max_expansions": 50,
"boost": 1.5,
}
}
)
if use_exact:
should.append(
{
"multi_match": {
"query": query,
"fields": base_fields,
"type": "best_fields",
"operator": "and",
"boost": 3.0,
}
}
)
if should:
query_body: Dict = {
"bool": {
"should": should,
"minimum_should_match": 1,
}
}
if filters:
query_body["bool"]["filter"] = filters
elif filters:
query_body = {"bool": {"filter": filters}}
else:
query_body = {"match_all": {}}
body: Dict = {
"query": query_body,
"highlight": {
"fields": {
"transcript_full": {
"fragment_size": 160,
"number_of_fragments": 5,
"fragmenter": "span",
},
"transcript_secondary_full": {
"fragment_size": 160,
"number_of_fragments": 5,
"fragmenter": "span",
},
"title": {"number_of_fragments": 0},
"description": {
"fragment_size": 160,
"number_of_fragments": 1,
},
},
"require_field_match": False,
"pre_tags": ["<mark>"],
"post_tags": ["</mark>"],
"encoder": "html",
"max_analyzed_offset": 900000,
},
}
if query_body.get("match_all") is None:
body["highlight"]["highlight_query"] = copy.deepcopy(query_body)
if sort == "newer":
body["sort"] = [{"date": {"order": "desc"}}]
elif sort == "older":
body["sort"] = [{"date": {"order": "asc"}}]
elif sort == "referenced":
body["sort"] = [{"referenced_by_count": {"order": "desc", "unmapped_type": "long"}}]
return body
def normalize_reference_list(values: Any) -> List[str]:
    """Normalize internal_references / referenced_by entries into a flat list of video IDs."""
    if values is None:
        return []
    if isinstance(values, (list, tuple, set)):
        iterable = values
    else:
        iterable = [values]
    normalized: List[str] = []
    for item in iterable:
        candidate: Optional[str]
        if isinstance(item, dict):
            candidate = item.get("video_id") or item.get("id")  # type: ignore[assignment]
        else:
            candidate = item  # type: ignore[assignment]
        if candidate is None:
            continue
        text = str(candidate).strip()
        if not text:
            continue
        if text.lower() in {"none", "null"}:
            continue
        normalized.append(text)
    return normalized
def build_graph_payload(
client: "Elasticsearch",
index: str,
root_id: str,
depth: int,
max_nodes: int,
*,
include_external: bool = True,
) -> Dict[str, Any]:
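    """
    Breadth-first expansion of the reference graph around root_id up to the
    given depth, capped at max_nodes nodes; link endpoints missing from the
    traversal are backfilled afterwards.
    """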
root_id = root_id.strip()
if not root_id:
return {"nodes": [], "links": [], "root": root_id, "depth": depth, "meta": {}}
doc_cache: Dict[str, Optional[Dict[str, Any]]] = {}
def fetch_document(video_id: str) -> Optional[Dict[str, Any]]:
if video_id in doc_cache:
return doc_cache[video_id]
try:
result = client.get(index=index, id=video_id)
doc_cache[video_id] = result.get("_source")
except Exception as exc: # pragma: no cover - elasticsearch handles errors
LOGGER.debug("Graph: failed to load %s: %s", video_id, exc)
doc_cache[video_id] = None
doc = doc_cache[video_id]
if doc is not None and not include_external and doc.get("external_reference"):
doc_cache[video_id] = None
return None
return doc_cache[video_id]
nodes: Dict[str, Dict[str, Any]] = {}
links: List[Dict[str, Any]] = []
link_seen: Set[Tuple[str, str, str]] = set()
queue: deque[Tuple[str, int]] = deque([(root_id, 0)])
queued: Set[str] = {root_id}
visited: Set[str] = set()
while queue and len(nodes) < max_nodes:
current_id, level = queue.popleft()
queued.discard(current_id)
if current_id in visited:
continue
doc = fetch_document(current_id)
if doc is None:
if current_id == root_id:
break
visited.add(current_id)
continue
visited.add(current_id)
nodes[current_id] = {
"id": current_id,
"title": doc.get("title") or current_id,
"channel_id": doc.get("channel_id"),
"channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
"url": doc.get("url"),
"date": doc.get("date"),
"is_root": current_id == root_id,
"external_reference": bool(doc.get("external_reference")),
}
if level >= depth:
continue
neighbor_ids: List[str] = []
for ref_id in normalize_reference_list(doc.get("internal_references")):
if ref_id == current_id:
continue
key = (current_id, ref_id, "references")
if key not in link_seen:
links.append(
{"source": current_id, "target": ref_id, "relation": "references"}
)
link_seen.add(key)
neighbor_ids.append(ref_id)
for ref_id in normalize_reference_list(doc.get("referenced_by")):
if ref_id == current_id:
continue
key = (ref_id, current_id, "referenced_by")
if key not in link_seen:
links.append(
{"source": ref_id, "target": current_id, "relation": "referenced_by"}
)
link_seen.add(key)
neighbor_ids.append(ref_id)
for neighbor in neighbor_ids:
if neighbor in visited or neighbor in queued:
continue
if len(nodes) + len(queue) >= max_nodes:
break
queue.append((neighbor, level + 1))
queued.add(neighbor)
# Ensure nodes referenced by links exist in the payload.
for link in links:
for key in ("source", "target"):
node_id = link[key]
if node_id in nodes:
continue
doc = fetch_document(node_id)
if doc is None:
if include_external:
nodes[node_id] = {
"id": node_id,
"title": node_id,
"channel_id": None,
"channel_name": "Unknown",
"url": None,
"date": None,
"is_root": node_id == root_id,
"external_reference": False,
}
continue
nodes[node_id] = {
"id": node_id,
"title": doc.get("title") or node_id,
"channel_id": doc.get("channel_id"),
"channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
"url": doc.get("url"),
"date": doc.get("date"),
"is_root": node_id == root_id,
"external_reference": bool(doc.get("external_reference")),
}
links = [
link
for link in links
if link.get("source") in nodes and link.get("target") in nodes
]
return {
"root": root_id,
"depth": depth,
"nodes": list(nodes.values()),
"links": links,
"meta": {
"node_count": len(nodes),
"link_count": len(links),
},
}
def build_full_graph_payload(
client: "Elasticsearch",
index: str,
max_nodes: Optional[int],
*,
highlight_id: Optional[str] = None,
include_external: bool = True,
) -> Dict[str, Any]:
"""
Attempt to render the entire reference graph by gathering every video that
references another (or is referenced).
"""
query = {
"bool": {
"should": [
{"range": {"internal_references_count": {"gt": 0}}},
{"range": {"referenced_by_count": {"gt": 0}}},
{"exists": {"field": "internal_references"}},
{"exists": {"field": "referenced_by"}},
],
"minimum_should_match": 1,
}
}
source_fields = [
"video_id",
"title",
"channel_id",
"channel_name",
"url",
"date",
"internal_references",
"referenced_by",
]
nodes: Dict[str, Dict[str, Any]] = {}
links: List[Dict[str, Any]] = []
link_seen: Set[Tuple[str, str, str]] = set()
node_limit = max_nodes if max_nodes and max_nodes > 0 else None
batch_size = 500 if node_limit is None else min(500, max(50, node_limit * 2))
truncated = False
def ensure_node(node_id: Optional[str], doc: Optional[Dict[str, Any]] = None) -> bool:
if not node_id:
return False
if node_id in nodes:
if doc:
if not include_external and doc.get("external_reference"):
nodes.pop(node_id, None)
return False
existing = nodes[node_id]
if (not existing.get("title") or existing["title"] == node_id) and doc.get("title"):
existing["title"] = doc["title"]
if not existing.get("channel_id") and doc.get("channel_id"):
existing["channel_id"] = doc.get("channel_id")
if (
existing.get("channel_name") in {"Unknown", node_id, None}
and (doc.get("channel_name") or doc.get("channel_id"))
):
existing["channel_name"] = doc.get("channel_name") or doc.get("channel_id")
if not existing.get("url") and doc.get("url"):
existing["url"] = doc.get("url")
if not existing.get("date") and doc.get("date"):
existing["date"] = doc.get("date")
return True
if node_limit is not None and len(nodes) >= node_limit:
return False
channel_name = None
channel_id = None
url = None
date_val = None
title = node_id
if doc:
if not include_external and doc.get("external_reference"):
return False
title = doc.get("title") or title
channel_id = doc.get("channel_id")
channel_name = doc.get("channel_name") or channel_id
url = doc.get("url")
date_val = doc.get("date")
nodes[node_id] = {
"id": node_id,
"title": title,
"channel_id": channel_id,
"channel_name": channel_name or "Unknown",
"url": url,
"date": date_val,
"is_root": False,
"external_reference": bool(doc.get("external_reference")) if doc else False,
}
return True
scroll_id: Optional[str] = None
try:
body = {"query": query, "_source": source_fields, "sort": ["_doc"]}
response = client.search(index=index, body=body, size=batch_size, scroll="1m", request_timeout=60)
scroll_id = response.get("_scroll_id")
stop_fetch = False
while not stop_fetch:
hits = response.get("hits", {}).get("hits", [])
if not hits:
break
for hit in hits:
if node_limit is not None and len(nodes) >= node_limit:
truncated = True
stop_fetch = True
break
source = hit.get("_source", {}) or {}
video_id = source.get("video_id")
if not video_id:
continue
if not include_external and source.get("external_reference"):
nodes.pop(video_id, None)
continue
if not ensure_node(video_id, source):
continue
for target in normalize_reference_list(source.get("internal_references")):
if target == video_id:
continue
if not ensure_node(target):
continue
key = (video_id, target, "references")
if key not in link_seen:
links.append(
{"source": video_id, "target": target, "relation": "references"}
)
link_seen.add(key)
for origin in normalize_reference_list(source.get("referenced_by")):
if origin == video_id:
continue
if not ensure_node(origin):
continue
key = (origin, video_id, "referenced_by")
if key not in link_seen:
links.append(
{"source": origin, "target": video_id, "relation": "referenced_by"}
)
link_seen.add(key)
if stop_fetch or not scroll_id:
break
response = client.scroll(scroll_id=scroll_id, scroll="1m")
scroll_id = response.get("_scroll_id")
if not scroll_id:
break
finally:
if scroll_id:
try:
client.clear_scroll(scroll_id=scroll_id)
except Exception:
pass
if highlight_id and highlight_id in nodes:
nodes[highlight_id]["is_root"] = True
links = [
link
for link in links
if link.get("source") in nodes and link.get("target") in nodes
]
return {
"root": highlight_id or "",
"depth": 0,
"nodes": list(nodes.values()),
"links": links,
"meta": {
"node_count": len(nodes),
"link_count": len(links),
"mode": "full",
"truncated": truncated,
},
}
def create_app(config: AppConfig = CONFIG) -> Flask:
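    """Create the Flask app, wiring security headers, rate limiting, and all HTTP routes."""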
app = Flask(__name__, static_folder=str(Path(__file__).parent / "static"))
app.config['MAX_CONTENT_LENGTH'] = 1 * 1024 * 1024
@app.after_request
def add_security_headers(response):
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()'
response.headers['Content-Security-Policy'] = (
"default-src 'self'; "
"script-src 'self' https://cdn.jsdelivr.net https://unpkg.com; "
"style-src 'self' 'unsafe-inline' https://unpkg.com; "
"img-src 'self' data: https:; "
"font-src 'self' https://unpkg.com; "
"connect-src 'self'"
)
return response
@app.before_request
def enforce_rate_limit():
if not config.rate_limit.enabled:
return None
if not request.path.startswith("/api/"):
return None
limit = config.rate_limit.requests
window_seconds = config.rate_limit.window_seconds
if limit <= 0 or window_seconds <= 0:
return None
now = monotonic()
key = _client_rate_key()
with _RATE_LIMIT_LOCK:
bucket = _RATE_LIMIT_BUCKETS[key]
while bucket and now - bucket[0] > window_seconds:
bucket.popleft()
if len(bucket) >= limit:
retry_after = max(1, int(window_seconds - (now - bucket[0])))
return _rate_limited_response(retry_after)
bucket.append(now)
return None
client = _ensure_client(config)
index = config.elastic.index
@app.route("/")
def index_page():
return send_from_directory(app.static_folder, "index.html")
@app.route("/graph")
def graph_page():
return send_from_directory(app.static_folder, "graph.html")
@app.route("/static/<path:filename>")
def static_files(filename: str):
return send_from_directory(app.static_folder, filename)
@app.route("/api/channels")
def channels():
include_external = request.args.get("external", default="0", type=str)
include_external = include_external.lower() not in {"0", "false", "no"}
base_channels_body = {
"size": 0,
"aggs": {
"channels": {
"terms": {"field": "channel_id", "size": 200},
"aggs": {
"name": {
"top_hits": {
"size": 1,
"_source": {"includes": ["channel_name"]},
"sort": [
{
"channel_name.keyword": {
"order": "asc",
"missing": "_last",
"unmapped_type": "keyword",
}
}
],
}
}
},
}
},
}
if not include_external:
base_channels_body["query"] = {
"bool": {"must_not": [{"term": {"external_reference": True}}]}
}
def run_channels_request(field_name: str):
body = json.loads(json.dumps(base_channels_body)) # deep copy
body["aggs"]["channels"]["terms"]["field"] = field_name
if config.elastic.debug:
LOGGER.info(
"Elasticsearch channels request: %s",
json.dumps({"index": index, "body": body}, indent=2),
)
return client.search(index=index, body=body)
response = None
last_error = None
for candidate_field in ("channel_id.keyword", "channel_id"):
try:
response = run_channels_request(candidate_field)
if config.elastic.debug:
LOGGER.info("Channels aggregation used field: %s", candidate_field)
break
except BadRequestError as exc:
last_error = exc
if config.elastic.debug:
LOGGER.warning(
"Channels aggregation failed for field %s: %s",
candidate_field,
exc,
)
if response is None:
raise last_error or RuntimeError("Unable to aggregate channels.")
if config.elastic.debug:
LOGGER.info(
"Elasticsearch channels response: %s",
json.dumps(response, indent=2, default=str),
)
buckets = (
response.get("aggregations", {})
.get("channels", {})
.get("buckets", [])
)
data = []
for bucket in buckets:
key = bucket.get("key")
name_hit = (
bucket.get("name", {})
.get("hits", {})
.get("hits", [{}])[0]
.get("_source", {})
.get("channel_name")
)
display_name = name_hit or key or "Unknown"
data.append(
{
"Id": key,
"Name": display_name,
"Count": bucket.get("doc_count", 0),
}
)
data.sort(key=lambda item: item["Name"].lower())
return jsonify(data)
@app.route("/api/channel-list")
def channel_list():
payload = {
"channels": [],
"rss_feed_url": config.rss_feed_url,
"source": str(config.channels_path),
}
try:
payload["channels"] = load_channel_entries(config.channels_path)
except FileNotFoundError:
LOGGER.warning("Channel list not found: %s", config.channels_path)
payload["error"] = "channels_not_found"
except Exception as exc:
LOGGER.exception("Failed to load channel list: %s", exc)
payload["error"] = "channels_load_failed"
return jsonify(payload)
@app.route("/channels.txt")
def channel_urls():
try:
channels = load_channel_entries(config.channels_path)
except FileNotFoundError:
LOGGER.warning("Channel list not found: %s", config.channels_path)
return jsonify({"error": "channels_not_found"}), 404
except Exception as exc:
LOGGER.exception("Failed to load channel list: %s", exc)
return jsonify({"error": "channels_load_failed"}), 500
urls = [channel["url"] for channel in channels if channel.get("url")]
body = "\n".join(urls) + ("\n" if urls else "")
return (body, 200, {"Content-Type": "text/plain; charset=utf-8"})
def _rss_target(feed_name: str) -> str:
name = (feed_name or "").strip("/")
if not name:
name = "youtube-unified"
return f"{config.rss_feed_upstream.rstrip('/')}/rss/{name}"
@app.route("/rss")
@app.route("/rss/<path:feed_name>")
def rss_proxy(feed_name: str = ""):
target = _rss_target(feed_name)
try:
upstream = requests.get(target, timeout=30)
except requests.RequestException as exc:
LOGGER.warning("RSS upstream error for %s: %s", target, exc)
return jsonify({"error": "rss_unavailable"}), 502
payload = _rewrite_rss_payload(upstream.content, client, index, feed_name or "rss")
headers = {
"Content-Type": upstream.headers.get(
"Content-Type", "application/xml; charset=UTF-8"
)
}
cache_header = upstream.headers.get("Cache-Control")
if cache_header:
headers["Cache-Control"] = cache_header
etag = upstream.headers.get("ETag")
if etag:
headers["ETag"] = etag
last_modified = upstream.headers.get("Last-Modified")
if last_modified:
headers["Last-Modified"] = last_modified
return (payload, upstream.status_code, headers)
@app.route("/api/graph")
def graph_api():
video_id = (request.args.get("video_id") or "").strip()
full_graph = request.args.get("full_graph", default="0", type=str)
full_graph = full_graph.lower() in {"1", "true", "yes"}
if not full_graph and not video_id:
return jsonify({"error": "video_id is required"}), 400
include_external = request.args.get("external", default="1", type=str)
include_external = include_external.lower() not in {"0", "false", "no"}
try:
depth = int(request.args.get("depth", "1"))
except ValueError:
depth = 1
depth = max(0, min(depth, 3))
if full_graph:
max_nodes = None
else:
try:
max_nodes = int(request.args.get("max_nodes", "200"))
except ValueError:
max_nodes = 200
max_nodes = max(10, min(max_nodes, 400))
if full_graph:
payload = build_full_graph_payload(
client,
index,
None,
highlight_id=video_id or None,
include_external=include_external,
)
else:
payload = build_graph_payload(
client,
index,
video_id,
depth,
max_nodes,
include_external=include_external,
)
if not payload["nodes"]:
return (
jsonify({"error": f"Video '{video_id}' was not found in the index."}),
404,
)
payload["meta"]["max_nodes"] = (
len(payload["nodes"]) if full_graph else max_nodes
)
return jsonify(payload)
@app.route("/api/years")
def years():
body = {
"size": 0,
"aggs": {
"years": {
"date_histogram": {
"field": "date",
"calendar_interval": "year",
"format": "yyyy",
"order": {"_key": "desc"}
}
}
}
}
if config.elastic.debug:
LOGGER.info(
"Elasticsearch years request: %s",
json.dumps({"index": index, "body": body}, indent=2),
)
response = client.search(index=index, body=body)
if config.elastic.debug:
LOGGER.info(
"Elasticsearch years response: %s",
json.dumps(response, indent=2, default=str),
)
buckets = (
response.get("aggregations", {})
.get("years", {})
.get("buckets", [])
)
data = [
{
"Year": bucket.get("key_as_string"),
"Count": bucket.get("doc_count", 0),
}
for bucket in buckets
if bucket.get("doc_count", 0) > 0
]
return jsonify(data)
@app.route("/api/search")
def search():
query = request.args.get("q", "", type=str)
raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
legacy_channel = request.args.get("channel", type=str)
if legacy_channel:
raw_channels.append(legacy_channel)
channels = parse_channel_params(raw_channels)
year = request.args.get("year", "", type=str) or None
sort = request.args.get("sort", "relevant", type=str)
page = max(request.args.get("page", 0, type=int), 0)
size = min(max(request.args.get("size", 10, type=int), 1), MAX_QUERY_SIZE)
def parse_flag(name: str, default: bool = True) -> bool:
value = request.args.get(name)
if value is None:
return default
return value.lower() not in {"0", "false", "no"}
use_exact = parse_flag("exact", True)
use_fuzzy = parse_flag("fuzzy", True)
use_phrase = parse_flag("phrase", True)
use_query_string = parse_flag("query_string", False)
include_external = parse_flag("external", False)
if use_query_string:
use_exact = use_fuzzy = use_phrase = False
payload = build_query_payload(
query,
channels=channels,
year=year,
sort=sort,
use_exact=use_exact,
use_fuzzy=use_fuzzy,
use_phrase=use_phrase,
use_query_string=use_query_string,
include_external=include_external,
)
start = page * size
if start >= MAX_OFFSET:
return jsonify({"error": "offset_too_large", "maxOffset": MAX_OFFSET}), 400
if start + size > MAX_OFFSET:
size = max(1, MAX_OFFSET - start)
if config.elastic.debug:
LOGGER.info(
"Elasticsearch search request: %s",
json.dumps(
{
"index": index,
"from": start,
"size": size,
"body": payload,
"channels": channels,
"toggles": {
"exact": use_exact,
"fuzzy": use_fuzzy,
"phrase": use_phrase,
},
},
indent=2,
),
)
response = client.search(
index=index,
from_=start,
size=size,
body=payload,
request_timeout=30,
)
if config.elastic.debug:
LOGGER.info(
"Elasticsearch search response: %s",
json.dumps(response, indent=2, default=str),
)
hits = response.get("hits", {})
total = hits.get("total", {}).get("value", 0)
documents = []
for hit in hits.get("hits", []):
source = hit.get("_source", {})
highlight_map = hit.get("highlight", {})
transcript_highlight = [
{"html": value, "source": "primary"}
for value in (highlight_map.get("transcript_full", []) or [])
] + [
{"html": value, "source": "secondary"}
for value in (highlight_map.get("transcript_secondary_full", []) or [])
]
title_highlight = highlight_map.get("title") or []
description_highlight = highlight_map.get("description") or []
title_html = title_highlight[0] if title_highlight else None
description_html = description_highlight[0] if description_highlight else None
documents.append(
{
"video_id": source.get("video_id"),
"channel_id": source.get("channel_id"),
"channel_name": source.get("channel_name"),
"title": source.get("title"),
"titleHtml": title_html,
"description": source.get("description"),
"descriptionHtml": description_html,
"date": source.get("date"),
"duration": source.get("duration"),
"url": source.get("url"),
"toHighlight": transcript_highlight,
"highlightSource": {
"primary": bool(highlight_map.get("transcript_full")),
"secondary": bool(highlight_map.get("transcript_secondary_full")),
},
"internal_references_count": source.get("internal_references_count", 0),
"internal_references": source.get("internal_references", []),
"referenced_by_count": source.get("referenced_by_count", 0),
"referenced_by": source.get("referenced_by", []),
"video_status": source.get("video_status"),
"external_reference": source.get("external_reference"),
}
)
return jsonify(
{
"items": documents,
"totalResults": total,
"totalPages": (total + size - 1) // size,
"currentPage": page,
}
)
@app.route("/api/metrics")
def metrics():
include_external = request.args.get("external", default="1", type=str)
include_external = include_external.lower() not in {"0", "false", "no"}
try:
data = elastic_metrics_payload(
client,
index,
channel_field_candidates=["channel_id.keyword", "channel_id"],
debug=config.elastic.debug,
include_external=include_external,
)
        except Exception:
            LOGGER.exception(
                "Falling back to local metrics payload due to Elasticsearch error."
            )
data = metrics_payload(config.data.root, include_external=include_external)
return jsonify(data)
@app.route("/api/frequency")
def frequency():
raw_term = request.args.get("term", type=str) or ""
use_query_string = request.args.get("query_string", default="0", type=str)
use_query_string = (use_query_string or "").lower() in {"1", "true", "yes"}
term = raw_term.strip()
if not term and not use_query_string:
return ("term parameter is required", 400)
if use_query_string and not term:
term = "*"
raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
legacy_channel = request.args.get("channel", type=str)
if legacy_channel:
raw_channels.append(legacy_channel)
channels = parse_channel_params(raw_channels)
year = request.args.get("year", "", type=str) or None
interval = (request.args.get("interval", "month") or "month").lower()
allowed_intervals = {"day", "week", "month", "quarter", "year"}
if interval not in allowed_intervals:
interval = "month"
start = request.args.get("start", type=str)
end = request.args.get("end", type=str)
def parse_flag(name: str, default: bool = True) -> bool:
value = request.args.get(name)
if value is None:
return default
lowered = value.lower()
return lowered not in {"0", "false", "no"}
use_exact = parse_flag("exact", True)
use_fuzzy = parse_flag("fuzzy", True)
use_phrase = parse_flag("phrase", True)
include_external = parse_flag("external", False)
if use_query_string:
use_exact = use_fuzzy = use_phrase = False
search_payload = build_query_payload(
term,
channels=channels,
year=year,
sort="relevant",
use_exact=use_exact,
use_fuzzy=use_fuzzy,
use_phrase=use_phrase,
use_query_string=use_query_string,
include_external=include_external,
)
query = search_payload.get("query", {"match_all": {}})
if start or end:
range_filter: Dict[str, Dict[str, Dict[str, str]]] = {"range": {"date": {}}}
if start:
range_filter["range"]["date"]["gte"] = start
if end:
range_filter["range"]["date"]["lte"] = end
if "bool" in query:
bool_clause = query.setdefault("bool", {})
existing_filter = bool_clause.get("filter")
if existing_filter is None:
bool_clause["filter"] = [range_filter]
elif isinstance(existing_filter, list):
bool_clause["filter"].append(range_filter)
else:
bool_clause["filter"] = [existing_filter, range_filter]
elif query.get("match_all") is not None:
query = {"bool": {"filter": [range_filter]}}
else:
query = {"bool": {"must": [query], "filter": [range_filter]}}
histogram: Dict[str, Any] = {
"field": "date",
"calendar_interval": interval,
"min_doc_count": 0,
}
if start or end:
bounds: Dict[str, str] = {}
if start:
bounds["min"] = start
if end:
bounds["max"] = end
if bounds:
histogram["extended_bounds"] = bounds
channel_terms_size = max(6, len(channels)) if channels else 6
body = {
"size": 0,
"query": query,
"aggs": {
"over_time": {
"date_histogram": histogram,
"aggs": {
"by_channel": {
"terms": {
"field": "channel_id.keyword",
"size": channel_terms_size,
"order": {"_count": "desc"},
},
"aggs": {
"channel_name_hit": {
"top_hits": {
"size": 1,
"_source": {"includes": ["channel_name"]},
"sort": [
{
"channel_name.keyword": {
"order": "asc",
"missing": "_last",
"unmapped_type": "keyword",
}
}
],
}
}
},
}
},
}
},
}
if config.elastic.debug:
LOGGER.info(
"Elasticsearch frequency request: %s",
json.dumps(
{
"index": index,
"body": body,
"term": term,
"interval": interval,
"channels": channels,
"start": start,
"end": end,
"query_string": use_query_string,
},
indent=2,
),
)
response = client.search(index=index, body=body)
if config.elastic.debug:
LOGGER.info(
"Elasticsearch frequency response: %s",
json.dumps(response, indent=2, default=str),
)
raw_buckets = (
response.get("aggregations", {})
.get("over_time", {})
.get("buckets", [])
)
channel_totals: Dict[str, Dict[str, Any]] = {}
buckets: List[Dict[str, Any]] = []
for bucket in raw_buckets:
date_str = bucket.get("key_as_string")
total = bucket.get("doc_count", 0)
channel_entries: List[Dict[str, Any]] = []
for ch_bucket in bucket.get("by_channel", {}).get("buckets", []):
cid = ch_bucket.get("key")
count = ch_bucket.get("doc_count", 0)
if cid:
hit_source = (
ch_bucket.get("channel_name_hit", {})
.get("hits", {})
.get("hits", [{}])[0]
.get("_source", {})
)
channel_name = hit_source.get("channel_name") if isinstance(hit_source, dict) else None
channel_entries.append({"id": cid, "count": count, "name": channel_name})
if cid not in channel_totals:
channel_totals[cid] = {"total": 0, "name": channel_name}
channel_totals[cid]["total"] += count
if channel_name and not channel_totals[cid].get("name"):
channel_totals[cid]["name"] = channel_name
buckets.append(
{"date": date_str, "total": total, "channels": channel_entries}
)
ranked_channels = sorted(
[
{"id": cid, "total": info.get("total", 0), "name": info.get("name")}
for cid, info in channel_totals.items()
],
key=lambda item: item["total"],
reverse=True,
)
payload = {
"term": raw_term if not use_query_string else term,
"interval": interval,
"buckets": buckets,
"channels": ranked_channels,
"totalResults": response.get("hits", {})
.get("total", {})
.get("value", 0),
}
return jsonify(payload)
@app.route("/frequency")
def frequency_page():
return send_from_directory(app.static_folder, "frequency.html")
@app.route("/api/transcript")
def transcript():
video_id = request.args.get("video_id", type=str)
if not video_id:
return ("video_id not set", 400)
response = client.get(index=index, id=video_id, ignore=[404])
if config.elastic.debug:
LOGGER.info(
"Elasticsearch transcript request: index=%s id=%s", index, video_id
)
LOGGER.info(
"Elasticsearch transcript response: %s",
json.dumps(response, indent=2, default=str)
if response
else "None",
)
if not response or not response.get("found"):
return ("not found", 404)
source = response["_source"]
return jsonify(
{
"video_id": source.get("video_id"),
"title": source.get("title"),
"transcript_parts": source.get("transcript_parts", []),
"transcript_full": source.get("transcript_full"),
"transcript_secondary_parts": source.get("transcript_secondary_parts", []),
"transcript_secondary_full": source.get("transcript_secondary_full"),
}
)
return app
def main() -> None: # pragma: no cover
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
app = create_app()
debug_mode = os.environ.get("FLASK_DEBUG", "0").lower() in ("1", "true")
app.run(host="0.0.0.0", port=8080, debug=debug_mode)
if __name__ == "__main__": # pragma: no cover
main()