")
def static_files(filename: str):
return send_from_directory(app.static_folder, filename)
def normalize_reference_list(values: Any) -> List[str]:
    """Coerce a scalar, iterable, or dict-bearing value into a clean list of id strings.

    Accepts None (empty result), a single value, or a list/tuple/set of
    values.  Dict entries contribute their "video_id" (or "id") field.
    Blank strings and the literals "none"/"null" (any case) are dropped.
    """
    if values is None:
        return []
    items = values if isinstance(values, (list, tuple, set)) else [values]
    result: List[str] = []
    for entry in items:
        if isinstance(entry, dict):
            raw = entry.get("video_id") or entry.get("id")
        else:
            raw = entry
        if raw is None:
            continue
        cleaned = str(raw).strip()
        if cleaned and cleaned.lower() not in {"none", "null"}:
            result.append(cleaned)
    return result
@app.route("/api/channels")
def channels():
    """Aggregate indexed videos by channel and return [{Id, Name, Count}].

    Query params:
      external: "0"/"false"/"no" (default "0") — when disabled, documents
        flagged external_reference are excluded via a must_not clause.

    Results are sorted case-insensitively by display name.
    """
    include_external = request.args.get("external", default="0", type=str)
    include_external = include_external.lower() not in {"0", "false", "no"}
    # Terms aggregation over channel_id; a one-hit top_hits sub-aggregation
    # pulls a channel_name per bucket for display.
    base_channels_body = {
        "size": 0,
        "aggs": {
            "channels": {
                "terms": {"field": "channel_id", "size": 200},
                "aggs": {
                    "name": {
                        "top_hits": {
                            "size": 1,
                            "_source": {"includes": ["channel_name"]},
                            "sort": [
                                {
                                    "channel_name.keyword": {
                                        "order": "asc",
                                        "missing": "_last",
                                        "unmapped_type": "keyword",
                                    }
                                }
                            ],
                        }
                    }
                },
            }
        },
    }
    if not include_external:
        base_channels_body["query"] = {
            "bool": {"must_not": [{"term": {"external_reference": True}}]}
        }

    def run_channels_request(field_name: str):
        # JSON round-trip deep-copies the base body so retries with a
        # different field never mutate it.
        body = json.loads(json.dumps(base_channels_body))  # deep copy
        body["aggs"]["channels"]["terms"]["field"] = field_name
        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch channels request: %s",
                json.dumps({"index": index, "body": body}, indent=2),
            )
        return client.search(index=index, body=body)

    # Try the .keyword sub-field first; fall back to the bare field for
    # mappings where channel_id is already a keyword type.
    response = None
    last_error = None
    for candidate_field in ("channel_id.keyword", "channel_id"):
        try:
            response = run_channels_request(candidate_field)
            if config.elastic.debug:
                LOGGER.info("Channels aggregation used field: %s", candidate_field)
            break
        except BadRequestError as exc:
            last_error = exc
            if config.elastic.debug:
                LOGGER.warning(
                    "Channels aggregation failed for field %s: %s",
                    candidate_field,
                    exc,
                )
    if response is None:
        # Both candidate fields failed; surface the last ES error if any.
        raise last_error or RuntimeError("Unable to aggregate channels.")
    if config.elastic.debug:
        LOGGER.info(
            "Elasticsearch channels response: %s",
            json.dumps(response, indent=2, default=str),
        )
    buckets = (
        response.get("aggregations", {})
        .get("channels", {})
        .get("buckets", [])
    )
    data = []
    for bucket in buckets:
        key = bucket.get("key")
        # Defensive extraction of the single top_hits document, if present.
        name_hit = (
            bucket.get("name", {})
            .get("hits", {})
            .get("hits", [{}])[0]
            .get("_source", {})
            .get("channel_name")
        )
        display_name = name_hit or key or "Unknown"
        data.append(
            {
                "Id": key,
                "Name": display_name,
                "Count": bucket.get("doc_count", 0),
            }
        )
    data.sort(key=lambda item: item["Name"].lower())
    return jsonify(data)
@app.route("/api/channel-list")
def channel_list():
    """Return the configured channel entries plus feed metadata as JSON.

    On failure the response still carries the metadata, with an "error"
    code of "channels_not_found" or "channels_load_failed".
    """
    result = {
        "channels": [],
        "rss_feed_url": config.rss_feed_url,
        "source": str(config.channels_path),
    }
    try:
        result["channels"] = load_channel_entries(config.channels_path)
    except FileNotFoundError:
        LOGGER.warning("Channel list not found: %s", config.channels_path)
        result["error"] = "channels_not_found"
    except Exception as exc:
        LOGGER.exception("Failed to load channel list: %s", exc)
        result["error"] = "channels_load_failed"
    return jsonify(result)
@app.route("/channels.txt")
def channel_urls():
    """Serve tracked channel URLs as a newline-terminated plain-text list."""
    try:
        entries = load_channel_entries(config.channels_path)
    except FileNotFoundError:
        LOGGER.warning("Channel list not found: %s", config.channels_path)
        return jsonify({"error": "channels_not_found"}), 404
    except Exception as exc:
        LOGGER.exception("Failed to load channel list: %s", exc)
        return jsonify({"error": "channels_load_failed"}), 500
    urls = [entry["url"] for entry in entries if entry.get("url")]
    # Trailing newline only when there is at least one URL.
    if urls:
        body = "\n".join(urls) + "\n"
    else:
        body = ""
    return (body, 200, {"Content-Type": "text/plain; charset=utf-8"})
@app.route("/channels")
def channels_page():
    """Render a minimal HTML page listing all tracked channels.

    NOTE(review): the original markup in this handler was corrupted (HTML
    tags stripped, leaving an unterminated f-string).  The tags below are a
    reconstruction of the apparent intent — one link per channel under a
    heading and a count line.  Confirm layout against the original page.
    """
    try:
        entries = load_channel_entries(config.channels_path)
    except FileNotFoundError:
        return "Channel list not found.", 404
    except Exception:
        return "Failed to load channel list.", 500
    rows = ""
    for ch in entries:
        name = ch.get("name") or ch.get("handle") or ch.get("id") or "Unknown"
        url = ch.get("url", "")
        # Link to the channel page, not the /videos tab
        channel_url = url.replace("/videos", "") if url.endswith("/videos") else url
        name_html = (
            f'<a href="{channel_url}">{name}</a>'
            if channel_url
            else name
        )
        rows += f"<li>{name_html}</li>\n"
    html = f"""<!doctype html>
<html>
<head><meta charset="utf-8"><title>Channels - This Little Corner</title></head>
<body>
<h1>Channels</h1>
<p>{len(entries)} channels tracked</p>
<ul>
{rows}</ul>
</body>
</html>"""
    return (html, 200, {"Content-Type": "text/html; charset=utf-8"})
def _rss_target(feed_name: str) -> str:
    """Build the upstream RSS URL for *feed_name*.

    Empty or slash-only names fall back to the unified feed.
    """
    cleaned = (feed_name or "").strip("/")
    base = config.rss_feed_upstream.rstrip("/")
    return f"{base}/rss/{cleaned or 'youtube-unified'}"
@app.route("/rss")
@app.route("/rss/<path:feed_name>")
def rss_proxy(feed_name: str = ""):
    """Proxy the upstream RSS feed, rewriting the payload before returning it.

    Fix: the second rule was registered as a bare "/rss/" with no URL
    converter, so `feed_name` could never be populated and the default
    argument was dead code.  Restored the `<path:feed_name>` converter that
    the signature clearly expects.

    Forwards upstream caching headers (Cache-Control, ETag, Last-Modified)
    and the upstream status code; returns 502 when the upstream is down.
    """
    target = _rss_target(feed_name)
    try:
        upstream = requests.get(target, timeout=30)
    except requests.RequestException as exc:
        LOGGER.warning("RSS upstream error for %s: %s", target, exc)
        return jsonify({"error": "rss_unavailable"}), 502
    # Rewrite the feed XML through the project helper before serving.
    payload = _rewrite_rss_payload(upstream.content, client, index, feed_name or "rss")
    headers = {
        "Content-Type": upstream.headers.get(
            "Content-Type", "application/xml; charset=UTF-8"
        )
    }
    # Pass through any caching / conditional-request headers the upstream set.
    cache_header = upstream.headers.get("Cache-Control")
    if cache_header:
        headers["Cache-Control"] = cache_header
    etag = upstream.headers.get("ETag")
    if etag:
        headers["ETag"] = etag
    last_modified = upstream.headers.get("Last-Modified")
    if last_modified:
        headers["Last-Modified"] = last_modified
    return (payload, upstream.status_code, headers)
@app.route("/api/graph")
def graph_api():
    """Return a reference graph centred on a video, or the whole graph.

    Query params:
      video_id: required unless full_graph is set; also used as the
        highlight node in full-graph mode.
      full_graph: "1"/"true"/"yes" switches to the unbounded full graph.
      external: "0"/"false"/"no" excludes external references (default on).
      depth: traversal depth, clamped to 0..3 (default 1).
      max_nodes: node cap, clamped to 10..400 (default 200); ignored in
        full-graph mode.
    """
    video_id = (request.args.get("video_id") or "").strip()
    full_graph = request.args.get("full_graph", default="0", type=str)
    full_graph = full_graph.lower() in {"1", "true", "yes"}
    if not full_graph and not video_id:
        return jsonify({"error": "video_id is required"}), 400
    include_external = request.args.get("external", default="1", type=str)
    include_external = include_external.lower() not in {"0", "false", "no"}
    try:
        depth = int(request.args.get("depth", "1"))
    except ValueError:
        depth = 1
    depth = max(0, min(depth, 3))
    if full_graph:
        # Unbounded; the real node count is reported in meta afterwards.
        max_nodes = None
    else:
        try:
            max_nodes = int(request.args.get("max_nodes", "200"))
        except ValueError:
            max_nodes = 200
        max_nodes = max(10, min(max_nodes, 400))
    if full_graph:
        payload = build_full_graph_payload(
            client,
            index,
            None,
            highlight_id=video_id or None,
            include_external=include_external,
        )
    else:
        payload = build_graph_payload(
            client,
            index,
            video_id,
            depth,
            max_nodes,
            include_external=include_external,
        )
    if not payload["nodes"]:
        return (
            jsonify({"error": f"Video '{video_id}' was not found in the index."}),
            404,
        )
    payload["meta"]["max_nodes"] = (
        len(payload["nodes"]) if full_graph else max_nodes
    )
    return jsonify(payload)
@app.route("/api/years")
def years():
    """Return per-year video counts, newest year first, skipping empty years."""
    histogram = {
        "field": "date",
        "calendar_interval": "year",
        "format": "yyyy",
        "order": {"_key": "desc"},
    }
    body = {"size": 0, "aggs": {"years": {"date_histogram": histogram}}}
    if config.elastic.debug:
        LOGGER.info(
            "Elasticsearch years request: %s",
            json.dumps({"index": index, "body": body}, indent=2),
        )
    response = client.search(index=index, body=body)
    if config.elastic.debug:
        LOGGER.info(
            "Elasticsearch years response: %s",
            json.dumps(response, indent=2, default=str),
        )
    year_agg = response.get("aggregations", {}).get("years", {})
    data = []
    for bucket in year_agg.get("buckets", []):
        count = bucket.get("doc_count", 0)
        # min_doc_count isn't set, so filter out zero-count years here.
        if count > 0:
            data.append({"Year": bucket.get("key_as_string"), "Count": count})
    return jsonify(data)
@app.route("/api/search")
def search():
    """Full-text search endpoint returning paginated hits with highlights.

    Query params:
      q: search text.
      channel_id (repeatable) / channel (legacy single): channel filters.
      year: optional year filter.
      sort: sort mode (default "relevant").
      page, size: pagination (size clamped to 1..MAX_QUERY_SIZE).
      exact / fuzzy / phrase / query_string / external: boolean toggles;
        query_string mode disables the other match toggles.
    """
    query = request.args.get("q", "", type=str)
    raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
    legacy_channel = request.args.get("channel", type=str)
    if legacy_channel:
        raw_channels.append(legacy_channel)
    channels = parse_channel_params(raw_channels)
    year = request.args.get("year", "", type=str) or None
    sort = request.args.get("sort", "relevant", type=str)
    page = max(request.args.get("page", 0, type=int), 0)
    size = min(max(request.args.get("size", 10, type=int), 1), MAX_QUERY_SIZE)

    def parse_flag(name: str, default: bool = True) -> bool:
        # Absent param keeps the default; only explicit "0"/"false"/"no"
        # disable the flag.
        value = request.args.get(name)
        if value is None:
            return default
        return value.lower() not in {"0", "false", "no"}

    use_exact = parse_flag("exact", True)
    use_fuzzy = parse_flag("fuzzy", True)
    use_phrase = parse_flag("phrase", True)
    use_query_string = parse_flag("query_string", False)
    include_external = parse_flag("external", False)
    # Lucene query-string mode bypasses the analyzer toggles entirely.
    if use_query_string:
        use_exact = use_fuzzy = use_phrase = False
    payload = build_query_payload(
        query,
        channels=channels,
        year=year,
        sort=sort,
        use_exact=use_exact,
        use_fuzzy=use_fuzzy,
        use_phrase=use_phrase,
        use_query_string=use_query_string,
        include_external=include_external,
    )
    start = page * size
    # Keep from+size within the backend's result-window cap (MAX_OFFSET).
    if start >= MAX_OFFSET:
        return jsonify({"error": "offset_too_large", "maxOffset": MAX_OFFSET}), 400
    if start + size > MAX_OFFSET:
        size = max(1, MAX_OFFSET - start)
    if config.elastic.debug:
        LOGGER.info(
            "Elasticsearch search request: %s",
            json.dumps(
                {
                    "index": index,
                    "from": start,
                    "size": size,
                    "body": payload,
                    "channels": channels,
                    "toggles": {
                        "exact": use_exact,
                        "fuzzy": use_fuzzy,
                        "phrase": use_phrase,
                    },
                },
                indent=2,
            ),
        )
    response = client.search(
        index=index,
        from_=start,
        size=size,
        body=payload,
        request_timeout=30,
    )
    if config.elastic.debug:
        LOGGER.info(
            "Elasticsearch search response: %s",
            json.dumps(response, indent=2, default=str),
        )
    hits = response.get("hits", {})
    total = hits.get("total", {}).get("value", 0)
    documents = []
    for hit in hits.get("hits", []):
        source = hit.get("_source", {})
        highlight_map = hit.get("highlight", {})
        # Merge primary and secondary transcript highlight snippets,
        # tagging each with the field it came from.
        transcript_highlight = [
            {"html": value, "source": "primary"}
            for value in (highlight_map.get("transcript_full", []) or [])
        ] + [
            {"html": value, "source": "secondary"}
            for value in (highlight_map.get("transcript_secondary_full", []) or [])
        ]
        title_highlight = highlight_map.get("title") or []
        description_highlight = highlight_map.get("description") or []
        # Only the first highlight fragment is surfaced for title/description.
        title_html = title_highlight[0] if title_highlight else None
        description_html = description_highlight[0] if description_highlight else None
        documents.append(
            {
                "video_id": source.get("video_id"),
                "channel_id": source.get("channel_id"),
                "channel_name": source.get("channel_name"),
                "title": source.get("title"),
                "titleHtml": title_html,
                "description": source.get("description"),
                "descriptionHtml": description_html,
                "date": source.get("date"),
                "duration": source.get("duration"),
                "url": source.get("url"),
                "toHighlight": transcript_highlight,
                "highlightSource": {
                    "primary": bool(highlight_map.get("transcript_full")),
                    "secondary": bool(highlight_map.get("transcript_secondary_full")),
                },
                "internal_references_count": source.get("internal_references_count", 0),
                "internal_references": source.get("internal_references", []),
                "referenced_by_count": source.get("referenced_by_count", 0),
                "referenced_by": source.get("referenced_by", []),
                "video_status": source.get("video_status"),
                "external_reference": source.get("external_reference"),
            }
        )
    return jsonify(
        {
            "items": documents,
            "totalResults": total,
            # Ceiling division for the page count.
            "totalPages": (total + size - 1) // size,
            "currentPage": page,
        }
    )
@app.route("/api/metrics")
def metrics():
    """Return aggregate metrics, preferring Elasticsearch with a local fallback.

    Query params:
      external: "0"/"false"/"no" excludes externally-referenced documents
        (included by default).

    Any Elasticsearch failure falls back to the locally computed payload.
    """
    include_external = request.args.get("external", default="1", type=str)
    include_external = include_external.lower() not in {"0", "false", "no"}
    try:
        data = elastic_metrics_payload(
            client,
            index,
            channel_field_candidates=["channel_id.keyword", "channel_id"],
            debug=config.elastic.debug,
            include_external=include_external,
        )
    except Exception:
        # logger.exception() already attaches the active traceback, so the
        # redundant exc_info=True argument is unnecessary.
        LOGGER.exception(
            "Falling back to local metrics payload due to Elasticsearch error."
        )
        data = metrics_payload(config.data.root, include_external=include_external)
    return jsonify(data)
@app.route("/api/frequency")
def frequency():
    """Time-bucketed term-frequency endpoint with per-channel breakdown.

    Reuses the same query builder as /api/search, then aggregates matching
    documents into a date_histogram with a per-bucket channel terms
    sub-aggregation.  Supports interval selection, an optional start/end
    date window, channel filters, and the same match toggles as search.
    """
    raw_term = request.args.get("term", type=str) or ""
    use_query_string = request.args.get("query_string", default="0", type=str)
    use_query_string = (use_query_string or "").lower() in {"1", "true", "yes"}
    term = raw_term.strip()
    if not term and not use_query_string:
        return ("term parameter is required", 400)
    if use_query_string and not term:
        # Query-string mode with no term means "match everything".
        term = "*"
    raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
    legacy_channel = request.args.get("channel", type=str)
    if legacy_channel:
        raw_channels.append(legacy_channel)
    channels = parse_channel_params(raw_channels)
    year = request.args.get("year", "", type=str) or None
    interval = (request.args.get("interval", "month") or "month").lower()
    allowed_intervals = {"day", "week", "month", "quarter", "year"}
    if interval not in allowed_intervals:
        interval = "month"
    start = request.args.get("start", type=str)
    end = request.args.get("end", type=str)

    def parse_flag(name: str, default: bool = True) -> bool:
        # Absent param keeps the default; only "0"/"false"/"no" disable.
        value = request.args.get(name)
        if value is None:
            return default
        lowered = value.lower()
        return lowered not in {"0", "false", "no"}

    use_exact = parse_flag("exact", True)
    use_fuzzy = parse_flag("fuzzy", True)
    use_phrase = parse_flag("phrase", True)
    include_external = parse_flag("external", False)
    if use_query_string:
        use_exact = use_fuzzy = use_phrase = False
    search_payload = build_query_payload(
        term,
        channels=channels,
        year=year,
        sort="relevant",
        use_exact=use_exact,
        use_fuzzy=use_fuzzy,
        use_phrase=use_phrase,
        use_query_string=use_query_string,
        include_external=include_external,
    )
    query = search_payload.get("query", {"match_all": {}})
    if start or end:
        # Constrain the query to the requested date window, merging the
        # range filter into whatever bool/filter shape the builder produced.
        range_filter: Dict[str, Dict[str, Dict[str, str]]] = {"range": {"date": {}}}
        if start:
            range_filter["range"]["date"]["gte"] = start
        if end:
            range_filter["range"]["date"]["lte"] = end
        if "bool" in query:
            bool_clause = query.setdefault("bool", {})
            existing_filter = bool_clause.get("filter")
            if existing_filter is None:
                bool_clause["filter"] = [range_filter]
            elif isinstance(existing_filter, list):
                bool_clause["filter"].append(range_filter)
            else:
                # A single (non-list) filter clause gets promoted to a list.
                bool_clause["filter"] = [existing_filter, range_filter]
        elif query.get("match_all") is not None:
            query = {"bool": {"filter": [range_filter]}}
        else:
            query = {"bool": {"must": [query], "filter": [range_filter]}}
    histogram: Dict[str, Any] = {
        "field": "date",
        "calendar_interval": interval,
        "min_doc_count": 0,
    }
    if start or end:
        # extended_bounds forces empty buckets across the whole window so
        # the chart has a continuous time axis.
        bounds: Dict[str, str] = {}
        if start:
            bounds["min"] = start
        if end:
            bounds["max"] = end
        if bounds:
            histogram["extended_bounds"] = bounds
    # Show at least 6 channel series, or one per explicit channel filter.
    channel_terms_size = max(6, len(channels)) if channels else 6
    body = {
        "size": 0,
        "query": query,
        "aggs": {
            "over_time": {
                "date_histogram": histogram,
                "aggs": {
                    "by_channel": {
                        "terms": {
                            "field": "channel_id.keyword",
                            "size": channel_terms_size,
                            "order": {"_count": "desc"},
                        },
                        "aggs": {
                            "channel_name_hit": {
                                "top_hits": {
                                    "size": 1,
                                    "_source": {"includes": ["channel_name"]},
                                    "sort": [
                                        {
                                            "channel_name.keyword": {
                                                "order": "asc",
                                                "missing": "_last",
                                                "unmapped_type": "keyword",
                                            }
                                        }
                                    ],
                                }
                            }
                        },
                    }
                },
            }
        },
    }
    if config.elastic.debug:
        LOGGER.info(
            "Elasticsearch frequency request: %s",
            json.dumps(
                {
                    "index": index,
                    "body": body,
                    "term": term,
                    "interval": interval,
                    "channels": channels,
                    "start": start,
                    "end": end,
                    "query_string": use_query_string,
                },
                indent=2,
            ),
        )
    response = client.search(index=index, body=body)
    if config.elastic.debug:
        LOGGER.info(
            "Elasticsearch frequency response: %s",
            json.dumps(response, indent=2, default=str),
        )
    raw_buckets = (
        response.get("aggregations", {})
        .get("over_time", {})
        .get("buckets", [])
    )
    # channel_totals accumulates per-channel counts across all time buckets
    # so channels can be ranked for the legend.
    channel_totals: Dict[str, Dict[str, Any]] = {}
    buckets: List[Dict[str, Any]] = []
    for bucket in raw_buckets:
        date_str = bucket.get("key_as_string")
        total = bucket.get("doc_count", 0)
        channel_entries: List[Dict[str, Any]] = []
        for ch_bucket in bucket.get("by_channel", {}).get("buckets", []):
            cid = ch_bucket.get("key")
            count = ch_bucket.get("doc_count", 0)
            if cid:
                # Pull the display name from the one-hit sub-aggregation.
                hit_source = (
                    ch_bucket.get("channel_name_hit", {})
                    .get("hits", {})
                    .get("hits", [{}])[0]
                    .get("_source", {})
                )
                channel_name = hit_source.get("channel_name") if isinstance(hit_source, dict) else None
                channel_entries.append({"id": cid, "count": count, "name": channel_name})
                if cid not in channel_totals:
                    channel_totals[cid] = {"total": 0, "name": channel_name}
                channel_totals[cid]["total"] += count
                # Backfill a name discovered in a later bucket.
                if channel_name and not channel_totals[cid].get("name"):
                    channel_totals[cid]["name"] = channel_name
        buckets.append(
            {"date": date_str, "total": total, "channels": channel_entries}
        )
    # Rank channels by their overall hit count, descending.
    ranked_channels = sorted(
        [
            {"id": cid, "total": info.get("total", 0), "name": info.get("name")}
            for cid, info in channel_totals.items()
        ],
        key=lambda item: item["total"],
        reverse=True,
    )
    payload = {
        "term": raw_term if not use_query_string else term,
        "interval": interval,
        "buckets": buckets,
        "channels": ranked_channels,
        "totalResults": response.get("hits", {})
        .get("total", {})
        .get("value", 0),
    }
    return jsonify(payload)
@app.route("/frequency")
def frequency_page():
    """Serve the static frequency-analysis page."""
    static_dir = app.static_folder
    return send_from_directory(static_dir, "frequency.html")
@app.route("/api/transcript")
def transcript():
    """Return the stored transcript fields for a single video document."""
    video_id = request.args.get("video_id", type=str)
    if not video_id:
        return ("video_id not set", 400)
    # ignore=[404] makes a missing document come back as a non-found
    # response instead of raising.
    response = client.get(index=index, id=video_id, ignore=[404])
    if config.elastic.debug:
        LOGGER.info(
            "Elasticsearch transcript request: index=%s id=%s", index, video_id
        )
        LOGGER.info(
            "Elasticsearch transcript response: %s",
            json.dumps(response, indent=2, default=str) if response else "None",
        )
    if not response or not response.get("found"):
        return ("not found", 404)
    doc = response["_source"]
    fields = {
        "video_id": doc.get("video_id"),
        "title": doc.get("title"),
        "transcript_parts": doc.get("transcript_parts", []),
        "transcript_full": doc.get("transcript_full"),
        "transcript_secondary_parts": doc.get("transcript_secondary_parts", []),
        "transcript_secondary_full": doc.get("transcript_secondary_full"),
    }
    return jsonify(fields)
return app
def main() -> None:  # pragma: no cover
    """Configure logging and run the development server on port 8080."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    flask_app = create_app()
    debug_value = os.environ.get("FLASK_DEBUG", "0").lower()
    flask_app.run(host="0.0.0.0", port=8080, debug=debug_value in ("1", "true"))
# Allow running this module directly as a development entry point.
if __name__ == "__main__":  # pragma: no cover
    main()