""" Flask application exposing search, graph, and transcript endpoints for TLC. Routes: GET / -> static HTML search page. GET /graph -> static reference graph UI. GET /vector-search -> experimental Qdrant vector search UI. GET /api/channels -> channels aggregation. GET /api/search -> Elasticsearch keyword search. POST /api/vector-search -> Qdrant vector similarity query. GET /api/graph -> reference graph API. GET /api/transcript -> transcript JSON payload. """ from __future__ import annotations import copy import json import logging import re from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple from collections import Counter, deque from datetime import datetime from flask import Flask, jsonify, request, send_from_directory import requests try: from sentence_transformers import SentenceTransformer # type: ignore except ImportError: # pragma: no cover - optional dependency SentenceTransformer = None from .config import CONFIG, AppConfig try: from elasticsearch import Elasticsearch # type: ignore from elasticsearch import BadRequestError # type: ignore except ImportError: # pragma: no cover - dependency optional Elasticsearch = None BadRequestError = Exception # type: ignore LOGGER = logging.getLogger(__name__) _EMBED_MODEL = None _EMBED_MODEL_NAME: Optional[str] = None def _ensure_embedder(model_name: str) -> "SentenceTransformer": global _EMBED_MODEL, _EMBED_MODEL_NAME if SentenceTransformer is None: # pragma: no cover - optional dependency raise RuntimeError( "sentence-transformers is required for vector search. Install via pip install sentence-transformers." ) if _EMBED_MODEL is None or _EMBED_MODEL_NAME != model_name: LOGGER.info("Loading embedding model: %s", model_name) _EMBED_MODEL = SentenceTransformer(model_name) _EMBED_MODEL_NAME = model_name return _EMBED_MODEL def embed_query(text: str, *, model_name: str, expected_dim: int) -> List[float]: embedder = _ensure_embedder(model_name) vector = embedder.encode( [f"query: {text}"], show_progress_bar=False, normalize_embeddings=True, )[0].tolist() if len(vector) != expected_dim: raise RuntimeError( f"Embedding dimension mismatch (expected {expected_dim}, got {len(vector)})" ) return vector def _ensure_client(config: AppConfig) -> "Elasticsearch": if Elasticsearch is None: raise RuntimeError( "elasticsearch package not installed. " "Install elasticsearch>=7 to run the Flask search app." 
def _ensure_client(config: AppConfig) -> "Elasticsearch":
    if Elasticsearch is None:
        raise RuntimeError(
            "elasticsearch package not installed. "
            "Install elasticsearch>=7 to run the Flask search app."
        )
    kwargs = {}
    if config.elastic.api_key:
        kwargs["api_key"] = config.elastic.api_key
    elif config.elastic.username and config.elastic.password:
        kwargs["basic_auth"] = (
            config.elastic.username,
            config.elastic.password,
        )
    if config.elastic.ca_cert:
        kwargs["ca_certs"] = str(config.elastic.ca_cert)
    kwargs["verify_certs"] = config.elastic.verify_certs
    return Elasticsearch(config.elastic.url, **kwargs)


def metrics_payload(data_root: Path) -> Dict[str, Any]:
    total_items = 0
    channel_counter: Counter = Counter()
    channel_name_map: Dict[str, str] = {}
    year_counter: Counter = Counter()
    month_counter: Counter = Counter()
    if not data_root.exists():
        LOGGER.warning("Data directory %s not found; metrics will be empty.", data_root)
        return {
            "totalItems": 0,
            "totalChannels": 0,
            "itemsPerChannel": [],
            "yearHistogram": [],
            "recentMonths": [],
        }
    for path in data_root.rglob("*.json"):
        try:
            with path.open("r", encoding="utf-8") as handle:
                doc = json.load(handle)
        except Exception:
            continue
        total_items += 1
        channel_id = doc.get("channel_id")
        channel_name = doc.get("channel_name") or channel_id
        if channel_id:
            channel_counter[channel_id] += 1
            if channel_name and channel_id not in channel_name_map:
                channel_name_map[channel_id] = channel_name
        date_value = doc.get("date") or doc.get("published_at")
        dt: Optional[datetime] = None
        if isinstance(date_value, str):
            for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ"):
                try:
                    dt = datetime.strptime(date_value[: len(fmt)], fmt)
                    break
                except Exception:
                    continue
        elif isinstance(date_value, (int, float)):
            try:
                dt = datetime.fromtimestamp(date_value)
            except Exception:
                dt = None
        if dt:
            year_counter[str(dt.year)] += 1
            month_counter[dt.strftime("%Y-%m")] += 1
    items_per_channel = [
        {
            "label": channel_name_map.get(cid, cid),
            "count": count,
        }
        for cid, count in channel_counter.most_common()
    ]
    year_histogram = [
        {"bucket": year, "count": year_counter[year]}
        for year in sorted(year_counter.keys())
    ]
    recent_months = sorted(month_counter.keys())
    recent_months = recent_months[-12:]
    recent_months_payload = [
        {"bucket": month, "count": month_counter[month]} for month in recent_months
    ]
    return {
        "totalItems": total_items,
        "totalChannels": len(channel_counter),
        "itemsPerChannel": items_per_channel,
        "yearHistogram": year_histogram,
        "recentMonths": recent_months_payload,
    }
def elastic_metrics_payload(
    client: "Elasticsearch",
    index: str,
    *,
    channel_field_candidates: Optional[List[str]] = None,
    debug: bool = False,
) -> Dict[str, Any]:
    if channel_field_candidates is None:
        channel_field_candidates = ["channel_id.keyword", "channel_id"]
    base_body: Dict[str, Any] = {
        "size": 0,
        "track_total_hits": True,
        "aggs": {
            "channels": {
                "terms": {
                    "field": "channel_id.keyword",
                    "size": 500,
                    "order": {"_count": "desc"},
                },
                "aggs": {
                    "name": {
                        "top_hits": {
                            "size": 1,
                            "_source": {"includes": ["channel_name"]},
                        }
                    }
                },
            },
            "year_histogram": {
                "date_histogram": {
                    "field": "date",
                    "calendar_interval": "year",
                    "format": "yyyy",
                }
            },
            "month_histogram": {
                "date_histogram": {
                    "field": "date",
                    "calendar_interval": "month",
                    "format": "yyyy-MM",
                    "order": {"_key": "asc"},
                }
            },
        },
    }
    last_error: Optional[Exception] = None
    response: Optional[Dict[str, Any]] = None
    for candidate_field in channel_field_candidates:
        body = json.loads(json.dumps(base_body))
        body["aggs"]["channels"]["terms"]["field"] = candidate_field
        try:
            if debug:
                LOGGER.info(
                    "Elasticsearch metrics request: %s",
                    json.dumps({"index": index, "body": body}, indent=2),
                )
            response = client.search(index=index, body=body)
            break
        except BadRequestError as exc:
            last_error = exc
            if debug:
                LOGGER.warning(
                    "Metrics aggregation failed for field %s: %s",
                    candidate_field,
                    exc,
                )
    if response is None:
        raise last_error or RuntimeError("Unable to compute metrics from Elasticsearch.")
    hits = response.get("hits", {})
    total_items = hits.get("total", {}).get("value", 0)
    if debug:
        LOGGER.info(
            "Elasticsearch metrics response: %s",
            json.dumps(response, indent=2, default=str),
        )
    aggregations = response.get("aggregations", {})
    channel_buckets = aggregations.get("channels", {}).get("buckets", [])
    items_per_channel = []
    for bucket in channel_buckets:
        key = bucket.get("key")
        channel_name = key
        top_hits = (
            bucket.get("name", {})
            .get("hits", {})
            .get("hits", [])
        )
        if top_hits:
            channel_name = (
                top_hits[0]
                .get("_source", {})
                .get("channel_name", channel_name)
            )
        items_per_channel.append(
            {"label": channel_name or key, "count": bucket.get("doc_count", 0)}
        )
    year_buckets = aggregations.get("year_histogram", {}).get("buckets", [])
    year_histogram = [
        {
            "bucket": bucket.get("key_as_string") or str(bucket.get("key")),
            "count": bucket.get("doc_count", 0),
        }
        for bucket in year_buckets
    ]
    month_buckets = aggregations.get("month_histogram", {}).get("buckets", [])
    recent_months_entries = [
        {
            "bucket": bucket.get("key_as_string") or str(bucket.get("key")),
            "count": bucket.get("doc_count", 0),
            "_key": bucket.get("key"),
        }
        for bucket in month_buckets
    ]
    recent_months_entries.sort(key=lambda item: item.get("_key", 0))
    recent_months_payload = [
        {"bucket": entry["bucket"], "count": entry["count"]}
        for entry in recent_months_entries[-12:]
    ]
    return {
        "totalItems": total_items,
        "totalChannels": len(items_per_channel),
        "itemsPerChannel": items_per_channel,
        "yearHistogram": year_histogram,
        "recentMonths": recent_months_payload,
    }


def parse_channel_params(values: Iterable[Optional[str]]) -> List[str]:
    seen: Set[str] = set()
    channels: List[str] = []
    for value in values:
        if not value:
            continue
        for part in str(value).split(","):
            cleaned = part.strip()
            if not cleaned or cleaned.lower() == "all":
                continue
            if cleaned not in seen:
                seen.add(cleaned)
                channels.append(cleaned)
    return channels


def build_year_filter(year: Optional[str]) -> Optional[Dict]:
    if not year:
        return None
    try:
        year_int = int(year)
        return {
            "range": {
                "date": {
                    "gte": f"{year_int}-01-01",
                    "lt": f"{year_int + 1}-01-01",
                    "format": "yyyy-MM-dd",
                }
            }
        }
    except (ValueError, TypeError):
        return None


def build_channel_filter(channels: Optional[Sequence[str]]) -> Optional[Dict]:
    if not channels:
        return None
    per_channel_clauses: List[Dict[str, Any]] = []
    for value in channels:
        if not value:
            continue
        per_channel_clauses.append(
            {
                "bool": {
                    "should": [
                        {"term": {"channel_id.keyword": value}},
                        {"term": {"channel_id": value}},
                    ],
                    "minimum_should_match": 1,
                }
            }
        )
    if not per_channel_clauses:
        return None
    if len(per_channel_clauses) == 1:
        return per_channel_clauses[0]
    return {
        "bool": {
            "should": per_channel_clauses,
            "minimum_should_match": 1,
        }
    }
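# Illustrative sketch of the filter shapes produced above (channel id and year
# are made-up values):
#
#     build_channel_filter(["UCabc123"])
#     # -> {"bool": {"should": [{"term": {"channel_id.keyword": "UCabc123"}},
#     #                         {"term": {"channel_id": "UCabc123"}}],
#     #              "minimum_should_match": 1}}
#
#     build_year_filter("2021")
#     # -> {"range": {"date": {"gte": "2021-01-01", "lt": "2022-01-01",
#     #                        "format": "yyyy-MM-dd"}}}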
def build_query_payload(
    query: str,
    *,
    channels: Optional[Sequence[str]] = None,
    year: Optional[str] = None,
    sort: str = "relevant",
    use_exact: bool = True,
    use_fuzzy: bool = True,
    use_phrase: bool = True,
    use_query_string: bool = False,
    include_external: bool = True,
) -> Dict:
    filters: List[Dict] = []
    should: List[Dict] = []
    channel_filter = build_channel_filter(channels)
    if channel_filter:
        filters.append(channel_filter)
    year_filter = build_year_filter(year)
    if year_filter:
        filters.append(year_filter)
    if not include_external:
        filters.append({"bool": {"must_not": [{"term": {"external_reference": True}}]}})
    if use_query_string:
        base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
        qs_query = (query or "").strip() or "*"
        query_body: Dict[str, Any] = {
            "query_string": {
                "query": qs_query,
                "default_operator": "AND",
                "fields": base_fields,
            }
        }
        if filters:
            query_body = {"bool": {"must": query_body, "filter": filters}}
        body: Dict = {
            "query": query_body,
            "highlight": {
                "fields": {
                    "transcript_full": {
                        "fragment_size": 160,
                        "number_of_fragments": 5,
                        "fragmenter": "span",
                    },
                    "transcript_secondary_full": {
                        "fragment_size": 160,
                        "number_of_fragments": 5,
                        "fragmenter": "span",
                    },
                    "title": {"number_of_fragments": 0},
                    "description": {
                        "fragment_size": 160,
                        "number_of_fragments": 1,
                    },
                },
                "require_field_match": False,
                "pre_tags": [""],
                "post_tags": [""],
                "encoder": "html",
                "max_analyzed_offset": 900000,
            },
        }
        if sort == "newer":
            body["sort"] = [{"date": {"order": "desc"}}]
        elif sort == "older":
            body["sort"] = [{"date": {"order": "asc"}}]
        elif sort == "referenced":
            body["sort"] = [{"referenced_by_count": {"order": "desc", "unmapped_type": "long"}}]
        return body
    if query:
        base_fields = ["title^3", "description^2", "transcript_full", "transcript_secondary_full"]
        if use_phrase:
            should.append(
                {
                    "match_phrase": {
                        "transcript_full": {
                            "query": query,
                            "slop": 2,
                            "boost": 10.0,
                        }
                    }
                }
            )
            should.append(
                {
                    "match_phrase": {
                        "transcript_secondary_full": {
                            "query": query,
                            "slop": 2,
                            "boost": 10.0,
                        }
                    }
                }
            )
            should.append(
                {
                    "match_phrase": {
                        "title": {
                            "query": query,
                            "slop": 0,
                            "boost": 50.0,
                        }
                    }
                }
            )
        if use_fuzzy:
            should.append(
                {
                    "multi_match": {
                        "query": query,
                        "fields": base_fields,
                        "type": "best_fields",
                        "operator": "and",
                        "fuzziness": "AUTO",
                        "prefix_length": 1,
                        "max_expansions": 50,
                        "boost": 1.5,
                    }
                }
            )
        if use_exact:
            should.append(
                {
                    "multi_match": {
                        "query": query,
                        "fields": base_fields,
                        "type": "best_fields",
                        "operator": "and",
                        "boost": 3.0,
                    }
                }
            )
    if should:
        query_body: Dict = {
            "bool": {
                "should": should,
                "minimum_should_match": 1,
            }
        }
        if filters:
            query_body["bool"]["filter"] = filters
    elif filters:
        query_body = {"bool": {"filter": filters}}
    else:
        query_body = {"match_all": {}}
    body: Dict = {
        "query": query_body,
        "highlight": {
            "fields": {
                "transcript_full": {
                    "fragment_size": 160,
                    "number_of_fragments": 5,
                    "fragmenter": "span",
                },
                "transcript_secondary_full": {
                    "fragment_size": 160,
                    "number_of_fragments": 5,
                    "fragmenter": "span",
                },
                "title": {"number_of_fragments": 0},
                "description": {
                    "fragment_size": 160,
                    "number_of_fragments": 1,
                },
            },
            "require_field_match": False,
            "pre_tags": [""],
            "post_tags": [""],
            "encoder": "html",
            "max_analyzed_offset": 900000,
        },
    }
    if query_body.get("match_all") is None:
        body["highlight"]["highlight_query"] = copy.deepcopy(query_body)
    if sort == "newer":
        body["sort"] = [{"date": {"order": "desc"}}]
    elif sort == "older":
        body["sort"] = [{"date": {"order": "asc"}}]
    elif sort == "referenced":
        body["sort"] = [{"referenced_by_count": {"order": "desc", "unmapped_type": "long"}}]
    return body
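# Rough shape of the payload produced for a plain keyword query (abridged,
# illustrative values only; the nested clauses follow the builders above):
#
#     build_query_payload("heat pump", channels=["UCabc123"], sort="newer")
#     # -> {
#     #   "query": {"bool": {"should": [<match_phrase / multi_match clauses>],
#     #                      "minimum_should_match": 1,
#     #                      "filter": [<channel filter>]}},
#     #   "highlight": {...},
#     #   "sort": [{"date": {"order": "desc"}}],
#     # }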
def create_app(config: AppConfig = CONFIG) -> Flask:
    app = Flask(__name__, static_folder=str(Path(__file__).parent / "static"))
    client = _ensure_client(config)
    index = config.elastic.index
    qdrant_url = config.qdrant_url
    qdrant_collection = config.qdrant_collection
    qdrant_vector_name = config.qdrant_vector_name
    qdrant_vector_size = config.qdrant_vector_size
    qdrant_embed_model = config.qdrant_embed_model

    @app.route("/")
    def index_page():
        return send_from_directory(app.static_folder, "index.html")

    @app.route("/graph")
    def graph_page():
        return send_from_directory(app.static_folder, "graph.html")

    @app.route("/vector-search")
    def vector_search_page():
        return send_from_directory(app.static_folder, "vector.html")

    @app.route("/static/<path:filename>")
    def static_files(filename: str):
        return send_from_directory(app.static_folder, filename)

    def normalize_reference_list(values: Any) -> List[str]:
        if values is None:
            return []
        if isinstance(values, (list, tuple, set)):
            iterable = values
        else:
            iterable = [values]
        normalized: List[str] = []
        for item in iterable:
            candidate: Optional[str]
            if isinstance(item, dict):
                candidate = item.get("video_id") or item.get("id")  # type: ignore[assignment]
            else:
                candidate = item  # type: ignore[assignment]
            if candidate is None:
                continue
            text = str(candidate).strip()
            if not text:
                continue
            if text.lower() in {"none", "null"}:
                continue
            normalized.append(text)
        return normalized

    def build_graph_payload(
        root_id: str, depth: int, max_nodes: int
    ) -> Dict[str, Any]:
        root_id = root_id.strip()
        if not root_id:
            return {"nodes": [], "links": [], "root": root_id, "depth": depth, "meta": {}}
        doc_cache: Dict[str, Optional[Dict[str, Any]]] = {}

        def fetch_document(video_id: str) -> Optional[Dict[str, Any]]:
            if video_id in doc_cache:
                return doc_cache[video_id]
            try:
                result = client.get(index=index, id=video_id)
                doc_cache[video_id] = result.get("_source")
            except Exception as exc:  # pragma: no cover - elasticsearch handles errors
                LOGGER.debug("Graph: failed to load %s: %s", video_id, exc)
                doc_cache[video_id] = None
            return doc_cache[video_id]

        nodes: Dict[str, Dict[str, Any]] = {}
        links: List[Dict[str, Any]] = []
        link_seen: Set[Tuple[str, str, str]] = set()
        queue: deque[Tuple[str, int]] = deque([(root_id, 0)])
        queued: Set[str] = {root_id}
        visited: Set[str] = set()

        while queue and len(nodes) < max_nodes:
            current_id, level = queue.popleft()
            queued.discard(current_id)
            if current_id in visited:
                continue
            doc = fetch_document(current_id)
            if doc is None:
                if current_id == root_id:
                    break
                visited.add(current_id)
                continue
            visited.add(current_id)
            nodes[current_id] = {
                "id": current_id,
                "title": doc.get("title") or current_id,
                "channel_id": doc.get("channel_id"),
                "channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
                "url": doc.get("url"),
                "date": doc.get("date"),
                "is_root": current_id == root_id,
            }
            if level >= depth:
                continue
            neighbor_ids: List[str] = []
            for ref_id in normalize_reference_list(doc.get("internal_references")):
                if ref_id == current_id:
                    continue
                key = (current_id, ref_id, "references")
                if key not in link_seen:
                    links.append(
                        {"source": current_id, "target": ref_id, "relation": "references"}
                    )
                    link_seen.add(key)
                neighbor_ids.append(ref_id)
            for ref_id in normalize_reference_list(doc.get("referenced_by")):
                if ref_id == current_id:
                    continue
                key = (ref_id, current_id, "referenced_by")
                if key not in link_seen:
                    links.append(
                        {"source": ref_id, "target": current_id, "relation": "referenced_by"}
                    )
                    link_seen.add(key)
                neighbor_ids.append(ref_id)
            for neighbor in neighbor_ids:
                if neighbor in visited or neighbor in queued:
                    continue
                if len(nodes) + len(queue) >= max_nodes:
                    break
                queue.append((neighbor, level + 1))
                queued.add(neighbor)

        # Ensure nodes referenced by links exist in the payload.
        for link in links:
            for key in ("source", "target"):
                node_id = link[key]
                if node_id in nodes:
                    continue
                doc = fetch_document(node_id)
                if doc is None:
                    nodes[node_id] = {
                        "id": node_id,
                        "title": node_id,
                        "channel_id": None,
                        "channel_name": "Unknown",
                        "url": None,
                        "date": None,
                        "is_root": node_id == root_id,
                    }
                else:
                    nodes[node_id] = {
                        "id": node_id,
                        "title": doc.get("title") or node_id,
                        "channel_id": doc.get("channel_id"),
                        "channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
                        "url": doc.get("url"),
                        "date": doc.get("date"),
                        "is_root": node_id == root_id,
                    }
        links = [
            link
            for link in links
            if link.get("source") in nodes and link.get("target") in nodes
        ]
        return {
            "root": root_id,
            "depth": depth,
            "nodes": list(nodes.values()),
            "links": links,
            "meta": {
                "node_count": len(nodes),
                "link_count": len(links),
            },
        }

    @app.route("/api/channels")
    def channels():
        include_external = request.args.get("external", default="0", type=str)
        include_external = include_external.lower() not in {"0", "false", "no"}
        base_channels_body = {
            "size": 0,
            "aggs": {
                "channels": {
                    "terms": {"field": "channel_id", "size": 200},
                    "aggs": {
                        "name": {
                            "top_hits": {
                                "size": 1,
                                "_source": {"includes": ["channel_name"]},
                            }
                        }
                    },
                }
            },
        }
        if not include_external:
            base_channels_body["query"] = {
                "bool": {"must_not": [{"term": {"external_reference": True}}]}
            }

        def run_channels_request(field_name: str):
            body = json.loads(json.dumps(base_channels_body))  # deep copy
            body["aggs"]["channels"]["terms"]["field"] = field_name
            if config.elastic.debug:
                LOGGER.info(
                    "Elasticsearch channels request: %s",
                    json.dumps({"index": index, "body": body}, indent=2),
                )
            return client.search(index=index, body=body)

        response = None
        last_error = None
        for candidate_field in ("channel_id.keyword", "channel_id"):
            try:
                response = run_channels_request(candidate_field)
                if config.elastic.debug:
                    LOGGER.info("Channels aggregation used field: %s", candidate_field)
                break
            except BadRequestError as exc:
                last_error = exc
                if config.elastic.debug:
                    LOGGER.warning(
                        "Channels aggregation failed for field %s: %s",
                        candidate_field,
                        exc,
                    )
        if response is None:
            raise last_error or RuntimeError("Unable to aggregate channels.")
        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch channels response: %s",
                json.dumps(response, indent=2, default=str),
            )
        buckets = (
            response.get("aggregations", {})
            .get("channels", {})
            .get("buckets", [])
        )
        data = []
        for bucket in buckets:
            key = bucket.get("key")
            name_hit = (
                bucket.get("name", {})
                .get("hits", {})
                .get("hits", [{}])[0]
                .get("_source", {})
                .get("channel_name")
            )
            display_name = name_hit or key or "Unknown"
            data.append(
                {
                    "Id": key,
                    "Name": display_name,
                    "Count": bucket.get("doc_count", 0),
                }
            )
        data.sort(key=lambda item: item["Name"].lower())
        return jsonify(data)

    @app.route("/api/graph")
    def graph_api():
        video_id = (request.args.get("video_id") or "").strip()
        if not video_id:
            return jsonify({"error": "video_id is required"}), 400
        try:
            depth = int(request.args.get("depth", "1"))
        except ValueError:
            depth = 1
        depth = max(0, min(depth, 3))
        try:
            max_nodes = int(request.args.get("max_nodes", "200"))
        except ValueError:
            max_nodes = 200
        max_nodes = max(10, min(max_nodes, 400))
        payload = build_graph_payload(video_id, depth, max_nodes)
        if not payload["nodes"]:
            return (
                jsonify({"error": f"Video '{video_id}' was not found in the index."}),
                404,
            )
        payload["meta"]["max_nodes"] = max_nodes
        return jsonify(payload)
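    # Illustrative request against the graph endpoint (video id is made up;
    # host/port assume the defaults used by main() at the bottom of this module):
    #
    #     curl "http://localhost:8080/api/graph?video_id=dQw4w9WgXcQ&depth=2&max_nodes=150"
    #
    # depth is clamped to 0-3 and max_nodes to 10-400 by graph_api above.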
"calendar_interval": "year", "format": "yyyy", "order": {"_key": "desc"} } } } } if config.elastic.debug: LOGGER.info( "Elasticsearch years request: %s", json.dumps({"index": index, "body": body}, indent=2), ) response = client.search(index=index, body=body) if config.elastic.debug: LOGGER.info( "Elasticsearch years response: %s", json.dumps(response, indent=2, default=str), ) buckets = ( response.get("aggregations", {}) .get("years", {}) .get("buckets", []) ) data = [ { "Year": bucket.get("key_as_string"), "Count": bucket.get("doc_count", 0), } for bucket in buckets if bucket.get("doc_count", 0) > 0 ] return jsonify(data) @app.route("/api/search") def search(): query = request.args.get("q", "", type=str) raw_channels: List[Optional[str]] = request.args.getlist("channel_id") legacy_channel = request.args.get("channel", type=str) if legacy_channel: raw_channels.append(legacy_channel) channels = parse_channel_params(raw_channels) year = request.args.get("year", "", type=str) or None sort = request.args.get("sort", "relevant", type=str) page = max(request.args.get("page", 0, type=int), 0) size = max(request.args.get("size", 10, type=int), 1) def parse_flag(name: str, default: bool = True) -> bool: value = request.args.get(name) if value is None: return default return value.lower() not in {"0", "false", "no"} use_exact = parse_flag("exact", True) use_fuzzy = parse_flag("fuzzy", True) use_phrase = parse_flag("phrase", True) use_query_string = parse_flag("query_string", False) include_external = parse_flag("external", False) if use_query_string: use_exact = use_fuzzy = use_phrase = False payload = build_query_payload( query, channels=channels, year=year, sort=sort, use_exact=use_exact, use_fuzzy=use_fuzzy, use_phrase=use_phrase, use_query_string=use_query_string, include_external=include_external, ) start = page * size if config.elastic.debug: LOGGER.info( "Elasticsearch search request: %s", json.dumps( { "index": index, "from": start, "size": size, "body": payload, "channels": channels, "toggles": { "exact": use_exact, "fuzzy": use_fuzzy, "phrase": use_phrase, }, }, indent=2, ), ) response = client.search( index=index, from_=start, size=size, body=payload, ) if config.elastic.debug: LOGGER.info( "Elasticsearch search response: %s", json.dumps(response, indent=2, default=str), ) hits = response.get("hits", {}) total = hits.get("total", {}).get("value", 0) documents = [] for hit in hits.get("hits", []): source = hit.get("_source", {}) highlight_map = hit.get("highlight", {}) transcript_highlight = [ {"html": value, "source": "primary"} for value in (highlight_map.get("transcript_full", []) or []) ] + [ {"html": value, "source": "secondary"} for value in (highlight_map.get("transcript_secondary_full", []) or []) ] title_html = ( highlight_map.get("title") or [source.get("title") or "Untitled"] )[0] description_html = ( highlight_map.get("description") or [source.get("description") or ""] )[0] documents.append( { "video_id": source.get("video_id"), "channel_id": source.get("channel_id"), "channel_name": source.get("channel_name"), "title": source.get("title"), "titleHtml": title_html, "description": source.get("description"), "descriptionHtml": description_html, "date": source.get("date"), "duration": source.get("duration"), "url": source.get("url"), "toHighlight": transcript_highlight, "highlightSource": { "primary": bool(highlight_map.get("transcript_full")), "secondary": bool(highlight_map.get("transcript_secondary_full")), }, "internal_references_count": source.get("internal_references_count", 
0), "internal_references": source.get("internal_references", []), "referenced_by_count": source.get("referenced_by_count", 0), "referenced_by": source.get("referenced_by", []), "video_status": source.get("video_status"), "external_reference": source.get("external_reference"), } ) return jsonify( { "items": documents, "totalResults": total, "totalPages": (total + size - 1) // size, "currentPage": page, } ) @app.route("/api/metrics") def metrics(): try: data = elastic_metrics_payload( client, index, channel_field_candidates=["channel_id.keyword", "channel_id"], debug=config.elastic.debug, ) except Exception: LOGGER.exception( "Falling back to local metrics payload due to Elasticsearch error.", exc_info=True, ) data = metrics_payload(config.data.root) return jsonify(data) @app.route("/api/frequency") def frequency(): raw_term = request.args.get("term", type=str) or "" use_query_string = request.args.get("query_string", default="0", type=str) use_query_string = (use_query_string or "").lower() in {"1", "true", "yes"} term = raw_term.strip() if not term and not use_query_string: return ("term parameter is required", 400) if use_query_string and not term: term = "*" raw_channels: List[Optional[str]] = request.args.getlist("channel_id") legacy_channel = request.args.get("channel", type=str) if legacy_channel: raw_channels.append(legacy_channel) channels = parse_channel_params(raw_channels) year = request.args.get("year", "", type=str) or None interval = (request.args.get("interval", "month") or "month").lower() allowed_intervals = {"day", "week", "month", "quarter", "year"} if interval not in allowed_intervals: interval = "month" start = request.args.get("start", type=str) end = request.args.get("end", type=str) def parse_flag(name: str, default: bool = True) -> bool: value = request.args.get(name) if value is None: return default lowered = value.lower() return lowered not in {"0", "false", "no"} use_exact = parse_flag("exact", True) use_fuzzy = parse_flag("fuzzy", True) use_phrase = parse_flag("phrase", True) include_external = parse_flag("external", False) if use_query_string: use_exact = use_fuzzy = use_phrase = False search_payload = build_query_payload( term, channels=channels, year=year, sort="relevant", use_exact=use_exact, use_fuzzy=use_fuzzy, use_phrase=use_phrase, use_query_string=use_query_string, include_external=include_external, ) query = search_payload.get("query", {"match_all": {}}) if start or end: range_filter: Dict[str, Dict[str, Dict[str, str]]] = {"range": {"date": {}}} if start: range_filter["range"]["date"]["gte"] = start if end: range_filter["range"]["date"]["lte"] = end if "bool" in query: bool_clause = query.setdefault("bool", {}) existing_filter = bool_clause.get("filter") if existing_filter is None: bool_clause["filter"] = [range_filter] elif isinstance(existing_filter, list): bool_clause["filter"].append(range_filter) else: bool_clause["filter"] = [existing_filter, range_filter] elif query.get("match_all") is not None: query = {"bool": {"filter": [range_filter]}} else: query = {"bool": {"must": [query], "filter": [range_filter]}} histogram: Dict[str, Any] = { "field": "date", "calendar_interval": interval, "min_doc_count": 0, } if start or end: bounds: Dict[str, str] = {} if start: bounds["min"] = start if end: bounds["max"] = end if bounds: histogram["extended_bounds"] = bounds channel_terms_size = max(6, len(channels)) if channels else 6 body = { "size": 0, "query": query, "aggs": { "over_time": { "date_histogram": histogram, "aggs": { "by_channel": { "terms": { 
"field": "channel_id.keyword", "size": channel_terms_size, "order": {"_count": "desc"}, }, "aggs": { "channel_name_hit": { "top_hits": { "size": 1, "_source": {"includes": ["channel_name"]}, } } }, } }, } }, } if config.elastic.debug: LOGGER.info( "Elasticsearch frequency request: %s", json.dumps( { "index": index, "body": body, "term": term, "interval": interval, "channels": channels, "start": start, "end": end, "query_string": use_query_string, }, indent=2, ), ) response = client.search(index=index, body=body) if config.elastic.debug: LOGGER.info( "Elasticsearch frequency response: %s", json.dumps(response, indent=2, default=str), ) raw_buckets = ( response.get("aggregations", {}) .get("over_time", {}) .get("buckets", []) ) channel_totals: Dict[str, Dict[str, Any]] = {} buckets: List[Dict[str, Any]] = [] for bucket in raw_buckets: date_str = bucket.get("key_as_string") total = bucket.get("doc_count", 0) channel_entries: List[Dict[str, Any]] = [] for ch_bucket in bucket.get("by_channel", {}).get("buckets", []): cid = ch_bucket.get("key") count = ch_bucket.get("doc_count", 0) if cid: hit_source = ( ch_bucket.get("channel_name_hit", {}) .get("hits", {}) .get("hits", [{}])[0] .get("_source", {}) ) channel_name = hit_source.get("channel_name") if isinstance(hit_source, dict) else None channel_entries.append({"id": cid, "count": count, "name": channel_name}) if cid not in channel_totals: channel_totals[cid] = {"total": 0, "name": channel_name} channel_totals[cid]["total"] += count if channel_name and not channel_totals[cid].get("name"): channel_totals[cid]["name"] = channel_name buckets.append( {"date": date_str, "total": total, "channels": channel_entries} ) ranked_channels = sorted( [ {"id": cid, "total": info.get("total", 0), "name": info.get("name")} for cid, info in channel_totals.items() ], key=lambda item: item["total"], reverse=True, ) payload = { "term": raw_term if not use_query_string else term, "interval": interval, "buckets": buckets, "channels": ranked_channels, "totalResults": response.get("hits", {}) .get("total", {}) .get("value", 0), } return jsonify(payload) @app.route("/frequency") def frequency_page(): return send_from_directory(app.static_folder, "frequency.html") @app.route("/api/vector-search", methods=["POST"]) def api_vector_search(): payload = request.get_json(silent=True) or {} query_text = (payload.get("query") or "").strip() filters = payload.get("filters") or {} limit = max(int(payload.get("size", 10)), 1) offset = max(int(payload.get("offset", 0)), 0) if not query_text: return jsonify( {"items": [], "totalResults": 0, "offset": offset, "error": "empty_query"} ) try: query_vector = embed_query( query_text, model_name=qdrant_embed_model, expected_dim=qdrant_vector_size ) except Exception as exc: # pragma: no cover - runtime dependency LOGGER.error("Embedding failed: %s", exc, exc_info=config.elastic.debug) return jsonify({"error": "embedding_unavailable"}), 500 qdrant_vector_payload: Any if qdrant_vector_name: qdrant_vector_payload = {qdrant_vector_name: query_vector} else: qdrant_vector_payload = query_vector qdrant_body: Dict[str, Any] = { "vector": qdrant_vector_payload, "limit": limit, "offset": offset, "with_payload": True, "with_vectors": False, } if filters: qdrant_body["filter"] = filters try: response = requests.post( f"{qdrant_url}/collections/{qdrant_collection}/points/search", json=qdrant_body, timeout=20, ) response.raise_for_status() data = response.json() except Exception as exc: LOGGER.error("Vector search failed: %s", exc, 
    @app.route("/api/vector-search", methods=["POST"])
    def api_vector_search():
        payload = request.get_json(silent=True) or {}
        query_text = (payload.get("query") or "").strip()
        filters = payload.get("filters") or {}
        limit = max(int(payload.get("size", 10)), 1)
        offset = max(int(payload.get("offset", 0)), 0)
        if not query_text:
            return jsonify(
                {"items": [], "totalResults": 0, "offset": offset, "error": "empty_query"}
            )
        try:
            query_vector = embed_query(
                query_text, model_name=qdrant_embed_model, expected_dim=qdrant_vector_size
            )
        except Exception as exc:  # pragma: no cover - runtime dependency
            LOGGER.error("Embedding failed: %s", exc, exc_info=config.elastic.debug)
            return jsonify({"error": "embedding_unavailable"}), 500
        qdrant_vector_payload: Any
        if qdrant_vector_name:
            qdrant_vector_payload = {qdrant_vector_name: query_vector}
        else:
            qdrant_vector_payload = query_vector
        qdrant_body: Dict[str, Any] = {
            "vector": qdrant_vector_payload,
            "limit": limit,
            "offset": offset,
            "with_payload": True,
            "with_vectors": False,
        }
        if filters:
            qdrant_body["filter"] = filters
        try:
            response = requests.post(
                f"{qdrant_url}/collections/{qdrant_collection}/points/search",
                json=qdrant_body,
                timeout=20,
            )
            response.raise_for_status()
            data = response.json()
        except Exception as exc:
            LOGGER.error("Vector search failed: %s", exc, exc_info=config.elastic.debug)
            return jsonify({"error": "vector_search_unavailable"}), 502
        points = data.get("result", []) if isinstance(data, dict) else []
        items: List[Dict[str, Any]] = []
        missing_channel_ids: Set[str] = set()
        for point in points:
            payload = point.get("payload", {}) or {}
            raw_highlights = payload.get("highlights") or []
            highlight_entries: List[Dict[str, str]] = []
            for entry in raw_highlights:
                if isinstance(entry, dict):
                    html_value = entry.get("html") or entry.get("text")
                else:
                    html_value = str(entry)
                if not html_value:
                    continue
                highlight_entries.append({"html": html_value, "source": "primary"})
            channel_label = (
                payload.get("channel_name")
                or payload.get("channel_title")
                or payload.get("channel_id")
            )
            items.append(
                {
                    "video_id": payload.get("video_id"),
                    "channel_id": payload.get("channel_id"),
                    "channel_name": channel_label,
                    "title": payload.get("title"),
                    "titleHtml": payload.get("title"),
                    "description": payload.get("description"),
                    "descriptionHtml": payload.get("description"),
                    "date": payload.get("date"),
                    "url": payload.get("url"),
                    "chunkText": payload.get("text")
                    or payload.get("chunk_text")
                    or payload.get("chunk")
                    or payload.get("content"),
                    "chunkTimestamp": payload.get("timestamp")
                    or payload.get("start_seconds")
                    or payload.get("start"),
                    "toHighlight": highlight_entries,
                    "highlightSource": {
                        "primary": bool(highlight_entries),
                        "secondary": False,
                    },
                    "distance": point.get("score"),
                    "internal_references_count": payload.get("internal_references_count", 0),
                    "internal_references": payload.get("internal_references", []),
                    "referenced_by_count": payload.get("referenced_by_count", 0),
                    "referenced_by": payload.get("referenced_by", []),
                    "video_status": payload.get("video_status"),
                    "duration": payload.get("duration"),
                }
            )
            if (not channel_label) and payload.get("channel_id"):
                missing_channel_ids.add(str(payload.get("channel_id")))
        if missing_channel_ids:
            try:
                es_lookup = client.search(
                    index=index,
                    body={
                        "size": len(missing_channel_ids) * 2,
                        "_source": ["channel_id", "channel_name"],
                        "query": {"terms": {"channel_id.keyword": list(missing_channel_ids)}},
                    },
                )
                hits = es_lookup.get("hits", {}).get("hits", [])
                channel_lookup = {}
                for hit in hits:
                    src = hit.get("_source", {}) or {}
                    cid = src.get("channel_id")
                    cname = src.get("channel_name")
                    if cid and cname and cid not in channel_lookup:
                        channel_lookup[cid] = cname
                for item in items:
                    if not item.get("channel_name"):
                        cid = item.get("channel_id")
                        if cid and cid in channel_lookup:
                            item["channel_name"] = channel_lookup[cid]
            except Exception as exc:
                LOGGER.debug("Vector channel lookup failed: %s", exc)
        return jsonify(
            {
                "items": items,
                "totalResults": len(items),
                "offset": offset,
            }
        )

    @app.route("/api/transcript")
    def transcript():
        video_id = request.args.get("video_id", type=str)
        if not video_id:
            return ("video_id not set", 400)
        response = client.get(index=index, id=video_id, ignore=[404])
        if config.elastic.debug:
            LOGGER.info(
                "Elasticsearch transcript request: index=%s id=%s", index, video_id
            )
            LOGGER.info(
                "Elasticsearch transcript response: %s",
                json.dumps(response, indent=2, default=str) if response else "None",
            )
        if not response or not response.get("found"):
            return ("not found", 404)
        source = response["_source"]
        return jsonify(
            {
                "video_id": source.get("video_id"),
                "title": source.get("title"),
                "transcript_parts": source.get("transcript_parts", []),
                "transcript_full": source.get("transcript_full"),
                "transcript_secondary_parts": source.get("transcript_secondary_parts", []),
                "transcript_secondary_full": source.get("transcript_secondary_full"),
            }
        )
source.get("transcript_secondary_full"), } ) return app def main() -> None: # pragma: no cover logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") app = create_app() app.run(host="0.0.0.0", port=8080, debug=True) if __name__ == "__main__": # pragma: no cover main()