"""
Flask application exposing a minimal search API backed by Elasticsearch.
Routes:
GET / -> Static HTML search page.
GET /api/channels -> List available channels (via terms aggregation).
GET /api/search -> Search index with pagination and simple highlighting.
GET /api/transcript -> Return full transcript for a given video_id.
"""
from __future__ import annotations
import copy
import json
import logging
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set
from collections import Counter
from datetime import datetime
from flask import Flask, jsonify, request, send_from_directory
from .config import CONFIG, AppConfig
try:
    from elasticsearch import Elasticsearch  # type: ignore
    from elasticsearch import BadRequestError, NotFoundError  # type: ignore
except ImportError:  # pragma: no cover - dependency optional
    Elasticsearch = None
    BadRequestError = Exception  # type: ignore
    NotFoundError = Exception  # type: ignore
LOGGER = logging.getLogger(__name__)
def _ensure_client(config: AppConfig) -> "Elasticsearch":
if Elasticsearch is None:
        raise RuntimeError(
            "elasticsearch package not installed. "
            "Install elasticsearch>=8 (the app relies on BadRequestError, "
            "which older clients do not export) to run the Flask search app."
        )
kwargs = {}
if config.elastic.api_key:
kwargs["api_key"] = config.elastic.api_key
elif config.elastic.username and config.elastic.password:
kwargs["basic_auth"] = (
config.elastic.username,
config.elastic.password,
)
if config.elastic.ca_cert:
kwargs["ca_certs"] = str(config.elastic.ca_cert)
kwargs["verify_certs"] = config.elastic.verify_certs
return Elasticsearch(config.elastic.url, **kwargs)
def metrics_payload(data_root: Path) -> Dict[str, Any]:
total_items = 0
channel_counter: Counter = Counter()
channel_name_map: Dict[str, str] = {}
year_counter: Counter = Counter()
month_counter: Counter = Counter()
if not data_root.exists():
LOGGER.warning("Data directory %s not found; metrics will be empty.", data_root)
return {
"totalItems": 0,
"totalChannels": 0,
"itemsPerChannel": [],
"yearHistogram": [],
"recentMonths": [],
}
for path in data_root.rglob("*.json"):
try:
with path.open("r", encoding="utf-8") as handle:
doc = json.load(handle)
        except Exception:
            LOGGER.debug("Skipping unreadable JSON file %s", path)
            continue
total_items += 1
channel_id = doc.get("channel_id")
channel_name = doc.get("channel_name") or channel_id
if channel_id:
channel_counter[channel_id] += 1
if channel_name and channel_id not in channel_name_map:
channel_name_map[channel_id] = channel_name
date_value = doc.get("date") or doc.get("published_at")
dt: Optional[datetime] = None
        if isinstance(date_value, str):
            # Truncate to each format's rendered length before parsing;
            # len(fmt) would count pattern characters, not rendered ones,
            # so slicing by it never yields a parseable string.
            for fmt, length in (
                ("%Y-%m-%d", 10),
                ("%Y-%m-%dT%H:%M:%S", 19),
                ("%Y-%m-%dT%H:%M:%SZ", 20),
            ):
                try:
                    dt = datetime.strptime(date_value[:length], fmt)
                    break
                except ValueError:
                    continue
elif isinstance(date_value, (int, float)):
try:
dt = datetime.fromtimestamp(date_value)
except Exception:
dt = None
if dt:
year_counter[str(dt.year)] += 1
month_counter[dt.strftime("%Y-%m")] += 1
items_per_channel = [
{
"label": channel_name_map.get(cid, cid),
"count": count,
}
for cid, count in channel_counter.most_common()
]
year_histogram = [
{"bucket": year, "count": year_counter[year]}
for year in sorted(year_counter.keys())
]
recent_months = sorted(month_counter.keys())
recent_months = recent_months[-12:]
recent_months_payload = [
{"bucket": month, "count": month_counter[month]} for month in recent_months
]
return {
"totalItems": total_items,
"totalChannels": len(channel_counter),
"itemsPerChannel": items_per_channel,
"yearHistogram": year_histogram,
"recentMonths": recent_months_payload,
}
def elastic_metrics_payload(
client: "Elasticsearch",
index: str,
*,
channel_field_candidates: Optional[List[str]] = None,
debug: bool = False,
) -> Dict[str, Any]:
if channel_field_candidates is None:
channel_field_candidates = ["channel_id.keyword", "channel_id"]
base_body: Dict[str, Any] = {
"size": 0,
"track_total_hits": True,
"aggs": {
"channels": {
"terms": {
"field": "channel_id.keyword",
"size": 500,
"order": {"_count": "desc"},
},
"aggs": {
"name": {
"top_hits": {
"size": 1,
"_source": {"includes": ["channel_name"]},
}
}
},
},
"year_histogram": {
"date_histogram": {
"field": "date",
"calendar_interval": "year",
"format": "yyyy",
}
},
"month_histogram": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM",
"order": {"_key": "asc"},
}
},
},
}
last_error: Optional[Exception] = None
response: Optional[Dict[str, Any]] = None
for candidate_field in channel_field_candidates:
        body = copy.deepcopy(base_body)
body["aggs"]["channels"]["terms"]["field"] = candidate_field
try:
if debug:
LOGGER.info(
"Elasticsearch metrics request: %s",
json.dumps({"index": index, "body": body}, indent=2),
)
response = client.search(index=index, body=body)
break
except BadRequestError as exc:
last_error = exc
if debug:
LOGGER.warning(
"Metrics aggregation failed for field %s: %s",
candidate_field,
exc,
)
if response is None:
raise last_error or RuntimeError("Unable to compute metrics from Elasticsearch.")
hits = response.get("hits", {})
total_items = hits.get("total", {}).get("value", 0)
if debug:
LOGGER.info(
"Elasticsearch metrics response: %s",
json.dumps(response, indent=2, default=str),
)
aggregations = response.get("aggregations", {})
channel_buckets = aggregations.get("channels", {}).get("buckets", [])
items_per_channel = []
for bucket in channel_buckets:
key = bucket.get("key")
channel_name = key
top_hits = (
bucket.get("name", {})
.get("hits", {})
.get("hits", [])
)
if top_hits:
channel_name = (
top_hits[0]
.get("_source", {})
.get("channel_name", channel_name)
)
items_per_channel.append(
{"label": channel_name or key, "count": bucket.get("doc_count", 0)}
)
year_buckets = aggregations.get("year_histogram", {}).get("buckets", [])
year_histogram = [
{
"bucket": bucket.get("key_as_string")
or str(bucket.get("key")),
"count": bucket.get("doc_count", 0),
}
for bucket in year_buckets
]
month_buckets = aggregations.get("month_histogram", {}).get("buckets", [])
recent_months_entries = [
{
"bucket": bucket.get("key_as_string")
or str(bucket.get("key")),
"count": bucket.get("doc_count", 0),
"_key": bucket.get("key"),
}
for bucket in month_buckets
]
recent_months_entries.sort(key=lambda item: item.get("_key", 0))
recent_months_payload = [
{"bucket": entry["bucket"], "count": entry["count"]}
for entry in recent_months_entries[-12:]
]
return {
"totalItems": total_items,
"totalChannels": len(items_per_channel),
"itemsPerChannel": items_per_channel,
"yearHistogram": year_histogram,
"recentMonths": recent_months_payload,
}
def parse_channel_params(values: Iterable[Optional[str]]) -> List[str]:
seen: Set[str] = set()
channels: List[str] = []
for value in values:
if not value:
continue
for part in str(value).split(","):
cleaned = part.strip()
if not cleaned or cleaned.lower() == "all":
continue
if cleaned not in seen:
seen.add(cleaned)
channels.append(cleaned)
return channels
def build_year_filter(year: Optional[str]) -> Optional[Dict]:
if not year:
return None
try:
year_int = int(year)
return {
"range": {
"date": {
"gte": f"{year_int}-01-01",
"lt": f"{year_int + 1}-01-01",
"format": "yyyy-MM-dd"
}
}
}
except (ValueError, TypeError):
return None
def build_channel_filter(channels: Optional[Sequence[str]]) -> Optional[Dict]:
if not channels:
return None
per_channel_clauses: List[Dict[str, Any]] = []
for value in channels:
if not value:
continue
per_channel_clauses.append(
{
"bool": {
"should": [
{"term": {"channel_id.keyword": value}},
{"term": {"channel_id": value}},
],
"minimum_should_match": 1,
}
}
)
if not per_channel_clauses:
return None
if len(per_channel_clauses) == 1:
return per_channel_clauses[0]
return {
"bool": {
"should": per_channel_clauses,
"minimum_should_match": 1,
}
}
# Fields queried by /api/search and /api/frequency; title and description
# are boosted relative to the transcript fields.
_SEARCH_FIELDS = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
def _highlight_spec() -> Dict[str, Any]:
    """Highlight configuration shared by every search query variant."""
    return {
        "fields": {
            "transcript_full": {
                "fragment_size": 160,
                "number_of_fragments": 5,
                "fragmenter": "span",
            },
            "transcript_secondary_full": {
                "fragment_size": 160,
                "number_of_fragments": 5,
                "fragmenter": "span",
            },
            "title": {"number_of_fragments": 0},
            "description": {
                "fragment_size": 160,
                "number_of_fragments": 1,
            },
        },
        "require_field_match": False,
        # Empty pre/post tags: fragments come back as plain HTML-escaped text
        # and the client applies its own highlighting markup.
        "pre_tags": [""],
        "post_tags": [""],
        "encoder": "html",
        "max_analyzed_offset": 900000,
    }
def _apply_sort(body: Dict, sort: str) -> None:
    """Attach an explicit sort clause unless relevance ordering was requested."""
    if sort == "newer":
        body["sort"] = [{"date": {"order": "desc"}}]
    elif sort == "older":
        body["sort"] = [{"date": {"order": "asc"}}]
    elif sort == "referenced":
        body["sort"] = [{"referenced_by_count": {"order": "desc", "unmapped_type": "long"}}]
def build_query_payload(
    query: str,
    *,
    channels: Optional[Sequence[str]] = None,
    year: Optional[str] = None,
    sort: str = "relevant",
    use_exact: bool = True,
    use_fuzzy: bool = True,
    use_phrase: bool = True,
    use_query_string: bool = False,
) -> Dict:
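    """Build the Elasticsearch request body for /api/search.

    Combines optional phrase, fuzzy, and exact clauses (or a single
    query_string clause) with channel and year filters, shared highlight
    settings, and an optional sort.
    """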
filters: List[Dict] = []
should: List[Dict] = []
channel_filter = build_channel_filter(channels)
if channel_filter:
filters.append(channel_filter)
year_filter = build_year_filter(year)
if year_filter:
filters.append(year_filter)
if use_query_string:
base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
qs_query = (query or "").strip() or "*"
query_body: Dict[str, Any] = {
"query_string": {
"query": qs_query,
"default_operator": "AND",
"fields": base_fields,
}
}
if filters:
query_body = {"bool": {"must": query_body, "filter": filters}}
        body: Dict = {
            "query": query_body,
            "highlight": _highlight_spec(),
        }
        _apply_sort(body, sort)
        return body
if query:
base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
if use_phrase:
should.append(
{
"match_phrase": {
"transcript_full": {
"query": query,
"slop": 2,
"boost": 10.0,
}
}
}
)
should.append(
{
"match_phrase": {
"transcript_secondary_full": {
"query": query,
"slop": 2,
"boost": 10.0,
}
}
}
)
if use_fuzzy:
should.append(
{
"multi_match": {
"query": query,
"fields": base_fields,
"type": "best_fields",
"operator": "and",
"fuzziness": "AUTO",
"prefix_length": 1,
"max_expansions": 50,
"boost": 1.5,
}
}
)
if use_exact:
should.append(
{
"multi_match": {
"query": query,
"fields": base_fields,
"type": "best_fields",
"operator": "and",
"boost": 3.0,
}
}
)
if should:
query_body: Dict = {
"bool": {
"should": should,
"minimum_should_match": 1,
}
}
if filters:
query_body["bool"]["filter"] = filters
elif filters:
query_body = {"bool": {"filter": filters}}
else:
query_body = {"match_all": {}}
    body: Dict = {
        "query": query_body,
        "highlight": _highlight_spec(),
    }
    if query_body.get("match_all") is None:
        body["highlight"]["highlight_query"] = copy.deepcopy(query_body)
    _apply_sort(body, sort)
    return body
def create_app(config: AppConfig = CONFIG) -> Flask:
app = Flask(__name__, static_folder=str(Path(__file__).parent / "static"))
client = _ensure_client(config)
index = config.elastic.index
@app.route("/")
def index_page():
return send_from_directory(app.static_folder, "index.html")
@app.route("/static/")
def static_files(filename: str):
return send_from_directory(app.static_folder, filename)
@app.route("/api/channels")
def channels():
base_channels_body = {
"size": 0,
"aggs": {
"channels": {
"terms": {"field": "channel_id", "size": 200},
"aggs": {
"name": {
"top_hits": {
"size": 1,
"_source": {"includes": ["channel_name"]},
}
}
},
}
},
}
def run_channels_request(field_name: str):
            body = copy.deepcopy(base_channels_body)
body["aggs"]["channels"]["terms"]["field"] = field_name
if config.elastic.debug:
LOGGER.info(
"Elasticsearch channels request: %s",
json.dumps({"index": index, "body": body}, indent=2),
)
return client.search(index=index, body=body)
response = None
last_error = None
for candidate_field in ("channel_id.keyword", "channel_id"):
try:
response = run_channels_request(candidate_field)
if config.elastic.debug:
LOGGER.info("Channels aggregation used field: %s", candidate_field)
break
except BadRequestError as exc:
last_error = exc
if config.elastic.debug:
LOGGER.warning(
"Channels aggregation failed for field %s: %s",
candidate_field,
exc,
)
if response is None:
raise last_error or RuntimeError("Unable to aggregate channels.")
if config.elastic.debug:
LOGGER.info(
"Elasticsearch channels response: %s",
json.dumps(response, indent=2, default=str),
)
buckets = (
response.get("aggregations", {})
.get("channels", {})
.get("buckets", [])
)
data = [
{
"Id": bucket.get("key"),
"Name": (
bucket.get("name", {})
.get("hits", {})
.get("hits", [{}])[0]
.get("_source", {})
.get("channel_name", bucket.get("key"))
),
"Count": bucket.get("doc_count", 0),
}
for bucket in buckets
]
        data.sort(key=lambda item: (item["Name"] or "").lower())
return jsonify(data)
@app.route("/api/years")
def years():
body = {
"size": 0,
"aggs": {
"years": {
"date_histogram": {
"field": "date",
"calendar_interval": "year",
"format": "yyyy",
"order": {"_key": "desc"}
}
}
}
}
if config.elastic.debug:
LOGGER.info(
"Elasticsearch years request: %s",
json.dumps({"index": index, "body": body}, indent=2),
)
response = client.search(index=index, body=body)
if config.elastic.debug:
LOGGER.info(
"Elasticsearch years response: %s",
json.dumps(response, indent=2, default=str),
)
buckets = (
response.get("aggregations", {})
.get("years", {})
.get("buckets", [])
)
data = [
{
"Year": bucket.get("key_as_string"),
"Count": bucket.get("doc_count", 0),
}
for bucket in buckets
if bucket.get("doc_count", 0) > 0
]
return jsonify(data)
@app.route("/api/search")
def search():
query = request.args.get("q", "", type=str)
raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
legacy_channel = request.args.get("channel", type=str)
if legacy_channel:
raw_channels.append(legacy_channel)
channels = parse_channel_params(raw_channels)
year = request.args.get("year", "", type=str) or None
sort = request.args.get("sort", "relevant", type=str)
page = max(request.args.get("page", 0, type=int), 0)
size = max(request.args.get("size", 10, type=int), 1)
def parse_flag(name: str, default: bool = True) -> bool:
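            """Read a boolean query flag; anything but 0/false/no counts as true."""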
value = request.args.get(name)
if value is None:
return default
return value.lower() not in {"0", "false", "no"}
use_exact = parse_flag("exact", True)
use_fuzzy = parse_flag("fuzzy", True)
use_phrase = parse_flag("phrase", True)
use_query_string = parse_flag("query_string", False)
if use_query_string:
use_exact = use_fuzzy = use_phrase = False
payload = build_query_payload(
query,
channels=channels,
year=year,
sort=sort,
use_exact=use_exact,
use_fuzzy=use_fuzzy,
use_phrase=use_phrase,
use_query_string=use_query_string,
)
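        # Zero-based page index translated into an Elasticsearch offset.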
start = page * size
if config.elastic.debug:
LOGGER.info(
"Elasticsearch search request: %s",
json.dumps(
{
"index": index,
"from": start,
"size": size,
"body": payload,
"channels": channels,
"toggles": {
"exact": use_exact,
"fuzzy": use_fuzzy,
"phrase": use_phrase,
},
},
indent=2,
),
)
response = client.search(
index=index,
from_=start,
size=size,
body=payload,
)
if config.elastic.debug:
LOGGER.info(
"Elasticsearch search response: %s",
json.dumps(response, indent=2, default=str),
)
hits = response.get("hits", {})
total = hits.get("total", {}).get("value", 0)
documents = []
for hit in hits.get("hits", []):
source = hit.get("_source", {})
highlight_map = hit.get("highlight", {})
transcript_highlight = (
(highlight_map.get("transcript_full", []) or [])
+ (highlight_map.get("transcript_secondary_full", []) or [])
)
title_html = (
highlight_map.get("title")
or [source.get("title") or "Untitled"]
)[0]
description_html = (
highlight_map.get("description")
or [source.get("description") or ""]
)[0]
documents.append(
{
"video_id": source.get("video_id"),
"channel_id": source.get("channel_id"),
"channel_name": source.get("channel_name"),
"title": source.get("title"),
"titleHtml": title_html,
"description": source.get("description"),
"descriptionHtml": description_html,
"date": source.get("date"),
"url": source.get("url"),
"toHighlight": transcript_highlight,
"highlightSource": {
"primary": bool(highlight_map.get("transcript_full")),
"secondary": bool(highlight_map.get("transcript_secondary_full")),
},
"internal_references_count": source.get("internal_references_count", 0),
"referenced_by_count": source.get("referenced_by_count", 0),
}
)
return jsonify(
{
"items": documents,
"totalResults": total,
"totalPages": (total + size - 1) // size,
"currentPage": page,
}
)
@app.route("/api/metrics")
def metrics():
try:
data = elastic_metrics_payload(
client,
index,
channel_field_candidates=["channel_id.keyword", "channel_id"],
debug=config.elastic.debug,
)
        except Exception:
            LOGGER.exception(
                "Falling back to local metrics payload due to Elasticsearch error."
            )
data = metrics_payload(config.data.root)
return jsonify(data)
@app.route("/api/frequency")
def frequency():
raw_term = request.args.get("term", type=str) or ""
use_query_string = request.args.get("query_string", default="0", type=str)
use_query_string = (use_query_string or "").lower() in {"1", "true", "yes"}
term = raw_term.strip()
if not term and not use_query_string:
return ("term parameter is required", 400)
if use_query_string and not term:
term = "*"
raw_channels: List[Optional[str]] = request.args.getlist("channel_id")
legacy_channel = request.args.get("channel", type=str)
if legacy_channel:
raw_channels.append(legacy_channel)
channels = parse_channel_params(raw_channels)
year = request.args.get("year", "", type=str) or None
interval = (request.args.get("interval", "month") or "month").lower()
allowed_intervals = {"day", "week", "month", "quarter", "year"}
if interval not in allowed_intervals:
interval = "month"
start = request.args.get("start", type=str)
end = request.args.get("end", type=str)
filters: List[Dict] = []
channel_filter = build_channel_filter(channels)
if channel_filter:
filters.append(channel_filter)
year_filter = build_year_filter(year)
if year_filter:
filters.append(year_filter)
if start or end:
range_filter: Dict[str, Dict[str, Dict[str, str]]] = {"range": {"date": {}}}
if start:
range_filter["range"]["date"]["gte"] = start
if end:
range_filter["range"]["date"]["lte"] = end
filters.append(range_filter)
base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
if use_query_string:
qs_query = term or "*"
must_clause: List[Dict[str, Any]] = [
{
"query_string": {
"query": qs_query,
"default_operator": "AND",
"fields": base_fields,
}
}
]
else:
must_clause = [
{
"multi_match": {
"query": term,
"fields": base_fields,
"type": "best_fields",
"operator": "and",
}
}
]
query: Dict[str, Any] = {"bool": {"must": must_clause}}
if filters:
query["bool"]["filter"] = filters
histogram: Dict[str, Any] = {
"field": "date",
"calendar_interval": interval,
"min_doc_count": 0,
}
if start or end:
bounds: Dict[str, str] = {}
if start:
bounds["min"] = start
if end:
bounds["max"] = end
if bounds:
histogram["extended_bounds"] = bounds
channel_terms_size = max(6, len(channels)) if channels else 6
body = {
"size": 0,
"query": query,
"aggs": {
"over_time": {
"date_histogram": histogram,
"aggs": {
"by_channel": {
"terms": {
"field": "channel_id.keyword",
"size": channel_terms_size,
"order": {"_count": "desc"},
}
}
},
}
},
}
if config.elastic.debug:
LOGGER.info(
"Elasticsearch frequency request: %s",
json.dumps(
{
"index": index,
"body": body,
"term": term,
"interval": interval,
"channels": channels,
"start": start,
"end": end,
"query_string": use_query_string,
},
indent=2,
),
)
response = client.search(index=index, body=body)
if config.elastic.debug:
LOGGER.info(
"Elasticsearch frequency response: %s",
json.dumps(response, indent=2, default=str),
)
raw_buckets = (
response.get("aggregations", {})
.get("over_time", {})
.get("buckets", [])
)
channel_totals: Dict[str, int] = {}
buckets: List[Dict[str, Any]] = []
for bucket in raw_buckets:
date_str = bucket.get("key_as_string")
total = bucket.get("doc_count", 0)
channel_entries: List[Dict[str, Any]] = []
for ch_bucket in bucket.get("by_channel", {}).get("buckets", []):
cid = ch_bucket.get("key")
count = ch_bucket.get("doc_count", 0)
if cid:
channel_entries.append({"id": cid, "count": count})
channel_totals[cid] = channel_totals.get(cid, 0) + count
buckets.append(
{"date": date_str, "total": total, "channels": channel_entries}
)
ranked_channels = sorted(
[{"id": cid, "total": total} for cid, total in channel_totals.items()],
key=lambda item: item["total"],
reverse=True,
)
payload = {
"term": raw_term if not use_query_string else term,
"interval": interval,
"buckets": buckets,
"channels": ranked_channels,
"totalResults": response.get("hits", {})
.get("total", {})
.get("value", 0),
}
return jsonify(payload)
@app.route("/frequency")
def frequency_page():
return send_from_directory(app.static_folder, "frequency.html")
@app.route("/api/transcript")
def transcript():
video_id = request.args.get("video_id", type=str)
if not video_id:
return ("video_id not set", 400)
        try:
            response = client.get(index=index, id=video_id)
        except NotFoundError:
            response = None
if config.elastic.debug:
LOGGER.info(
"Elasticsearch transcript request: index=%s id=%s", index, video_id
)
LOGGER.info(
"Elasticsearch transcript response: %s",
json.dumps(response, indent=2, default=str)
if response
else "None",
)
if not response or not response.get("found"):
return ("not found", 404)
source = response["_source"]
return jsonify(
{
"video_id": source.get("video_id"),
"title": source.get("title"),
"transcript_parts": source.get("transcript_parts", []),
"transcript_full": source.get("transcript_full"),
"transcript_secondary_parts": source.get("transcript_secondary_parts", []),
"transcript_secondary_full": source.get("transcript_secondary_full"),
}
)
return app
def main() -> None: # pragma: no cover
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
app = create_app()
app.run(host="0.0.0.0", port=8080, debug=True)
if __name__ == "__main__": # pragma: no cover
main()