Add unified channel feed
This commit is contained in:
262
search_app.py
262
search_app.py
@@ -5,6 +5,8 @@ Routes:
|
||||
GET / -> static HTML search page.
|
||||
GET /graph -> static reference graph UI.
|
||||
GET /api/channels -> channels aggregation.
|
||||
GET /api/channel-list -> canonical channel list + feed URL.
|
||||
GET /channels.txt -> raw channel URLs list.
|
||||
GET /api/search -> Elasticsearch keyword search.
|
||||
GET /api/graph -> reference graph API.
|
||||
GET /api/transcript -> transcript JSON payload.
|
||||
@@ -17,6 +19,8 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
from pathlib import Path
|
||||
from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set, Tuple
|
||||
|
||||
@@ -25,9 +29,11 @@ from datetime import datetime
|
||||
from threading import Lock
|
||||
from time import monotonic
|
||||
|
||||
import requests
|
||||
from flask import Flask, jsonify, request, send_from_directory
|
||||
|
||||
from .config import CONFIG, AppConfig
|
||||
from .channel_config import load_channel_entries
|
||||
|
||||
try:
|
||||
from elasticsearch import Elasticsearch # type: ignore
|
||||
@@ -45,6 +51,10 @@ DEFAULT_ELASTIC_TIMEOUT = int(os.environ.get("ELASTIC_TIMEOUT_SECONDS", "30"))
|
||||
|
||||
_RATE_LIMIT_BUCKETS: Dict[str, Deque[float]] = defaultdict(deque)
|
||||
_RATE_LIMIT_LOCK = Lock()
|
||||
_RSS_AUTHOR_CACHE: Dict[str, Tuple[str, float]] = {}
|
||||
_RSS_AUTHOR_LOCK = Lock()
|
||||
_RSS_AUTHOR_TTL_SECONDS = 60 * 60 * 24
|
||||
_RSS_OEMBED_LIMIT = 12
|
||||
|
||||
|
||||
def _client_rate_key() -> str:
|
||||
@@ -101,6 +111,192 @@ def _ensure_client(config: AppConfig) -> "Elasticsearch":
|
||||
return Elasticsearch(config.elastic.url, **kwargs)
|
||||
|
||||
|
||||
def _extract_video_id(url: str) -> Optional[str]:
|
||||
if not url:
|
||||
return None
|
||||
try:
|
||||
parsed = urllib.parse.urlparse(url.strip())
|
||||
except Exception:
|
||||
return None
|
||||
host = (parsed.netloc or "").lower()
|
||||
path = parsed.path or ""
|
||||
if host in {"youtu.be", "www.youtu.be"}:
|
||||
return path.lstrip("/") or None
|
||||
if host.endswith("youtube.com"):
|
||||
if path == "/watch":
|
||||
params = urllib.parse.parse_qs(parsed.query)
|
||||
return (params.get("v") or [None])[0]
|
||||
if path.startswith("/shorts/"):
|
||||
return path.split("/", 2)[2] if len(path.split("/", 2)) > 2 else None
|
||||
return None
|
||||
|
||||
|
||||
def _lookup_channel_names(
    client: "Elasticsearch",
    index: str,
    video_ids: Iterable[str],
) -> Dict[str, str]:
    """Resolve YouTube video ids to channel/author names.

    Resolution is tiered, cheapest first:
      1. the module-level TTL cache (``_RSS_AUTHOR_CACHE``),
      2. a bulk Elasticsearch ``mget`` against *index* (document ``_id`` is
         assumed to be the video id — TODO confirm against the indexer),
      3. YouTube's public oEmbed endpoint, capped at ``_RSS_OEMBED_LIMIT``
         network calls per invocation so a large feed cannot stall a request.

    Every successful lookup from tiers 2–3 is written back into the cache.
    Returns a mapping of video id -> name; unresolved ids are simply absent.
    """
    # Drop falsy ids (None/"") up front so the tiers below see only real ids.
    ids = [vid for vid in video_ids if vid]
    if not ids:
        return {}

    # A single timestamp for this whole invocation: all cache entries written
    # below age from the start of the lookup, not from their individual writes.
    now = monotonic()
    mapping: Dict[str, str] = {}
    cached_hits = 0
    elastic_hits = 0
    remaining = []
    # Tier 1: serve anything still fresh from the TTL cache.  The lock covers
    # the whole scan so concurrent writers cannot interleave.
    with _RSS_AUTHOR_LOCK:
        for vid in ids:
            cached = _RSS_AUTHOR_CACHE.get(vid)
            if cached and (now - cached[1]) < _RSS_AUTHOR_TTL_SECONDS:
                mapping[vid] = cached[0]
                cached_hits += 1
            else:
                # Stale or absent — fall through to Elasticsearch.
                remaining.append(vid)

    # Tier 2: one bulk mget for all cache misses.
    if remaining:
        try:
            response = client.mget(index=index, body={"ids": remaining})
        except Exception as exc:  # pragma: no cover - elasticsearch handles errors
            LOGGER.warning("RSS title lookup failed: %s", exc)
            # Empty dict keeps the .get("docs", []) loop below a no-op.
            response = {}
        for doc in response.get("docs", []):
            if not doc.get("found"):
                continue
            source = doc.get("_source") or {}
            # Prefer the human-readable name; fall back to the raw channel id.
            name = source.get("channel_name") or source.get("channel_id")
            if name:
                vid = doc.get("_id", "")
                mapping[vid] = str(name)
                elastic_hits += 1
                # Lock is re-acquired per write: short critical sections keep
                # tier-1 readers in other requests from blocking on ES latency.
                with _RSS_AUTHOR_LOCK:
                    _RSS_AUTHOR_CACHE[vid] = (mapping[vid], now)

    # Tier 3: oEmbed for ids neither cached nor indexed.  Bounded by
    # _RSS_OEMBED_LIMIT; anything beyond the cap stays unresolved this round.
    missing = [vid for vid in remaining if vid not in mapping]
    oembed_hits = 0
    oembed_attempts = 0
    if missing:
        for vid in missing[:_RSS_OEMBED_LIMIT]:
            oembed_attempts += 1
            video_url = f"https://www.youtube.com/watch?v={vid}"
            oembed_url = (
                "https://www.youtube.com/oembed?format=json&url="
                + urllib.parse.quote(video_url, safe="")
            )
            try:
                response = requests.get(oembed_url, timeout=10)
                if response.status_code != 200:
                    continue
                data = response.json()
            except Exception:
                # Best-effort: a failed oEmbed call just leaves the id unresolved.
                continue
            author = data.get("author_name")
            if not author:
                continue
            mapping[vid] = str(author)
            oembed_hits += 1
            with _RSS_AUTHOR_LOCK:
                _RSS_AUTHOR_CACHE[vid] = (mapping[vid], now)

    # Log at INFO only when something notable happened (network calls or
    # unresolved ids); otherwise keep the happy path at DEBUG.
    missing_count = max(len(ids) - cached_hits - elastic_hits - oembed_hits, 0)
    if missing_count or oembed_attempts:
        LOGGER.info(
            "RSS title lookup: total=%d cached=%d elastic=%d oembed=%d missing=%d",
            len(ids),
            cached_hits,
            elastic_hits,
            oembed_hits,
            missing_count,
        )
    else:
        LOGGER.debug(
            "RSS title lookup: total=%d cached=%d elastic=%d",
            len(ids),
            cached_hits,
            elastic_hits,
        )

    return mapping
|
||||
|
||||
|
||||
def _item_video_id(item: "ET.Element") -> Optional[str]:
    """Return the video id for an RSS <item>: from <link>, else <guid>."""
    link_el = item.find("link")
    vid = _extract_video_id((link_el.text or "") if link_el is not None else "")
    if not vid:
        guid_el = item.find("guid")
        vid = _extract_video_id((guid_el.text or "") if guid_el is not None else "")
    return vid


def _rewrite_rss_payload(
    content: bytes,
    client: "Elasticsearch",
    index: str,
    feed_name: str,
) -> bytes:
    """Rewrite an upstream RSS payload before serving it.

    Two transformations:
      - drop <item>s whose title contains "Bridge returned error"
        (RSS-Bridge emits error placeholders as regular items),
      - prefix each remaining title with "<channel name> - " when the
        channel can be resolved via :func:`_lookup_channel_names`.

    Returns the re-serialized XML bytes.  If *content* is not parseable XML
    or has no <channel>, it is returned unchanged.
    """
    try:
        root = ET.fromstring(content)
    except ET.ParseError:
        LOGGER.warning("RSS rewrite skipped (invalid XML) for %s", feed_name)
        return content

    channel = root.find("channel")
    if channel is None:
        LOGGER.warning("RSS rewrite skipped (missing channel) for %s", feed_name)
        return content

    # Pass 1: remove error items and collect the video ids to resolve.
    items = channel.findall("item")
    total_items = len(items)
    removed_errors = 0
    video_ids: Set[str] = set()
    for item in items:
        title_el = item.find("title")
        title_text = (title_el.text or "").strip() if title_el is not None else ""
        if "Bridge returned error" in title_text:
            # `items` is a snapshot list, so removing from `channel` is safe.
            channel.remove(item)
            removed_errors += 1
            continue
        vid = _item_video_id(item)
        if vid:
            video_ids.add(vid)

    channel_name_map = _lookup_channel_names(client, index, video_ids)
    if not channel_name_map:
        LOGGER.info(
            "RSS rewrite: feed=%s items=%d removed_errors=%d resolved=0",
            feed_name,
            total_items,
            removed_errors,
        )
        # Re-serialize even with nothing resolved so error-item removals
        # still take effect in the served payload.
        return ET.tostring(root, encoding="utf-8", xml_declaration=True)

    # Pass 2: prefix surviving titles with the resolved channel name.
    prefixed = 0
    for item in channel.findall("item"):
        title_el = item.find("title")
        if title_el is None or not title_el.text:
            continue
        vid = _item_video_id(item)
        if not vid:
            continue
        channel_name = channel_name_map.get(vid)
        if not channel_name:
            continue
        prefix = f"{channel_name} - "
        if title_el.text.startswith(prefix):
            # Already prefixed (e.g. upstream or cached copy) — don't double up.
            continue
        title_el.text = f"{channel_name} - {title_el.text}"
        prefixed += 1

    LOGGER.info(
        "RSS rewrite: feed=%s items=%d removed_errors=%d prefixed=%d resolved=%d",
        feed_name,
        total_items,
        removed_errors,
        prefixed,
        len(channel_name_map),
    )
    return ET.tostring(root, encoding="utf-8", xml_declaration=True)
|
||||
|
||||
|
||||
def metrics_payload(data_root: Path, include_external: bool = True) -> Dict[str, Any]:
|
||||
total_items = 0
|
||||
channel_counter: Counter = Counter()
|
||||
@@ -1077,6 +1273,72 @@ def create_app(config: AppConfig = CONFIG) -> Flask:
|
||||
data.sort(key=lambda item: item["Name"].lower())
|
||||
return jsonify(data)
|
||||
|
||||
@app.route("/api/channel-list")
def channel_list():
    """Return the canonical channel list, RSS feed URL, and source path."""
    result: Dict[str, Any] = {
        "channels": [],
        "rss_feed_url": config.rss_feed_url,
        "source": str(config.channels_path),
    }
    try:
        entries = load_channel_entries(config.channels_path)
    except FileNotFoundError:
        LOGGER.warning("Channel list not found: %s", config.channels_path)
        result["error"] = "channels_not_found"
    except Exception as exc:
        LOGGER.exception("Failed to load channel list: %s", exc)
        result["error"] = "channels_load_failed"
    else:
        result["channels"] = entries
    return jsonify(result)
|
||||
|
||||
@app.route("/channels.txt")
def channel_urls():
    """Serve the raw channel URL list as plain text, one URL per line."""
    try:
        entries = load_channel_entries(config.channels_path)
    except FileNotFoundError:
        LOGGER.warning("Channel list not found: %s", config.channels_path)
        return jsonify({"error": "channels_not_found"}), 404
    except Exception as exc:
        LOGGER.exception("Failed to load channel list: %s", exc)
        return jsonify({"error": "channels_load_failed"}), 500

    urls = [entry["url"] for entry in entries if entry.get("url")]
    # Trailing newline per URL; an empty list yields an empty body.
    body = "".join(f"{url}\n" for url in urls)
    return (body, 200, {"Content-Type": "text/plain; charset=utf-8"})
|
||||
|
||||
def _rss_target(feed_name: str) -> str:
    """Build the upstream RSS URL for *feed_name* (unified feed if blank)."""
    cleaned = (feed_name or "").strip("/")
    chosen = cleaned if cleaned else "youtube-unified"
    base = config.rss_feed_upstream.rstrip("/")
    return f"{base}/rss/{chosen}"
|
||||
|
||||
@app.route("/rss")
@app.route("/rss/<path:feed_name>")
def rss_proxy(feed_name: str = ""):
    """Proxy the upstream RSS feed, rewriting titles via the local index."""
    target = _rss_target(feed_name)
    try:
        upstream = requests.get(target, timeout=30)
    except requests.RequestException as exc:
        LOGGER.warning("RSS upstream error for %s: %s", target, exc)
        return jsonify({"error": "rss_unavailable"}), 502

    body = _rewrite_rss_payload(upstream.content, client, index, feed_name or "rss")

    headers = {
        "Content-Type": upstream.headers.get(
            "Content-Type", "application/xml; charset=UTF-8"
        )
    }
    # Forward cache-validation headers only when the upstream supplied them.
    for header_name in ("Cache-Control", "ETag", "Last-Modified"):
        value = upstream.headers.get(header_name)
        if value:
            headers[header_name] = value

    return (body, upstream.status_code, headers)
|
||||
|
||||
@app.route("/api/graph")
|
||||
def graph_api():
|
||||
video_id = (request.args.get("video_id") or "").strip()
|
||||
|
||||
Reference in New Issue
Block a user