- Add "Most referenced" sort option to sort by backlink count - Backend now supports sorting by referenced_by_count field - Search results now display reference counts as badges: - Shows number of backlinks (videos linking to this one) - Shows number of internal references (outbound links) - Reference badges appear alongside transcript source badges 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
993 lines
32 KiB
Python
993 lines
32 KiB
Python
"""
|
|
Flask application exposing a minimal search API backed by Elasticsearch.
|
|
|
|
Routes:
|
|
GET / -> Static HTML search page.
|
|
GET /api/channels -> List available channels (via terms aggregation).
|
|
GET /api/search -> Search index with pagination and simple highlighting.
|
|
GET /api/transcript -> Return full transcript for a given video_id.
|
|
"""

from __future__ import annotations

import copy
import json
import logging
import re
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set

from flask import Flask, jsonify, request, send_from_directory

from .config import CONFIG, AppConfig

try:
    from elasticsearch import Elasticsearch  # type: ignore
    from elasticsearch import BadRequestError  # type: ignore
except ImportError:  # pragma: no cover - dependency optional
    Elasticsearch = None
    BadRequestError = Exception  # type: ignore

LOGGER = logging.getLogger(__name__)


def _ensure_client(config: AppConfig) -> "Elasticsearch":
    if Elasticsearch is None:
        raise RuntimeError(
            "elasticsearch package not installed. "
            "Install elasticsearch>=7 to run the Flask search app."
        )
    kwargs = {}
    if config.elastic.api_key:
        kwargs["api_key"] = config.elastic.api_key
    elif config.elastic.username and config.elastic.password:
        kwargs["basic_auth"] = (
            config.elastic.username,
            config.elastic.password,
        )
    if config.elastic.ca_cert:
        kwargs["ca_certs"] = str(config.elastic.ca_cert)
    kwargs["verify_certs"] = config.elastic.verify_certs
    return Elasticsearch(config.elastic.url, **kwargs)
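
# Auth precedence above: an API key wins over basic auth; both are optional for
# an unsecured cluster. A minimal usage sketch (field names per AppConfig):
#   client = _ensure_client(CONFIG)
#   client.info()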


def metrics_payload(data_root: Path) -> Dict[str, Any]:
    total_items = 0
    channel_counter: Counter = Counter()
    channel_name_map: Dict[str, str] = {}
    year_counter: Counter = Counter()
    month_counter: Counter = Counter()

    if not data_root.exists():
        LOGGER.warning("Data directory %s not found; metrics will be empty.", data_root)
        return {
            "totalItems": 0,
            "totalChannels": 0,
            "itemsPerChannel": [],
            "yearHistogram": [],
            "recentMonths": [],
        }

    for path in data_root.rglob("*.json"):
        try:
            with path.open("r", encoding="utf-8") as handle:
                doc = json.load(handle)
        except Exception:
            continue

        total_items += 1

        channel_id = doc.get("channel_id")
        channel_name = doc.get("channel_name") or channel_id
        if channel_id:
            channel_counter[channel_id] += 1
            if channel_name and channel_id not in channel_name_map:
                channel_name_map[channel_id] = channel_name

        date_value = doc.get("date") or doc.get("published_at")
        dt: Optional[datetime] = None
        if isinstance(date_value, str):
            # Truncate to the rendered width of each format; the previous
            # len(fmt) slice was too short (e.g. "%Y-%m-%d" is 8 chars but
            # renders 10, so "2024-01-15" was cut to "2024-01-" and never parsed).
            for fmt, width in (
                ("%Y-%m-%d", 10),
                ("%Y-%m-%dT%H:%M:%S", 19),
                ("%Y-%m-%dT%H:%M:%SZ", 20),
            ):
                try:
                    dt = datetime.strptime(date_value[:width], fmt)
                    break
                except Exception:
                    continue
        elif isinstance(date_value, (int, float)):
            try:
                dt = datetime.fromtimestamp(date_value)
            except Exception:
                dt = None

        if dt:
            year_counter[str(dt.year)] += 1
            month_counter[dt.strftime("%Y-%m")] += 1

    items_per_channel = [
        {
            "label": channel_name_map.get(cid, cid),
            "count": count,
        }
        for cid, count in channel_counter.most_common()
    ]

    year_histogram = [
        {"bucket": year, "count": year_counter[year]}
        for year in sorted(year_counter.keys())
    ]

    recent_months = sorted(month_counter.keys())[-12:]
    recent_months_payload = [
        {"bucket": month, "count": month_counter[month]} for month in recent_months
    ]

    return {
        "totalItems": total_items,
        "totalChannels": len(channel_counter),
        "itemsPerChannel": items_per_channel,
        "yearHistogram": year_histogram,
        "recentMonths": recent_months_payload,
    }
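
# Shape of the payload returned above (counts illustrative):
#   {
#       "totalItems": 1234,
#       "totalChannels": 7,
#       "itemsPerChannel": [{"label": "Some Channel", "count": 321}, ...],
#       "yearHistogram": [{"bucket": "2023", "count": 456}, ...],
#       "recentMonths": [{"bucket": "2024-01", "count": 42}, ...],
#   }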


def elastic_metrics_payload(
    client: "Elasticsearch",
    index: str,
    *,
    channel_field_candidates: Optional[List[str]] = None,
    debug: bool = False,
) -> Dict[str, Any]:
    if channel_field_candidates is None:
        channel_field_candidates = ["channel_id.keyword", "channel_id"]

    base_body: Dict[str, Any] = {
        "size": 0,
        "track_total_hits": True,
        "aggs": {
            "channels": {
                "terms": {
                    "field": "channel_id.keyword",
                    "size": 500,
                    "order": {"_count": "desc"},
                },
                "aggs": {
                    "name": {
                        "top_hits": {
                            "size": 1,
                            "_source": {"includes": ["channel_name"]},
                        }
                    }
                },
            },
            "year_histogram": {
                "date_histogram": {
                    "field": "date",
                    "calendar_interval": "year",
                    "format": "yyyy",
                }
            },
            "month_histogram": {
                "date_histogram": {
                    "field": "date",
                    "calendar_interval": "month",
                    "format": "yyyy-MM",
                    "order": {"_key": "asc"},
                }
            },
        },
    }

    last_error: Optional[Exception] = None
    response: Optional[Dict[str, Any]] = None
    for candidate_field in channel_field_candidates:
        body = json.loads(json.dumps(base_body))  # deep copy
        body["aggs"]["channels"]["terms"]["field"] = candidate_field
        try:
            if debug:
                LOGGER.info(
                    "Elasticsearch metrics request: %s",
                    json.dumps({"index": index, "body": body}, indent=2),
                )
            response = client.search(index=index, body=body)
            break
        except BadRequestError as exc:
            last_error = exc
            if debug:
                LOGGER.warning(
                    "Metrics aggregation failed for field %s: %s",
                    candidate_field,
                    exc,
                )
    if response is None:
        raise last_error or RuntimeError("Unable to compute metrics from Elasticsearch.")

    hits = response.get("hits", {})
    total_items = hits.get("total", {}).get("value", 0)

    if debug:
        LOGGER.info(
            "Elasticsearch metrics response: %s",
            json.dumps(response, indent=2, default=str),
        )

    aggregations = response.get("aggregations", {})
    channel_buckets = aggregations.get("channels", {}).get("buckets", [])
    items_per_channel = []
    for bucket in channel_buckets:
        key = bucket.get("key")
        channel_name = key
        top_hits = bucket.get("name", {}).get("hits", {}).get("hits", [])
        if top_hits:
            channel_name = top_hits[0].get("_source", {}).get("channel_name", channel_name)
        items_per_channel.append(
            {"label": channel_name or key, "count": bucket.get("doc_count", 0)}
        )

    year_buckets = aggregations.get("year_histogram", {}).get("buckets", [])
    year_histogram = [
        {
            "bucket": bucket.get("key_as_string") or str(bucket.get("key")),
            "count": bucket.get("doc_count", 0),
        }
        for bucket in year_buckets
    ]

    month_buckets = aggregations.get("month_histogram", {}).get("buckets", [])
    recent_months_entries = [
        {
            "bucket": bucket.get("key_as_string") or str(bucket.get("key")),
            "count": bucket.get("doc_count", 0),
            "_key": bucket.get("key"),
        }
        for bucket in month_buckets
    ]
    recent_months_entries.sort(key=lambda item: item.get("_key", 0))
    recent_months_payload = [
        {"bucket": entry["bucket"], "count": entry["count"]}
        for entry in recent_months_entries[-12:]
    ]

    return {
        "totalItems": total_items,
        "totalChannels": len(items_per_channel),
        "itemsPerChannel": items_per_channel,
        "yearHistogram": year_histogram,
        "recentMonths": recent_months_payload,
    }
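
# The candidate-field loop retries the terms aggregation on plain "channel_id"
# when "channel_id.keyword" is missing from the mapping; only mapping errors
# (BadRequestError) trigger the fallback. Minimal sketch of a direct call
# (assumes a reachable cluster):
#   client = _ensure_client(CONFIG)
#   payload = elastic_metrics_payload(client, CONFIG.elastic.index, debug=True)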


def parse_channel_params(values: Iterable[Optional[str]]) -> List[str]:
    seen: Set[str] = set()
    channels: List[str] = []
    for value in values:
        if not value:
            continue
        for part in str(value).split(","):
            cleaned = part.strip()
            if not cleaned or cleaned.lower() == "all":
                continue
            if cleaned not in seen:
                seen.add(cleaned)
                channels.append(cleaned)
    return channels
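
# Worked example (ids illustrative): parse_channel_params(["UCa,UCb", "all", None, "UCa"])
# returns ["UCa", "UCb"] -- comma lists are split; blanks, "all", and duplicates are dropped.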


def build_year_filter(year: Optional[str]) -> Optional[Dict]:
    if not year:
        return None
    try:
        year_int = int(year)
        return {
            "range": {
                "date": {
                    "gte": f"{year_int}-01-01",
                    "lt": f"{year_int + 1}-01-01",
                    "format": "yyyy-MM-dd",
                }
            }
        }
    except (ValueError, TypeError):
        return None
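
# For example, build_year_filter("2021") yields the range clause
#   {"range": {"date": {"gte": "2021-01-01", "lt": "2022-01-01", "format": "yyyy-MM-dd"}}}
# while empty or non-numeric input yields None.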


def build_channel_filter(channels: Optional[Sequence[str]]) -> Optional[Dict]:
    if not channels:
        return None
    per_channel_clauses: List[Dict[str, Any]] = []
    for value in channels:
        if not value:
            continue
        per_channel_clauses.append(
            {
                "bool": {
                    "should": [
                        {"term": {"channel_id.keyword": value}},
                        {"term": {"channel_id": value}},
                    ],
                    "minimum_should_match": 1,
                }
            }
        )
    if not per_channel_clauses:
        return None
    if len(per_channel_clauses) == 1:
        return per_channel_clauses[0]
    return {
        "bool": {
            "should": per_channel_clauses,
            "minimum_should_match": 1,
        }
    }
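
# build_channel_filter(["UCa"]) (id illustrative) returns a single bool/should
# that term-matches either channel_id.keyword or channel_id, so the filter works
# with or without a keyword sub-field in the mapping; several channels are OR-ed
# under an outer bool/should with minimum_should_match=1.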


def build_query_payload(
    query: str,
    *,
    channels: Optional[Sequence[str]] = None,
    year: Optional[str] = None,
    sort: str = "relevant",
    use_exact: bool = True,
    use_fuzzy: bool = True,
    use_phrase: bool = True,
    use_query_string: bool = False,
) -> Dict:
    filters: List[Dict] = []
    should: List[Dict] = []

    channel_filter = build_channel_filter(channels)
    if channel_filter:
        filters.append(channel_filter)

    year_filter = build_year_filter(year)
    if year_filter:
        filters.append(year_filter)

    base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]

    # Shared by the query_string branch and the should-clause branch below.
    highlight: Dict[str, Any] = {
        "fields": {
            "transcript_full": {
                "fragment_size": 160,
                "number_of_fragments": 5,
                "fragmenter": "span",
            },
            "transcript_secondary_full": {
                "fragment_size": 160,
                "number_of_fragments": 5,
                "fragmenter": "span",
            },
            "title": {"number_of_fragments": 0},
            "description": {
                "fragment_size": 160,
                "number_of_fragments": 1,
            },
        },
        "require_field_match": False,
        "pre_tags": ["<mark>"],
        "post_tags": ["</mark>"],
        "encoder": "html",
        "max_analyzed_offset": 900000,
    }

    def apply_sort(body: Dict) -> Dict:
        if sort == "newer":
            body["sort"] = [{"date": {"order": "desc"}}]
        elif sort == "older":
            body["sort"] = [{"date": {"order": "asc"}}]
        elif sort == "referenced":
            body["sort"] = [{"referenced_by_count": {"order": "desc"}}]
        return body

    if use_query_string:
        qs_query = (query or "").strip() or "*"
        query_body: Dict[str, Any] = {
            "query_string": {
                "query": qs_query,
                "default_operator": "AND",
                "fields": base_fields,
            }
        }
        if filters:
            query_body = {"bool": {"must": query_body, "filter": filters}}
        return apply_sort({"query": query_body, "highlight": highlight})

    if query:
        if use_phrase:
            should.append(
                {
                    "match_phrase": {
                        "transcript_full": {
                            "query": query,
                            "slop": 2,
                            "boost": 10.0,
                        }
                    }
                }
            )
            should.append(
                {
                    "match_phrase": {
                        "transcript_secondary_full": {
                            "query": query,
                            "slop": 2,
                            "boost": 10.0,
                        }
                    }
                }
            )
        if use_fuzzy:
            should.append(
                {
                    "multi_match": {
                        "query": query,
                        "fields": base_fields,
                        "type": "best_fields",
                        "operator": "and",
                        "fuzziness": "AUTO",
                        "prefix_length": 1,
                        "max_expansions": 50,
                        "boost": 1.5,
                    }
                }
            )
        if use_exact:
            should.append(
                {
                    "multi_match": {
                        "query": query,
                        "fields": base_fields,
                        "type": "best_fields",
                        "operator": "and",
                        "boost": 3.0,
                    }
                }
            )

    if should:
        query_body = {
            "bool": {
                "should": should,
                "minimum_should_match": 1,
            }
        }
        if filters:
            query_body["bool"]["filter"] = filters
    elif filters:
        query_body = {"bool": {"filter": filters}}
    else:
        query_body = {"match_all": {}}

    body: Dict = {"query": query_body, "highlight": highlight}
    if query_body.get("match_all") is None:
        body["highlight"]["highlight_query"] = copy.deepcopy(query_body)

    return apply_sort(body)
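
# Sketch of the body produced by build_query_payload("hello world", sort="referenced")
# with the default toggles (values abridged): a bool/should of the phrase, fuzzy,
# and exact clauses, the shared highlight block with a highlight_query mirroring
# the scoring query, and
#   "sort": [{"referenced_by_count": {"order": "desc"}}]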


def create_app(config: AppConfig = CONFIG) -> Flask:
    app = Flask(__name__, static_folder=str(Path(__file__).parent / "static"))
    client = _ensure_client(config)
    index = config.elastic.index

    @app.route("/")
    def index_page():
        return send_from_directory(app.static_folder, "index.html")

    @app.route("/static/<path:filename>")
    def static_files(filename: str):
        return send_from_directory(app.static_folder, filename)

    @app.route("/api/channels")
    def channels():
        base_channels_body = {
            "size": 0,
            "aggs": {
                "channels": {
                    "terms": {"field": "channel_id", "size": 200},
                    "aggs": {
                        "name": {
                            "top_hits": {
                                "size": 1,
                                "_source": {"includes": ["channel_name"]},
                            }
                        }
                    },
                }
            },
        }

        def run_channels_request(field_name: str):
            body = json.loads(json.dumps(base_channels_body))  # deep copy
            body["aggs"]["channels"]["terms"]["field"] = field_name
            if config.elastic.debug:
                LOGGER.info(
                    "Elasticsearch channels request: %s",
                    json.dumps({"index": index, "body": body}, indent=2),
                )
            return client.search(index=index, body=body)

        response = None
        last_error = None
        for candidate_field in ("channel_id.keyword", "channel_id"):
            try:
                response = run_channels_request(candidate_field)
                if config.elastic.debug:
                    LOGGER.info("Channels aggregation used field: %s", candidate_field)
                break
            except BadRequestError as exc:
                last_error = exc
                if config.elastic.debug:
                    LOGGER.warning(
                        "Channels aggregation failed for field %s: %s",
                        candidate_field,
                        exc,
                    )
        if response is None:
            raise last_error or RuntimeError("Unable to aggregate channels.")

        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch channels response: %s",
                json.dumps(response, indent=2, default=str),
            )
        buckets = response.get("aggregations", {}).get("channels", {}).get("buckets", [])
        data = [
            {
                "Id": bucket.get("key"),
                "Name": (
                    bucket.get("name", {})
                    .get("hits", {})
                    .get("hits", [{}])[0]
                    .get("_source", {})
                    .get("channel_name", bucket.get("key"))
                ),
                "Count": bucket.get("doc_count", 0),
            }
            for bucket in buckets
        ]
        data.sort(key=lambda item: item["Name"].lower())
        return jsonify(data)

    @app.route("/api/years")
    def years():
        body = {
            "size": 0,
            "aggs": {
                "years": {
                    "date_histogram": {
                        "field": "date",
                        "calendar_interval": "year",
                        "format": "yyyy",
                        "order": {"_key": "desc"},
                    }
                }
            },
        }

        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch years request: %s",
                json.dumps({"index": index, "body": body}, indent=2),
            )

        response = client.search(index=index, body=body)

        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch years response: %s",
                json.dumps(response, indent=2, default=str),
            )

        buckets = response.get("aggregations", {}).get("years", {}).get("buckets", [])

        data = [
            {
                "Year": bucket.get("key_as_string"),
                "Count": bucket.get("doc_count", 0),
            }
            for bucket in buckets
            if bucket.get("doc_count", 0) > 0
        ]

        return jsonify(data)

    @app.route("/api/search")
    def search():
        query = request.args.get("q", "", type=str)
        raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
        legacy_channel = request.args.get("channel", type=str)
        if legacy_channel:
            raw_channels.append(legacy_channel)
        channels = parse_channel_params(raw_channels)
        year = request.args.get("year", "", type=str) or None
        sort = request.args.get("sort", "relevant", type=str)
        page = max(request.args.get("page", 0, type=int), 0)
        size = max(request.args.get("size", 10, type=int), 1)

        def parse_flag(name: str, default: bool = True) -> bool:
            value = request.args.get(name)
            if value is None:
                return default
            return value.lower() not in {"0", "false", "no"}

        use_exact = parse_flag("exact", True)
        use_fuzzy = parse_flag("fuzzy", True)
        use_phrase = parse_flag("phrase", True)
        use_query_string = parse_flag("query_string", False)
        if use_query_string:
            use_exact = use_fuzzy = use_phrase = False

        payload = build_query_payload(
            query,
            channels=channels,
            year=year,
            sort=sort,
            use_exact=use_exact,
            use_fuzzy=use_fuzzy,
            use_phrase=use_phrase,
            use_query_string=use_query_string,
        )
        start = page * size
        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch search request: %s",
                json.dumps(
                    {
                        "index": index,
                        "from": start,
                        "size": size,
                        "body": payload,
                        "channels": channels,
                        "toggles": {
                            "exact": use_exact,
                            "fuzzy": use_fuzzy,
                            "phrase": use_phrase,
                        },
                    },
                    indent=2,
                ),
            )
        response = client.search(
            index=index,
            from_=start,
            size=size,
            body=payload,
        )
        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch search response: %s",
                json.dumps(response, indent=2, default=str),
            )

        hits = response.get("hits", {})
        total = hits.get("total", {}).get("value", 0)
        documents = []
        for hit in hits.get("hits", []):
            source = hit.get("_source", {})
            highlight_map = hit.get("highlight", {})
            transcript_highlight = (
                (highlight_map.get("transcript_full", []) or [])
                + (highlight_map.get("transcript_secondary_full", []) or [])
            )

            title_html = (
                highlight_map.get("title")
                or [source.get("title") or "Untitled"]
            )[0]
            description_html = (
                highlight_map.get("description")
                or [source.get("description") or ""]
            )[0]
            documents.append(
                {
                    "video_id": source.get("video_id"),
                    "channel_id": source.get("channel_id"),
                    "channel_name": source.get("channel_name"),
                    "title": source.get("title"),
                    "titleHtml": title_html,
                    "description": source.get("description"),
                    "descriptionHtml": description_html,
                    "date": source.get("date"),
                    "url": source.get("url"),
                    "toHighlight": transcript_highlight,
                    "highlightSource": {
                        "primary": bool(highlight_map.get("transcript_full")),
                        "secondary": bool(highlight_map.get("transcript_secondary_full")),
                    },
                    "internal_references_count": source.get("internal_references_count", 0),
                    "referenced_by_count": source.get("referenced_by_count", 0),
                }
            )

        return jsonify(
            {
                "items": documents,
                "totalResults": total,
                "totalPages": (total + size - 1) // size,
                "currentPage": page,
            }
        )
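
    # Example (values illustrative): GET /api/search?q=rust&sort=referenced returns
    # hits ordered by referenced_by_count (backlinks) descending; each item also
    # carries internal_references_count (outbound links), which the frontend can
    # render as reference badges alongside the transcript-source flags above.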

    @app.route("/api/metrics")
    def metrics():
        try:
            data = elastic_metrics_payload(
                client,
                index,
                channel_field_candidates=["channel_id.keyword", "channel_id"],
                debug=config.elastic.debug,
            )
        except Exception:
            # LOGGER.exception already records the traceback.
            LOGGER.exception(
                "Falling back to local metrics payload due to Elasticsearch error."
            )
            data = metrics_payload(config.data.root)
        return jsonify(data)

    @app.route("/api/frequency")
    def frequency():
        raw_term = request.args.get("term", type=str) or ""
        use_query_string = request.args.get("query_string", default="0", type=str)
        use_query_string = (use_query_string or "").lower() in {"1", "true", "yes"}
        term = raw_term.strip()
        if not term and not use_query_string:
            return ("term parameter is required", 400)
        if use_query_string and not term:
            term = "*"

        raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
        legacy_channel = request.args.get("channel", type=str)
        if legacy_channel:
            raw_channels.append(legacy_channel)
        channels = parse_channel_params(raw_channels)
        year = request.args.get("year", "", type=str) or None
        interval = (request.args.get("interval", "month") or "month").lower()
        allowed_intervals = {"day", "week", "month", "quarter", "year"}
        if interval not in allowed_intervals:
            interval = "month"
        start = request.args.get("start", type=str)
        end = request.args.get("end", type=str)

        filters: List[Dict] = []
        channel_filter = build_channel_filter(channels)
        if channel_filter:
            filters.append(channel_filter)
        year_filter = build_year_filter(year)
        if year_filter:
            filters.append(year_filter)
        if start or end:
            range_filter: Dict[str, Dict[str, Dict[str, str]]] = {"range": {"date": {}}}
            if start:
                range_filter["range"]["date"]["gte"] = start
            if end:
                range_filter["range"]["date"]["lte"] = end
            filters.append(range_filter)

        base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
        if use_query_string:
            qs_query = term or "*"
            must_clause: List[Dict[str, Any]] = [
                {
                    "query_string": {
                        "query": qs_query,
                        "default_operator": "AND",
                        "fields": base_fields,
                    }
                }
            ]
        else:
            must_clause = [
                {
                    "multi_match": {
                        "query": term,
                        "fields": base_fields,
                        "type": "best_fields",
                        "operator": "and",
                    }
                }
            ]

        query: Dict[str, Any] = {"bool": {"must": must_clause}}
        if filters:
            query["bool"]["filter"] = filters

        histogram: Dict[str, Any] = {
            "field": "date",
            "calendar_interval": interval,
            "min_doc_count": 0,
        }
        if start or end:
            bounds: Dict[str, str] = {}
            if start:
                bounds["min"] = start
            if end:
                bounds["max"] = end
            if bounds:
                histogram["extended_bounds"] = bounds

        channel_terms_size = max(6, len(channels)) if channels else 6

        body = {
            "size": 0,
            "query": query,
            "aggs": {
                "over_time": {
                    "date_histogram": histogram,
                    "aggs": {
                        "by_channel": {
                            "terms": {
                                "field": "channel_id.keyword",
                                "size": channel_terms_size,
                                "order": {"_count": "desc"},
                            }
                        }
                    },
                }
            },
        }

        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch frequency request: %s",
                json.dumps(
                    {
                        "index": index,
                        "body": body,
                        "term": term,
                        "interval": interval,
                        "channels": channels,
                        "start": start,
                        "end": end,
                        "query_string": use_query_string,
                    },
                    indent=2,
                ),
            )

        response = client.search(index=index, body=body)

        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch frequency response: %s",
                json.dumps(response, indent=2, default=str),
            )

        raw_buckets = response.get("aggregations", {}).get("over_time", {}).get("buckets", [])

        channel_totals: Dict[str, int] = {}
        buckets: List[Dict[str, Any]] = []
        for bucket in raw_buckets:
            date_str = bucket.get("key_as_string")
            total = bucket.get("doc_count", 0)
            channel_entries: List[Dict[str, Any]] = []
            for ch_bucket in bucket.get("by_channel", {}).get("buckets", []):
                cid = ch_bucket.get("key")
                count = ch_bucket.get("doc_count", 0)
                if cid:
                    channel_entries.append({"id": cid, "count": count})
                    channel_totals[cid] = channel_totals.get(cid, 0) + count
            buckets.append(
                {"date": date_str, "total": total, "channels": channel_entries}
            )

        ranked_channels = sorted(
            [{"id": cid, "total": total} for cid, total in channel_totals.items()],
            key=lambda item: item["total"],
            reverse=True,
        )

        payload = {
            "term": raw_term if not use_query_string else term,
            "interval": interval,
            "buckets": buckets,
            "channels": ranked_channels,
            "totalResults": response.get("hits", {}).get("total", {}).get("value", 0),
        }
        return jsonify(payload)
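
    # Example (values illustrative): GET /api/frequency?term=llm&interval=quarter
    # buckets matching documents per quarter; each bucket carries a by_channel
    # breakdown capped at max(6, number of selected channels) channels.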

    @app.route("/frequency")
    def frequency_page():
        return send_from_directory(app.static_folder, "frequency.html")

    @app.route("/api/transcript")
    def transcript():
        video_id = request.args.get("video_id", type=str)
        if not video_id:
            return ("video_id not set", 400)
        response = client.get(index=index, id=video_id, ignore=[404])
        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch transcript request: index=%s id=%s", index, video_id
            )
            LOGGER.info(
                "Elasticsearch transcript response: %s",
                json.dumps(response, indent=2, default=str) if response else "None",
            )
        if not response or not response.get("found"):
            return ("not found", 404)
        source = response["_source"]
        return jsonify(
            {
                "video_id": source.get("video_id"),
                "title": source.get("title"),
                "transcript_parts": source.get("transcript_parts", []),
                "transcript_full": source.get("transcript_full"),
                "transcript_secondary_parts": source.get("transcript_secondary_parts", []),
                "transcript_secondary_full": source.get("transcript_secondary_full"),
            }
        )
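
    # Documents are fetched by _id, so the indexer must use video_id as the
    # document id. Example (id illustrative):
    #   GET /api/transcript?video_id=VIDEO_ID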

    return app


def main() -> None:  # pragma: no cover
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    app = create_app()
    app.run(host="0.0.0.0", port=8080, debug=True)


if __name__ == "__main__":  # pragma: no cover
    main()