Add graph traversal endpoints and sort metrics by channel name
This commit is contained in:
673
search_app.py
673
search_app.py
@@ -201,6 +201,15 @@ def elastic_metrics_payload(
|
|||||||
"top_hits": {
|
"top_hits": {
|
||||||
"size": 1,
|
"size": 1,
|
||||||
"_source": {"includes": ["channel_name"]},
|
"_source": {"includes": ["channel_name"]},
|
||||||
|
"sort": [
|
||||||
|
{
|
||||||
|
"channel_name.keyword": {
|
||||||
|
"order": "asc",
|
||||||
|
"missing": "_last",
|
||||||
|
"unmapped_type": "keyword",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -568,6 +577,324 @@ def build_query_payload(
|
|||||||
return body
|
return body
|
||||||
|
|
||||||
|
|
||||||
|
def build_graph_payload(
|
||||||
|
client: "Elasticsearch",
|
||||||
|
index: str,
|
||||||
|
root_id: str,
|
||||||
|
depth: int,
|
||||||
|
max_nodes: int,
|
||||||
|
*,
|
||||||
|
include_external: bool = True,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
root_id = root_id.strip()
|
||||||
|
if not root_id:
|
||||||
|
return {"nodes": [], "links": [], "root": root_id, "depth": depth, "meta": {}}
|
||||||
|
|
||||||
|
doc_cache: Dict[str, Optional[Dict[str, Any]]] = {}
|
||||||
|
|
||||||
|
def fetch_document(video_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
if video_id in doc_cache:
|
||||||
|
return doc_cache[video_id]
|
||||||
|
try:
|
||||||
|
result = client.get(index=index, id=video_id)
|
||||||
|
doc_cache[video_id] = result.get("_source")
|
||||||
|
except Exception as exc: # pragma: no cover - elasticsearch handles errors
|
||||||
|
LOGGER.debug("Graph: failed to load %s: %s", video_id, exc)
|
||||||
|
doc_cache[video_id] = None
|
||||||
|
doc = doc_cache[video_id]
|
||||||
|
if doc is not None and not include_external and doc.get("external_reference"):
|
||||||
|
doc_cache[video_id] = None
|
||||||
|
return None
|
||||||
|
return doc_cache[video_id]
|
||||||
|
|
||||||
|
nodes: Dict[str, Dict[str, Any]] = {}
|
||||||
|
links: List[Dict[str, Any]] = []
|
||||||
|
link_seen: Set[Tuple[str, str, str]] = set()
|
||||||
|
queue: deque[Tuple[str, int]] = deque([(root_id, 0)])
|
||||||
|
queued: Set[str] = {root_id}
|
||||||
|
visited: Set[str] = set()
|
||||||
|
|
||||||
|
while queue and len(nodes) < max_nodes:
|
||||||
|
current_id, level = queue.popleft()
|
||||||
|
queued.discard(current_id)
|
||||||
|
if current_id in visited:
|
||||||
|
continue
|
||||||
|
doc = fetch_document(current_id)
|
||||||
|
if doc is None:
|
||||||
|
if current_id == root_id:
|
||||||
|
break
|
||||||
|
visited.add(current_id)
|
||||||
|
continue
|
||||||
|
|
||||||
|
visited.add(current_id)
|
||||||
|
nodes[current_id] = {
|
||||||
|
"id": current_id,
|
||||||
|
"title": doc.get("title") or current_id,
|
||||||
|
"channel_id": doc.get("channel_id"),
|
||||||
|
"channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
|
||||||
|
"url": doc.get("url"),
|
||||||
|
"date": doc.get("date"),
|
||||||
|
"is_root": current_id == root_id,
|
||||||
|
"external_reference": bool(doc.get("external_reference")),
|
||||||
|
}
|
||||||
|
|
||||||
|
if level >= depth:
|
||||||
|
continue
|
||||||
|
|
||||||
|
neighbor_ids: List[str] = []
|
||||||
|
|
||||||
|
for ref_id in normalize_reference_list(doc.get("internal_references")):
|
||||||
|
if ref_id == current_id:
|
||||||
|
continue
|
||||||
|
key = (current_id, ref_id, "references")
|
||||||
|
if key not in link_seen:
|
||||||
|
links.append(
|
||||||
|
{"source": current_id, "target": ref_id, "relation": "references"}
|
||||||
|
)
|
||||||
|
link_seen.add(key)
|
||||||
|
neighbor_ids.append(ref_id)
|
||||||
|
|
||||||
|
for ref_id in normalize_reference_list(doc.get("referenced_by")):
|
||||||
|
if ref_id == current_id:
|
||||||
|
continue
|
||||||
|
key = (ref_id, current_id, "referenced_by")
|
||||||
|
if key not in link_seen:
|
||||||
|
links.append(
|
||||||
|
{"source": ref_id, "target": current_id, "relation": "referenced_by"}
|
||||||
|
)
|
||||||
|
link_seen.add(key)
|
||||||
|
neighbor_ids.append(ref_id)
|
||||||
|
|
||||||
|
for neighbor in neighbor_ids:
|
||||||
|
if neighbor in visited or neighbor in queued:
|
||||||
|
continue
|
||||||
|
if len(nodes) + len(queue) >= max_nodes:
|
||||||
|
break
|
||||||
|
queue.append((neighbor, level + 1))
|
||||||
|
queued.add(neighbor)
|
||||||
|
|
||||||
|
# Ensure nodes referenced by links exist in the payload.
|
||||||
|
for link in links:
|
||||||
|
for key in ("source", "target"):
|
||||||
|
node_id = link[key]
|
||||||
|
if node_id in nodes:
|
||||||
|
continue
|
||||||
|
doc = fetch_document(node_id)
|
||||||
|
if doc is None:
|
||||||
|
if include_external:
|
||||||
|
nodes[node_id] = {
|
||||||
|
"id": node_id,
|
||||||
|
"title": node_id,
|
||||||
|
"channel_id": None,
|
||||||
|
"channel_name": "Unknown",
|
||||||
|
"url": None,
|
||||||
|
"date": None,
|
||||||
|
"is_root": node_id == root_id,
|
||||||
|
"external_reference": False,
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
nodes[node_id] = {
|
||||||
|
"id": node_id,
|
||||||
|
"title": doc.get("title") or node_id,
|
||||||
|
"channel_id": doc.get("channel_id"),
|
||||||
|
"channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
|
||||||
|
"url": doc.get("url"),
|
||||||
|
"date": doc.get("date"),
|
||||||
|
"is_root": node_id == root_id,
|
||||||
|
"external_reference": bool(doc.get("external_reference")),
|
||||||
|
}
|
||||||
|
|
||||||
|
links = [
|
||||||
|
link
|
||||||
|
for link in links
|
||||||
|
if link.get("source") in nodes and link.get("target") in nodes
|
||||||
|
]
|
||||||
|
|
||||||
|
return {
|
||||||
|
"root": root_id,
|
||||||
|
"depth": depth,
|
||||||
|
"nodes": list(nodes.values()),
|
||||||
|
"links": links,
|
||||||
|
"meta": {
|
||||||
|
"node_count": len(nodes),
|
||||||
|
"link_count": len(links),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def build_full_graph_payload(
|
||||||
|
client: "Elasticsearch",
|
||||||
|
index: str,
|
||||||
|
max_nodes: Optional[int],
|
||||||
|
*,
|
||||||
|
highlight_id: Optional[str] = None,
|
||||||
|
include_external: bool = True,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Attempt to render the entire reference graph by gathering every video that
|
||||||
|
references another (or is referenced).
|
||||||
|
"""
|
||||||
|
|
||||||
|
query = {
|
||||||
|
"bool": {
|
||||||
|
"should": [
|
||||||
|
{"range": {"internal_references_count": {"gt": 0}}},
|
||||||
|
{"range": {"referenced_by_count": {"gt": 0}}},
|
||||||
|
{"exists": {"field": "internal_references"}},
|
||||||
|
{"exists": {"field": "referenced_by"}},
|
||||||
|
],
|
||||||
|
"minimum_should_match": 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
source_fields = [
|
||||||
|
"video_id",
|
||||||
|
"title",
|
||||||
|
"channel_id",
|
||||||
|
"channel_name",
|
||||||
|
"url",
|
||||||
|
"date",
|
||||||
|
"internal_references",
|
||||||
|
"referenced_by",
|
||||||
|
]
|
||||||
|
nodes: Dict[str, Dict[str, Any]] = {}
|
||||||
|
links: List[Dict[str, Any]] = []
|
||||||
|
link_seen: Set[Tuple[str, str, str]] = set()
|
||||||
|
node_limit = max_nodes if max_nodes and max_nodes > 0 else None
|
||||||
|
batch_size = 500 if node_limit is None else min(500, max(50, node_limit * 2))
|
||||||
|
truncated = False
|
||||||
|
|
||||||
|
def ensure_node(node_id: Optional[str], doc: Optional[Dict[str, Any]] = None) -> bool:
|
||||||
|
if not node_id:
|
||||||
|
return False
|
||||||
|
if node_id in nodes:
|
||||||
|
if doc:
|
||||||
|
if not include_external and doc.get("external_reference"):
|
||||||
|
nodes.pop(node_id, None)
|
||||||
|
return False
|
||||||
|
existing = nodes[node_id]
|
||||||
|
if (not existing.get("title") or existing["title"] == node_id) and doc.get("title"):
|
||||||
|
existing["title"] = doc["title"]
|
||||||
|
if not existing.get("channel_id") and doc.get("channel_id"):
|
||||||
|
existing["channel_id"] = doc.get("channel_id")
|
||||||
|
if (
|
||||||
|
existing.get("channel_name") in {"Unknown", node_id, None}
|
||||||
|
and (doc.get("channel_name") or doc.get("channel_id"))
|
||||||
|
):
|
||||||
|
existing["channel_name"] = doc.get("channel_name") or doc.get("channel_id")
|
||||||
|
if not existing.get("url") and doc.get("url"):
|
||||||
|
existing["url"] = doc.get("url")
|
||||||
|
if not existing.get("date") and doc.get("date"):
|
||||||
|
existing["date"] = doc.get("date")
|
||||||
|
return True
|
||||||
|
if node_limit is not None and len(nodes) >= node_limit:
|
||||||
|
return False
|
||||||
|
channel_name = None
|
||||||
|
channel_id = None
|
||||||
|
url = None
|
||||||
|
date_val = None
|
||||||
|
title = node_id
|
||||||
|
if doc:
|
||||||
|
if not include_external and doc.get("external_reference"):
|
||||||
|
return False
|
||||||
|
title = doc.get("title") or title
|
||||||
|
channel_id = doc.get("channel_id")
|
||||||
|
channel_name = doc.get("channel_name") or channel_id
|
||||||
|
url = doc.get("url")
|
||||||
|
date_val = doc.get("date")
|
||||||
|
nodes[node_id] = {
|
||||||
|
"id": node_id,
|
||||||
|
"title": title,
|
||||||
|
"channel_id": channel_id,
|
||||||
|
"channel_name": channel_name or "Unknown",
|
||||||
|
"url": url,
|
||||||
|
"date": date_val,
|
||||||
|
"is_root": False,
|
||||||
|
"external_reference": bool(doc.get("external_reference")) if doc else False,
|
||||||
|
}
|
||||||
|
return True
|
||||||
|
|
||||||
|
scroll_id: Optional[str] = None
|
||||||
|
try:
|
||||||
|
body = {"query": query, "_source": source_fields, "sort": ["_doc"]}
|
||||||
|
response = client.search(index=index, body=body, size=batch_size, scroll="1m")
|
||||||
|
scroll_id = response.get("_scroll_id")
|
||||||
|
stop_fetch = False
|
||||||
|
while not stop_fetch:
|
||||||
|
hits = response.get("hits", {}).get("hits", [])
|
||||||
|
if not hits:
|
||||||
|
break
|
||||||
|
for hit in hits:
|
||||||
|
if node_limit is not None and len(nodes) >= node_limit:
|
||||||
|
truncated = True
|
||||||
|
stop_fetch = True
|
||||||
|
break
|
||||||
|
source = hit.get("_source", {}) or {}
|
||||||
|
video_id = source.get("video_id")
|
||||||
|
if not video_id:
|
||||||
|
continue
|
||||||
|
if not include_external and source.get("external_reference"):
|
||||||
|
nodes.pop(video_id, None)
|
||||||
|
continue
|
||||||
|
if not ensure_node(video_id, source):
|
||||||
|
continue
|
||||||
|
for target in normalize_reference_list(source.get("internal_references")):
|
||||||
|
if target == video_id:
|
||||||
|
continue
|
||||||
|
if not ensure_node(target):
|
||||||
|
continue
|
||||||
|
key = (video_id, target, "references")
|
||||||
|
if key not in link_seen:
|
||||||
|
links.append(
|
||||||
|
{"source": video_id, "target": target, "relation": "references"}
|
||||||
|
)
|
||||||
|
link_seen.add(key)
|
||||||
|
for origin in normalize_reference_list(source.get("referenced_by")):
|
||||||
|
if origin == video_id:
|
||||||
|
continue
|
||||||
|
if not ensure_node(origin):
|
||||||
|
continue
|
||||||
|
key = (origin, video_id, "referenced_by")
|
||||||
|
if key not in link_seen:
|
||||||
|
links.append(
|
||||||
|
{"source": origin, "target": video_id, "relation": "referenced_by"}
|
||||||
|
)
|
||||||
|
link_seen.add(key)
|
||||||
|
if stop_fetch or not scroll_id:
|
||||||
|
break
|
||||||
|
response = client.scroll(scroll_id=scroll_id, scroll="1m")
|
||||||
|
scroll_id = response.get("_scroll_id")
|
||||||
|
if not scroll_id:
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
if scroll_id:
|
||||||
|
try:
|
||||||
|
client.clear_scroll(scroll_id=scroll_id)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if highlight_id and highlight_id in nodes:
|
||||||
|
nodes[highlight_id]["is_root"] = True
|
||||||
|
|
||||||
|
links = [
|
||||||
|
link
|
||||||
|
for link in links
|
||||||
|
if link.get("source") in nodes and link.get("target") in nodes
|
||||||
|
]
|
||||||
|
|
||||||
|
return {
|
||||||
|
"root": highlight_id or "",
|
||||||
|
"depth": 0,
|
||||||
|
"nodes": list(nodes.values()),
|
||||||
|
"links": links,
|
||||||
|
"meta": {
|
||||||
|
"node_count": len(nodes),
|
||||||
|
"link_count": len(links),
|
||||||
|
"mode": "full",
|
||||||
|
"truncated": truncated,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def create_app(config: AppConfig = CONFIG) -> Flask:
|
def create_app(config: AppConfig = CONFIG) -> Flask:
|
||||||
app = Flask(__name__, static_folder=str(Path(__file__).parent / "static"))
|
app = Flask(__name__, static_folder=str(Path(__file__).parent / "static"))
|
||||||
client = _ensure_client(config)
|
client = _ensure_client(config)
|
||||||
@@ -618,325 +945,6 @@ def create_app(config: AppConfig = CONFIG) -> Flask:
|
|||||||
normalized.append(text)
|
normalized.append(text)
|
||||||
return normalized
|
return normalized
|
||||||
|
|
||||||
def build_graph_payload(
|
|
||||||
root_id: str, depth: int, max_nodes: int, *, include_external: bool = True
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
root_id = root_id.strip()
|
|
||||||
if not root_id:
|
|
||||||
return {"nodes": [], "links": [], "root": root_id, "depth": depth, "meta": {}}
|
|
||||||
|
|
||||||
doc_cache: Dict[str, Optional[Dict[str, Any]]] = {}
|
|
||||||
|
|
||||||
def fetch_document(video_id: str) -> Optional[Dict[str, Any]]:
|
|
||||||
if video_id in doc_cache:
|
|
||||||
return doc_cache[video_id]
|
|
||||||
try:
|
|
||||||
result = client.get(index=index, id=video_id)
|
|
||||||
doc_cache[video_id] = result.get("_source")
|
|
||||||
except Exception as exc: # pragma: no cover - elasticsearch handles errors
|
|
||||||
LOGGER.debug("Graph: failed to load %s: %s", video_id, exc)
|
|
||||||
doc_cache[video_id] = None
|
|
||||||
doc = doc_cache[video_id]
|
|
||||||
if (
|
|
||||||
doc is not None
|
|
||||||
and not include_external
|
|
||||||
and doc.get("external_reference")
|
|
||||||
):
|
|
||||||
doc_cache[video_id] = None
|
|
||||||
return None
|
|
||||||
return doc_cache[video_id]
|
|
||||||
|
|
||||||
nodes: Dict[str, Dict[str, Any]] = {}
|
|
||||||
links: List[Dict[str, Any]] = []
|
|
||||||
link_seen: Set[Tuple[str, str, str]] = set()
|
|
||||||
queue: deque[Tuple[str, int]] = deque([(root_id, 0)])
|
|
||||||
queued: Set[str] = {root_id}
|
|
||||||
visited: Set[str] = set()
|
|
||||||
|
|
||||||
while queue and len(nodes) < max_nodes:
|
|
||||||
current_id, level = queue.popleft()
|
|
||||||
queued.discard(current_id)
|
|
||||||
if current_id in visited:
|
|
||||||
continue
|
|
||||||
doc = fetch_document(current_id)
|
|
||||||
if doc is None:
|
|
||||||
if current_id == root_id:
|
|
||||||
break
|
|
||||||
visited.add(current_id)
|
|
||||||
continue
|
|
||||||
|
|
||||||
visited.add(current_id)
|
|
||||||
nodes[current_id] = {
|
|
||||||
"id": current_id,
|
|
||||||
"title": doc.get("title") or current_id,
|
|
||||||
"channel_id": doc.get("channel_id"),
|
|
||||||
"channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
|
|
||||||
"url": doc.get("url"),
|
|
||||||
"date": doc.get("date"),
|
|
||||||
"is_root": current_id == root_id,
|
|
||||||
"external_reference": bool(doc.get("external_reference")),
|
|
||||||
}
|
|
||||||
|
|
||||||
if level >= depth:
|
|
||||||
continue
|
|
||||||
|
|
||||||
neighbor_ids: List[str] = []
|
|
||||||
|
|
||||||
for ref_id in normalize_reference_list(doc.get("internal_references")):
|
|
||||||
if ref_id == current_id:
|
|
||||||
continue
|
|
||||||
key = (current_id, ref_id, "references")
|
|
||||||
if key not in link_seen:
|
|
||||||
links.append(
|
|
||||||
{"source": current_id, "target": ref_id, "relation": "references"}
|
|
||||||
)
|
|
||||||
link_seen.add(key)
|
|
||||||
neighbor_ids.append(ref_id)
|
|
||||||
|
|
||||||
for ref_id in normalize_reference_list(doc.get("referenced_by")):
|
|
||||||
if ref_id == current_id:
|
|
||||||
continue
|
|
||||||
key = (ref_id, current_id, "referenced_by")
|
|
||||||
if key not in link_seen:
|
|
||||||
links.append(
|
|
||||||
{"source": ref_id, "target": current_id, "relation": "referenced_by"}
|
|
||||||
)
|
|
||||||
link_seen.add(key)
|
|
||||||
neighbor_ids.append(ref_id)
|
|
||||||
|
|
||||||
for neighbor in neighbor_ids:
|
|
||||||
if neighbor in visited or neighbor in queued:
|
|
||||||
continue
|
|
||||||
if len(nodes) + len(queue) >= max_nodes:
|
|
||||||
break
|
|
||||||
queue.append((neighbor, level + 1))
|
|
||||||
queued.add(neighbor)
|
|
||||||
|
|
||||||
# Ensure nodes referenced by links exist in the payload.
|
|
||||||
for link in links:
|
|
||||||
for key in ("source", "target"):
|
|
||||||
node_id = link[key]
|
|
||||||
if node_id in nodes:
|
|
||||||
continue
|
|
||||||
doc = fetch_document(node_id)
|
|
||||||
if doc is None:
|
|
||||||
if include_external:
|
|
||||||
nodes[node_id] = {
|
|
||||||
"id": node_id,
|
|
||||||
"title": node_id,
|
|
||||||
"channel_id": None,
|
|
||||||
"channel_name": "Unknown",
|
|
||||||
"url": None,
|
|
||||||
"date": None,
|
|
||||||
"is_root": node_id == root_id,
|
|
||||||
"external_reference": False,
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
nodes[node_id] = {
|
|
||||||
"id": node_id,
|
|
||||||
"title": doc.get("title") or node_id,
|
|
||||||
"channel_id": doc.get("channel_id"),
|
|
||||||
"channel_name": doc.get("channel_name") or doc.get("channel_id") or "Unknown",
|
|
||||||
"url": doc.get("url"),
|
|
||||||
"date": doc.get("date"),
|
|
||||||
"is_root": node_id == root_id,
|
|
||||||
"external_reference": bool(doc.get("external_reference")),
|
|
||||||
}
|
|
||||||
|
|
||||||
links = [
|
|
||||||
link
|
|
||||||
for link in links
|
|
||||||
if link.get("source") in nodes and link.get("target") in nodes
|
|
||||||
]
|
|
||||||
|
|
||||||
return {
|
|
||||||
"root": root_id,
|
|
||||||
"depth": depth,
|
|
||||||
"nodes": list(nodes.values()),
|
|
||||||
"links": links,
|
|
||||||
"meta": {
|
|
||||||
"node_count": len(nodes),
|
|
||||||
"link_count": len(links),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
def build_full_graph_payload(
|
|
||||||
max_nodes: Optional[int],
|
|
||||||
*,
|
|
||||||
highlight_id: Optional[str] = None,
|
|
||||||
include_external: bool = True,
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Attempt to render the entire reference graph by gathering every video that
|
|
||||||
references another (or is referenced).
|
|
||||||
"""
|
|
||||||
|
|
||||||
query = {
|
|
||||||
"bool": {
|
|
||||||
"should": [
|
|
||||||
{"range": {"internal_references_count": {"gt": 0}}},
|
|
||||||
{"range": {"referenced_by_count": {"gt": 0}}},
|
|
||||||
{"exists": {"field": "internal_references"}},
|
|
||||||
{"exists": {"field": "referenced_by"}},
|
|
||||||
],
|
|
||||||
"minimum_should_match": 1,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
source_fields = [
|
|
||||||
"video_id",
|
|
||||||
"title",
|
|
||||||
"channel_id",
|
|
||||||
"channel_name",
|
|
||||||
"url",
|
|
||||||
"date",
|
|
||||||
"internal_references",
|
|
||||||
"referenced_by",
|
|
||||||
]
|
|
||||||
nodes: Dict[str, Dict[str, Any]] = {}
|
|
||||||
links: List[Dict[str, Any]] = []
|
|
||||||
link_seen: Set[Tuple[str, str, str]] = set()
|
|
||||||
node_limit = max_nodes if max_nodes and max_nodes > 0 else None
|
|
||||||
batch_size = 500 if node_limit is None else min(500, max(50, node_limit * 2))
|
|
||||||
truncated = False
|
|
||||||
|
|
||||||
def ensure_node(node_id: Optional[str], doc: Optional[Dict[str, Any]] = None) -> bool:
|
|
||||||
if not node_id:
|
|
||||||
return False
|
|
||||||
if node_id in nodes:
|
|
||||||
if doc:
|
|
||||||
if not include_external and doc.get("external_reference"):
|
|
||||||
nodes.pop(node_id, None)
|
|
||||||
return False
|
|
||||||
existing = nodes[node_id]
|
|
||||||
if (not existing.get("title") or existing["title"] == node_id) and doc.get("title"):
|
|
||||||
existing["title"] = doc["title"]
|
|
||||||
if not existing.get("channel_id") and doc.get("channel_id"):
|
|
||||||
existing["channel_id"] = doc["channel_id"]
|
|
||||||
if (
|
|
||||||
existing.get("channel_name") in {"Unknown", node_id, None}
|
|
||||||
and (doc.get("channel_name") or doc.get("channel_id"))
|
|
||||||
):
|
|
||||||
existing["channel_name"] = doc.get("channel_name") or doc.get("channel_id")
|
|
||||||
if not existing.get("url") and doc.get("url"):
|
|
||||||
existing["url"] = doc.get("url")
|
|
||||||
if not existing.get("date") and doc.get("date"):
|
|
||||||
existing["date"] = doc.get("date")
|
|
||||||
return True
|
|
||||||
if node_limit is not None and len(nodes) >= node_limit:
|
|
||||||
return False
|
|
||||||
channel_name = None
|
|
||||||
channel_id = None
|
|
||||||
url = None
|
|
||||||
date_val = None
|
|
||||||
title = node_id
|
|
||||||
if doc:
|
|
||||||
if not include_external and doc.get("external_reference"):
|
|
||||||
return False
|
|
||||||
title = doc.get("title") or title
|
|
||||||
channel_id = doc.get("channel_id")
|
|
||||||
channel_name = doc.get("channel_name") or channel_id
|
|
||||||
url = doc.get("url")
|
|
||||||
date_val = doc.get("date")
|
|
||||||
nodes[node_id] = {
|
|
||||||
"id": node_id,
|
|
||||||
"title": title,
|
|
||||||
"channel_id": channel_id,
|
|
||||||
"channel_name": channel_name or "Unknown",
|
|
||||||
"url": url,
|
|
||||||
"date": date_val,
|
|
||||||
"is_root": False,
|
|
||||||
"external_reference": bool(doc.get("external_reference")) if doc else False,
|
|
||||||
}
|
|
||||||
return True
|
|
||||||
|
|
||||||
scroll_id: Optional[str] = None
|
|
||||||
try:
|
|
||||||
body = {"query": query, "_source": source_fields, "sort": ["_doc"]}
|
|
||||||
response = client.search(
|
|
||||||
index=index, body=body, size=batch_size, scroll="1m"
|
|
||||||
)
|
|
||||||
scroll_id = response.get("_scroll_id")
|
|
||||||
stop_fetch = False
|
|
||||||
while not stop_fetch:
|
|
||||||
hits = response.get("hits", {}).get("hits", [])
|
|
||||||
if not hits:
|
|
||||||
break
|
|
||||||
for hit in hits:
|
|
||||||
if node_limit is not None and len(nodes) >= node_limit:
|
|
||||||
truncated = True
|
|
||||||
stop_fetch = True
|
|
||||||
break
|
|
||||||
source = hit.get("_source", {}) or {}
|
|
||||||
video_id = source.get("video_id")
|
|
||||||
if not video_id:
|
|
||||||
continue
|
|
||||||
if not include_external and source.get("external_reference"):
|
|
||||||
nodes.pop(video_id, None)
|
|
||||||
continue
|
|
||||||
if not ensure_node(video_id, source):
|
|
||||||
continue
|
|
||||||
for target in normalize_reference_list(source.get("internal_references")):
|
|
||||||
if target == video_id:
|
|
||||||
continue
|
|
||||||
if not ensure_node(target):
|
|
||||||
continue
|
|
||||||
key = (video_id, target, "references")
|
|
||||||
if key not in link_seen:
|
|
||||||
links.append(
|
|
||||||
{"source": video_id, "target": target, "relation": "references"}
|
|
||||||
)
|
|
||||||
link_seen.add(key)
|
|
||||||
for origin in normalize_reference_list(source.get("referenced_by")):
|
|
||||||
if origin == video_id:
|
|
||||||
continue
|
|
||||||
if not ensure_node(origin):
|
|
||||||
continue
|
|
||||||
key = (origin, video_id, "referenced_by")
|
|
||||||
if key not in link_seen:
|
|
||||||
links.append(
|
|
||||||
{"source": origin, "target": video_id, "relation": "referenced_by"}
|
|
||||||
)
|
|
||||||
link_seen.add(key)
|
|
||||||
if stop_fetch or not scroll_id:
|
|
||||||
break
|
|
||||||
response = client.scroll(scroll_id=scroll_id, scroll="1m")
|
|
||||||
scroll_id = response.get("_scroll_id")
|
|
||||||
if not scroll_id:
|
|
||||||
break
|
|
||||||
finally:
|
|
||||||
if scroll_id:
|
|
||||||
try:
|
|
||||||
client.clear_scroll(scroll_id=scroll_id)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if highlight_id and highlight_id in nodes:
|
|
||||||
nodes[highlight_id]["is_root"] = True
|
|
||||||
|
|
||||||
links = [
|
|
||||||
link
|
|
||||||
for link in links
|
|
||||||
if link.get("source") in nodes and link.get("target") in nodes
|
|
||||||
]
|
|
||||||
|
|
||||||
links = [
|
|
||||||
link
|
|
||||||
for link in links
|
|
||||||
if link.get("source") in nodes and link.get("target") in nodes
|
|
||||||
]
|
|
||||||
|
|
||||||
return {
|
|
||||||
"root": highlight_id or "",
|
|
||||||
"depth": 0,
|
|
||||||
"nodes": list(nodes.values()),
|
|
||||||
"links": links,
|
|
||||||
"meta": {
|
|
||||||
"node_count": len(nodes),
|
|
||||||
"link_count": len(links),
|
|
||||||
"mode": "full",
|
|
||||||
"truncated": truncated,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
@app.route("/api/channels")
|
@app.route("/api/channels")
|
||||||
def channels():
|
def channels():
|
||||||
@@ -952,6 +960,15 @@ def build_full_graph_payload(
|
|||||||
"top_hits": {
|
"top_hits": {
|
||||||
"size": 1,
|
"size": 1,
|
||||||
"_source": {"includes": ["channel_name"]},
|
"_source": {"includes": ["channel_name"]},
|
||||||
|
"sort": [
|
||||||
|
{
|
||||||
|
"channel_name.keyword": {
|
||||||
|
"order": "asc",
|
||||||
|
"missing": "_last",
|
||||||
|
"unmapped_type": "keyword",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -1050,13 +1067,20 @@ def build_full_graph_payload(
|
|||||||
|
|
||||||
if full_graph:
|
if full_graph:
|
||||||
payload = build_full_graph_payload(
|
payload = build_full_graph_payload(
|
||||||
|
client,
|
||||||
|
index,
|
||||||
None,
|
None,
|
||||||
highlight_id=video_id or None,
|
highlight_id=video_id or None,
|
||||||
include_external=include_external,
|
include_external=include_external,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
payload = build_graph_payload(
|
payload = build_graph_payload(
|
||||||
video_id, depth, max_nodes, include_external=include_external
|
client,
|
||||||
|
index,
|
||||||
|
video_id,
|
||||||
|
depth,
|
||||||
|
max_nodes,
|
||||||
|
include_external=include_external,
|
||||||
)
|
)
|
||||||
if not payload["nodes"]:
|
if not payload["nodes"]:
|
||||||
return (
|
return (
|
||||||
@@ -1367,6 +1391,15 @@ def build_full_graph_payload(
|
|||||||
"top_hits": {
|
"top_hits": {
|
||||||
"size": 1,
|
"size": 1,
|
||||||
"_source": {"includes": ["channel_name"]},
|
"_source": {"includes": ["channel_name"]},
|
||||||
|
"sort": [
|
||||||
|
{
|
||||||
|
"channel_name.keyword": {
|
||||||
|
"order": "asc",
|
||||||
|
"missing": "_last",
|
||||||
|
"unmapped_type": "keyword",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user