From d26edda029ec8afa68013053268649c08471369e Mon Sep 17 00:00:00 2001
From: knight
Date: Thu, 8 Jan 2026 14:22:01 -0500
Subject: [PATCH] Add graph traversal endpoints and sort metrics by channel name

---
 search_app.py | 673 ++++++++++++++++++++++++++------------------------
 1 file changed, 353 insertions(+), 320 deletions(-)

diff --git a/search_app.py b/search_app.py
index 1d0e1ec..82278fb 100644
--- a/search_app.py
+++ b/search_app.py
@@ -201,6 +201,15 @@ def elastic_metrics_payload(
                 "top_hits": {
                     "size": 1,
                     "_source": {"includes": ["channel_name"]},
+                    "sort": [
+                        {
+                            "channel_name.keyword": {
+                                "order": "asc",
+                                "missing": "_last",
+                                "unmapped_type": "keyword",
+                            }
+                        }
+                    ],
                 }
             }
         },
@@ -568,6 +577,324 @@ def build_query_payload(
     return body
 
 
+def build_graph_payload(
+    client: "Elasticsearch",
+    index: str,
+    root_id: str,
+    depth: int,
+    max_nodes: int,
+    *,
+    include_external: bool = True,
+) -> Dict[str, Any]:
+    root_id = root_id.strip()
+    if not root_id:
+        return {"nodes": [], "links": [], "root": root_id, "depth": depth, "meta": {}}
+
+    doc_cache: Dict[str, Optional[Dict[str, Any]]] = {}
+
+    def fetch_document(video_id: str) -> Optional[Dict[str, Any]]:
+        if video_id in doc_cache:
+            return doc_cache[video_id]
+        try:
+            result = client.get(index=index, id=video_id)
+            doc_cache[video_id] = result.get("_source")
+        except Exception as exc:  # pragma: no cover - elasticsearch handles errors
+            LOGGER.debug("Graph: failed to load %s: %s", video_id, exc)
+            doc_cache[video_id] = None
+        doc = doc_cache[video_id]
+        if doc is not None and not include_external and doc.get("external_reference"):
+            doc_cache[video_id] = None
+            return None
+        return doc_cache[video_id]
+
+    nodes: Dict[str, Dict[str, Any]] = {}
+    links: List[Dict[str, Any]] = []
+    link_seen: Set[Tuple[str, str, str]] = set()
+    queue: deque[Tuple[str, int]] = deque([(root_id, 0)])
+    queued: Set[str] = {root_id}
+    visited: Set[str] = set()
+
+    while queue and len(nodes) < max_nodes:
+        current_id, level = queue.popleft()
+        queued.discard(current_id)
+        if current_id in visited:
+            continue
+        doc = fetch_document(current_id)
+        if doc is None:
+            if current_id == root_id:
+                break
+            visited.add(current_id)
+            continue
+
+        visited.add(current_id)
+        nodes[current_id] = {
+            "id": current_id,
+            "title": doc.get("title") or current_id,
+            "channel_id": doc.get("channel_id"),
+            "channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
+            "url": doc.get("url"),
+            "date": doc.get("date"),
+            "is_root": current_id == root_id,
+            "external_reference": bool(doc.get("external_reference")),
+        }
+
+        if level >= depth:
+            continue
+
+        neighbor_ids: List[str] = []
+
+        for ref_id in normalize_reference_list(doc.get("internal_references")):
+            if ref_id == current_id:
+                continue
+            key = (current_id, ref_id, "references")
+            if key not in link_seen:
+                links.append(
+                    {"source": current_id, "target": ref_id, "relation": "references"}
+                )
+                link_seen.add(key)
+            neighbor_ids.append(ref_id)
+
+        for ref_id in normalize_reference_list(doc.get("referenced_by")):
+            if ref_id == current_id:
+                continue
+            key = (ref_id, current_id, "referenced_by")
+            if key not in link_seen:
+                links.append(
+                    {"source": ref_id, "target": current_id, "relation": "referenced_by"}
+                )
+                link_seen.add(key)
+            neighbor_ids.append(ref_id)
+
+        for neighbor in neighbor_ids:
+            if neighbor in visited or neighbor in queued:
+                continue
+            if len(nodes) + len(queue) >= max_nodes:
+                break
+            queue.append((neighbor, level + 1))
+            queued.add(neighbor)
+
+    # Ensure nodes referenced by links exist in the payload.
+    for link in links:
+        for key in ("source", "target"):
+            node_id = link[key]
+            if node_id in nodes:
+                continue
+            doc = fetch_document(node_id)
+            if doc is None:
+                if include_external:
+                    nodes[node_id] = {
+                        "id": node_id,
+                        "title": node_id,
+                        "channel_id": None,
+                        "channel_name": "Unknown",
+                        "url": None,
+                        "date": None,
+                        "is_root": node_id == root_id,
+                        "external_reference": False,
+                    }
+                continue
+            nodes[node_id] = {
+                "id": node_id,
+                "title": doc.get("title") or node_id,
+                "channel_id": doc.get("channel_id"),
+                "channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
+                "url": doc.get("url"),
+                "date": doc.get("date"),
+                "is_root": node_id == root_id,
+                "external_reference": bool(doc.get("external_reference")),
+            }
+
+    links = [
+        link
+        for link in links
+        if link.get("source") in nodes and link.get("target") in nodes
+    ]
+
+    return {
+        "root": root_id,
+        "depth": depth,
+        "nodes": list(nodes.values()),
+        "links": links,
+        "meta": {
+            "node_count": len(nodes),
+            "link_count": len(links),
+        },
+    }
+
+
+def build_full_graph_payload(
+    client: "Elasticsearch",
+    index: str,
+    max_nodes: Optional[int],
+    *,
+    highlight_id: Optional[str] = None,
+    include_external: bool = True,
+) -> Dict[str, Any]:
+    """
+    Attempt to render the entire reference graph by gathering every video that
+    references another (or is referenced).
+    """
+
+    query = {
+        "bool": {
+            "should": [
+                {"range": {"internal_references_count": {"gt": 0}}},
+                {"range": {"referenced_by_count": {"gt": 0}}},
+                {"exists": {"field": "internal_references"}},
+                {"exists": {"field": "referenced_by"}},
+            ],
+            "minimum_should_match": 1,
+        }
+    }
+    source_fields = [
+        "video_id",
+        "title",
+        "channel_id",
+        "channel_name",
+        "url",
+        "date",
+        "internal_references",
+        "referenced_by",
+    ]
+    nodes: Dict[str, Dict[str, Any]] = {}
+    links: List[Dict[str, Any]] = []
+    link_seen: Set[Tuple[str, str, str]] = set()
+    node_limit = max_nodes if max_nodes and max_nodes > 0 else None
+    batch_size = 500 if node_limit is None else min(500, max(50, node_limit * 2))
+    truncated = False
+
+    def ensure_node(node_id: Optional[str], doc: Optional[Dict[str, Any]] = None) -> bool:
+        if not node_id:
+            return False
+        if node_id in nodes:
+            if doc:
+                if not include_external and doc.get("external_reference"):
+                    nodes.pop(node_id, None)
+                    return False
+                existing = nodes[node_id]
+                if (not existing.get("title") or existing["title"] == node_id) and doc.get("title"):
+                    existing["title"] = doc["title"]
+                if not existing.get("channel_id") and doc.get("channel_id"):
+                    existing["channel_id"] = doc.get("channel_id")
+                if (
+                    existing.get("channel_name") in {"Unknown", node_id, None}
+                    and (doc.get("channel_name") or doc.get("channel_id"))
+                ):
+                    existing["channel_name"] = doc.get("channel_name") or doc.get("channel_id")
+                if not existing.get("url") and doc.get("url"):
+                    existing["url"] = doc.get("url")
+                if not existing.get("date") and doc.get("date"):
+                    existing["date"] = doc.get("date")
+            return True
+        if node_limit is not None and len(nodes) >= node_limit:
+            return False
+        channel_name = None
+        channel_id = None
+        url = None
+        date_val = None
+        title = node_id
+        if doc:
+            if not include_external and doc.get("external_reference"):
+                return False
+            title = doc.get("title") or title
+            channel_id = doc.get("channel_id")
+            channel_name = doc.get("channel_name") or channel_id
+            url = doc.get("url")
+            date_val = doc.get("date")
+        nodes[node_id] = {
+            "id": node_id,
+            "title": title,
+            "channel_id": channel_id,
+            "channel_name": channel_name or "Unknown",
+            "url": url,
+            "date": date_val,
+            "is_root": False,
+            "external_reference": bool(doc.get("external_reference")) if doc else False,
+        }
+        return True
+
+    scroll_id: Optional[str] = None
+    try:
+        body = {"query": query, "_source": source_fields, "sort": ["_doc"]}
+        response = client.search(index=index, body=body, size=batch_size, scroll="1m")
+        scroll_id = response.get("_scroll_id")
+        stop_fetch = False
+        while not stop_fetch:
+            hits = response.get("hits", {}).get("hits", [])
+            if not hits:
+                break
+            for hit in hits:
+                if node_limit is not None and len(nodes) >= node_limit:
+                    truncated = True
+                    stop_fetch = True
+                    break
+                source = hit.get("_source", {}) or {}
+                video_id = source.get("video_id")
+                if not video_id:
+                    continue
+                if not include_external and source.get("external_reference"):
+                    nodes.pop(video_id, None)
+                    continue
+                if not ensure_node(video_id, source):
+                    continue
+                for target in normalize_reference_list(source.get("internal_references")):
+                    if target == video_id:
+                        continue
+                    if not ensure_node(target):
+                        continue
+                    key = (video_id, target, "references")
+                    if key not in link_seen:
+                        links.append(
+                            {"source": video_id, "target": target, "relation": "references"}
+                        )
+                        link_seen.add(key)
+                for origin in normalize_reference_list(source.get("referenced_by")):
+                    if origin == video_id:
+                        continue
+                    if not ensure_node(origin):
+                        continue
+                    key = (origin, video_id, "referenced_by")
+                    if key not in link_seen:
+                        links.append(
+                            {"source": origin, "target": video_id, "relation": "referenced_by"}
+                        )
+                        link_seen.add(key)
+            if stop_fetch or not scroll_id:
+                break
+            response = client.scroll(scroll_id=scroll_id, scroll="1m")
+            scroll_id = response.get("_scroll_id")
+            if not scroll_id:
+                break
+    finally:
+        if scroll_id:
+            try:
+                client.clear_scroll(scroll_id=scroll_id)
+            except Exception:
+                pass
+
+    if highlight_id and highlight_id in nodes:
+        nodes[highlight_id]["is_root"] = True
+
+    links = [
+        link
+        for link in links
+        if link.get("source") in nodes and link.get("target") in nodes
+    ]
+
+    return {
+        "root": highlight_id or "",
+        "depth": 0,
+        "nodes": list(nodes.values()),
+        "links": links,
+        "meta": {
+            "node_count": len(nodes),
+            "link_count": len(links),
+            "mode": "full",
+            "truncated": truncated,
+        },
+    }
+
+
 def create_app(config: AppConfig = CONFIG) -> Flask:
     app = Flask(__name__, static_folder=str(Path(__file__).parent / "static"))
     client = _ensure_client(config)
@@ -618,325 +945,6 @@ def create_app(config: AppConfig = CONFIG) -> Flask:
             normalized.append(text)
         return normalized
 
-    def build_graph_payload(
-        root_id: str, depth: int, max_nodes: int, *, include_external: bool = True
-    ) -> Dict[str, Any]:
-        root_id = root_id.strip()
-        if not root_id:
-            return {"nodes": [], "links": [], "root": root_id, "depth": depth, "meta": {}}
-
-        doc_cache: Dict[str, Optional[Dict[str, Any]]] = {}
-
-        def fetch_document(video_id: str) -> Optional[Dict[str, Any]]:
-            if video_id in doc_cache:
-                return doc_cache[video_id]
-            try:
-                result = client.get(index=index, id=video_id)
-                doc_cache[video_id] = result.get("_source")
-            except Exception as exc:  # pragma: no cover - elasticsearch handles errors
-                LOGGER.debug("Graph: failed to load %s: %s", video_id, exc)
-                doc_cache[video_id] = None
-            doc = doc_cache[video_id]
-            if (
-                doc is not None
-                and not include_external
-                and doc.get("external_reference")
-            ):
-                doc_cache[video_id] = None
-                return None
-            return doc_cache[video_id]
-
-        nodes: Dict[str, Dict[str, Any]] = {}
-        links: List[Dict[str, Any]] = []
-        link_seen: Set[Tuple[str, str, str]] = set()
-        queue: deque[Tuple[str, int]] = deque([(root_id, 0)])
-        queued: Set[str] = {root_id}
-        visited: Set[str] = set()
-
-        while queue and len(nodes) < max_nodes:
-            current_id, level = queue.popleft()
-            queued.discard(current_id)
-            if current_id in visited:
-                continue
-            doc = fetch_document(current_id)
-            if doc is None:
-                if current_id == root_id:
-                    break
-                visited.add(current_id)
-                continue
-
-            visited.add(current_id)
-            nodes[current_id] = {
-                "id": current_id,
-                "title": doc.get("title") or current_id,
-                "channel_id": doc.get("channel_id"),
-                "channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
-                "url": doc.get("url"),
-                "date": doc.get("date"),
-                "is_root": current_id == root_id,
-                "external_reference": bool(doc.get("external_reference")),
-            }
-
-            if level >= depth:
-                continue
-
-            neighbor_ids: List[str] = []
-
-            for ref_id in normalize_reference_list(doc.get("internal_references")):
-                if ref_id == current_id:
-                    continue
-                key = (current_id, ref_id, "references")
-                if key not in link_seen:
-                    links.append(
-                        {"source": current_id, "target": ref_id, "relation": "references"}
-                    )
-                    link_seen.add(key)
-                neighbor_ids.append(ref_id)
-
-            for ref_id in normalize_reference_list(doc.get("referenced_by")):
-                if ref_id == current_id:
-                    continue
-                key = (ref_id, current_id, "referenced_by")
-                if key not in link_seen:
-                    links.append(
-                        {"source": ref_id, "target": current_id, "relation": "referenced_by"}
-                    )
-                    link_seen.add(key)
-                neighbor_ids.append(ref_id)
-
-            for neighbor in neighbor_ids:
-                if neighbor in visited or neighbor in queued:
-                    continue
-                if len(nodes) + len(queue) >= max_nodes:
-                    break
-                queue.append((neighbor, level + 1))
-                queued.add(neighbor)
-
-        # Ensure nodes referenced by links exist in the payload.
-        for link in links:
-            for key in ("source", "target"):
-                node_id = link[key]
-                if node_id in nodes:
-                    continue
-                doc = fetch_document(node_id)
-                if doc is None:
-                    if include_external:
-                        nodes[node_id] = {
-                            "id": node_id,
-                            "title": node_id,
-                            "channel_id": None,
-                            "channel_name": "Unknown",
-                            "url": None,
-                            "date": None,
-                            "is_root": node_id == root_id,
-                            "external_reference": False,
-                        }
-                    continue
-                nodes[node_id] = {
-                    "id": node_id,
-                    "title": doc.get("title") or node_id,
-                    "channel_id": doc.get("channel_id"),
-                    "channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
-                    "url": doc.get("url"),
-                    "date": doc.get("date"),
-                    "is_root": node_id == root_id,
-                    "external_reference": bool(doc.get("external_reference")),
-                }
-
-        links = [
-            link
-            for link in links
-            if link.get("source") in nodes and link.get("target") in nodes
-        ]
-
-        return {
-            "root": root_id,
-            "depth": depth,
-            "nodes": list(nodes.values()),
-            "links": links,
-            "meta": {
-                "node_count": len(nodes),
-                "link_count": len(links),
-            },
-        }
-
-    def build_full_graph_payload(
-        max_nodes: Optional[int],
-        *,
-        highlight_id: Optional[str] = None,
-        include_external: bool = True,
-    ) -> Dict[str, Any]:
-        """
-        Attempt to render the entire reference graph by gathering every video that
-        references another (or is referenced).
- """ - - query = { - "bool": { - "should": [ - {"range": {"internal_references_count": {"gt": 0}}}, - {"range": {"referenced_by_count": {"gt": 0}}}, - {"exists": {"field": "internal_references"}}, - {"exists": {"field": "referenced_by"}}, - ], - "minimum_should_match": 1, - } - } - source_fields = [ - "video_id", - "title", - "channel_id", - "channel_name", - "url", - "date", - "internal_references", - "referenced_by", - ] - nodes: Dict[str, Dict[str, Any]] = {} - links: List[Dict[str, Any]] = [] - link_seen: Set[Tuple[str, str, str]] = set() - node_limit = max_nodes if max_nodes and max_nodes > 0 else None - batch_size = 500 if node_limit is None else min(500, max(50, node_limit * 2)) - truncated = False - - def ensure_node(node_id: Optional[str], doc: Optional[Dict[str, Any]] = None) -> bool: - if not node_id: - return False - if node_id in nodes: - if doc: - if not include_external and doc.get("external_reference"): - nodes.pop(node_id, None) - return False - existing = nodes[node_id] - if (not existing.get("title") or existing["title"] == node_id) and doc.get("title"): - existing["title"] = doc["title"] - if not existing.get("channel_id") and doc.get("channel_id"): - existing["channel_id"] = doc["channel_id"] - if ( - existing.get("channel_name") in {"Unknown", node_id, None} - and (doc.get("channel_name") or doc.get("channel_id")) - ): - existing["channel_name"] = doc.get("channel_name") or doc.get("channel_id") - if not existing.get("url") and doc.get("url"): - existing["url"] = doc.get("url") - if not existing.get("date") and doc.get("date"): - existing["date"] = doc.get("date") - return True - if node_limit is not None and len(nodes) >= node_limit: - return False - channel_name = None - channel_id = None - url = None - date_val = None - title = node_id - if doc: - if not include_external and doc.get("external_reference"): - return False - title = doc.get("title") or title - channel_id = doc.get("channel_id") - channel_name = doc.get("channel_name") or channel_id - url = doc.get("url") - date_val = doc.get("date") - nodes[node_id] = { - "id": node_id, - "title": title, - "channel_id": channel_id, - "channel_name": channel_name or "Unknown", - "url": url, - "date": date_val, - "is_root": False, - "external_reference": bool(doc.get("external_reference")) if doc else False, - } - return True - - scroll_id: Optional[str] = None - try: - body = {"query": query, "_source": source_fields, "sort": ["_doc"]} - response = client.search( - index=index, body=body, size=batch_size, scroll="1m" - ) - scroll_id = response.get("_scroll_id") - stop_fetch = False - while not stop_fetch: - hits = response.get("hits", {}).get("hits", []) - if not hits: - break - for hit in hits: - if node_limit is not None and len(nodes) >= node_limit: - truncated = True - stop_fetch = True - break - source = hit.get("_source", {}) or {} - video_id = source.get("video_id") - if not video_id: - continue - if not include_external and source.get("external_reference"): - nodes.pop(video_id, None) - continue - if not ensure_node(video_id, source): - continue - for target in normalize_reference_list(source.get("internal_references")): - if target == video_id: - continue - if not ensure_node(target): - continue - key = (video_id, target, "references") - if key not in link_seen: - links.append( - {"source": video_id, "target": target, "relation": "references"} - ) - link_seen.add(key) - for origin in normalize_reference_list(source.get("referenced_by")): - if origin == video_id: - continue - if not ensure_node(origin): - 
-                            continue
-                        key = (origin, video_id, "referenced_by")
-                        if key not in link_seen:
-                            links.append(
-                                {"source": origin, "target": video_id, "relation": "referenced_by"}
-                            )
-                            link_seen.add(key)
-                if stop_fetch or not scroll_id:
-                    break
-                response = client.scroll(scroll_id=scroll_id, scroll="1m")
-                scroll_id = response.get("_scroll_id")
-                if not scroll_id:
-                    break
-        finally:
-            if scroll_id:
-                try:
-                    client.clear_scroll(scroll_id=scroll_id)
-                except Exception:
-                    pass
-
-        if highlight_id and highlight_id in nodes:
-            nodes[highlight_id]["is_root"] = True
-
-        links = [
-            link
-            for link in links
-            if link.get("source") in nodes and link.get("target") in nodes
-        ]
-
-        links = [
-            link
-            for link in links
-            if link.get("source") in nodes and link.get("target") in nodes
-        ]
-
-        return {
-            "root": highlight_id or "",
-            "depth": 0,
-            "nodes": list(nodes.values()),
-            "links": links,
-            "meta": {
-                "node_count": len(nodes),
-                "link_count": len(links),
-                "mode": "full",
-                "truncated": truncated,
-            },
-        }
-
     @app.route("/api/channels")
     def channels():
@@ -952,6 +960,15 @@ def build_full_graph_payload(
                     "top_hits": {
                         "size": 1,
                         "_source": {"includes": ["channel_name"]},
+                        "sort": [
+                            {
+                                "channel_name.keyword": {
+                                    "order": "asc",
+                                    "missing": "_last",
+                                    "unmapped_type": "keyword",
+                                }
+                            }
+                        ],
                     }
                 }
             },
@@ -1050,13 +1067,20 @@ def build_full_graph_payload(
 
         if full_graph:
             payload = build_full_graph_payload(
+                client,
+                index,
                 None,
                 highlight_id=video_id or None,
                 include_external=include_external,
             )
         else:
            payload = build_graph_payload(
-                video_id, depth, max_nodes, include_external=include_external
+                client,
+                index,
+                video_id,
+                depth,
+                max_nodes,
+                include_external=include_external,
            )
         if not payload["nodes"]:
             return (
@@ -1367,6 +1391,15 @@ def build_full_graph_payload(
                     "top_hits": {
                         "size": 1,
                         "_source": {"includes": ["channel_name"]},
+                        "sort": [
+                            {
+                                "channel_name.keyword": {
+                                    "order": "asc",
+                                    "missing": "_last",
+                                    "unmapped_type": "keyword",
+                                }
+                            }
+                        ],
                    }
                }
            },
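
Usage sketch: this patch lifts build_graph_payload and build_full_graph_payload
out of create_app and makes them take the Elasticsearch client and index name
explicitly, so they can be exercised outside the Flask request cycle. A minimal
sketch of driving them directly; the cluster URL, the "videos" index name, and
the video ID "abc123" are placeholders, not values taken from this patch:

    from elasticsearch import Elasticsearch

    from search_app import build_full_graph_payload, build_graph_payload

    client = Elasticsearch("http://localhost:9200")  # assumed local cluster
    index = "videos"  # hypothetical index name

    # BFS neighborhood around one video: depth 2, capped at 200 nodes,
    # with externally referenced videos filtered out.
    payload = build_graph_payload(
        client, index, "abc123", depth=2, max_nodes=200, include_external=False
    )
    print(payload["meta"])  # {"node_count": ..., "link_count": ...}

    # Entire reference graph via a scroll, truncated to 1000 nodes, with the
    # same video flagged as the root for highlighting in the frontend.
    full = build_full_graph_payload(client, index, 1000, highlight_id="abc123")
    print(full["meta"]["truncated"])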