Initial commit
This commit is contained in:
226
transcript_collector.py
Normal file
226
transcript_collector.py
Normal file
@@ -0,0 +1,226 @@
|
||||
"""
|
||||
Lightweight helpers for gathering video metadata and transcripts from YouTube.
|
||||
|
||||
Usage:
|
||||
python -m python_app.transcript_collector --channel UC123 --output data/raw
|
||||
|
||||
Relies on:
|
||||
- YouTube Data API v3 (requires YOUTUBE_API_KEY).
|
||||
- youtube-transcript-api for transcript retrieval.
|
||||
Both libraries are optional at import time so the module can still be referenced
|
||||
when only working with existing JSON dumps.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, Iterator, List, Optional
|
||||
|
||||
from .config import CONFIG
|
||||
|
||||
try:
|
||||
from googleapiclient.discovery import build as build_youtube # type: ignore
|
||||
except ImportError: # pragma: no cover - library optional
|
||||
build_youtube = None
|
||||
|
||||
try:
|
||||
from youtube_transcript_api import YouTubeTranscriptApi # type: ignore
|
||||
except ImportError: # pragma: no cover - library optional
|
||||
YouTubeTranscriptApi = None
|
||||
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class TranscriptSegment:
    """One timed caption entry from a video transcript."""

    # Offset from the start of the video, in seconds.
    start: float
    # How long this caption is displayed, in seconds.
    duration: float
    # The caption text itself.
    text: str
|
||||
|
||||
|
||||
@dataclass
class VideoRecord:
    """Metadata plus transcript for a single YouTube video.

    Instances are serialized to per-video JSON files via ``dataclasses.asdict``
    (see ``collect_channel``), so fields should stay JSON-friendly.
    """

    # YouTube video ID (the ``v=`` query parameter).
    video_id: str
    # ID of the channel that published the video.
    channel_id: str
    # Human-readable channel name as reported by the API snippet.
    channel_title: str
    # Video title.
    title: str
    # Full video description.
    description: str
    # Publication timestamp string as returned by the API (ISO 8601).
    published_at: str
    # Canonical watch URL built from ``video_id``.
    url: str
    # Ordered caption segments; empty when no transcript was available.
    transcript: List[TranscriptSegment]
|
||||
|
||||
|
||||
def _ensure_youtube_client(api_key: Optional[str]):
    """Return a YouTube Data API v3 client built with *api_key*.

    Args:
        api_key: Developer key for the YouTube Data API.

    Raises:
        RuntimeError: when google-api-python-client is not installed, or when
            no (non-empty) API key was provided.
    """
    # The import is optional at module load; fail loudly only when a client
    # is actually requested.
    if build_youtube is None:
        message = (
            "google-api-python-client not installed. "
            "Install google-api-python-client to collect metadata."
        )
        raise RuntimeError(message)
    if not api_key:
        raise RuntimeError("Set YOUTUBE_API_KEY to collect metadata from YouTube.")
    return build_youtube("youtube", "v3", developerKey=api_key)
|
||||
|
||||
|
||||
def _ensure_transcript_api():
    """Return a ``YouTubeTranscriptApi`` instance.

    Raises:
        RuntimeError: when youtube-transcript-api is not installed.
    """
    # Mirror of _ensure_youtube_client: the dependency is optional at import
    # time, so surface a clear error only at the point of use.
    if YouTubeTranscriptApi is None:
        message = (
            "youtube-transcript-api not installed. "
            "Install youtube-transcript-api to fetch transcripts."
        )
        raise RuntimeError(message)
    return YouTubeTranscriptApi()
|
||||
|
||||
|
||||
def iter_channel_videos(
    channel_id: str,
    *,
    api_key: Optional[str] = None,
    max_pages: int = 10,
) -> Iterator[Dict]:
    """
    Yield raw playlist items for the uploads playlist of the given channel.

    Args:
        channel_id: Target YouTube channel ID.
        api_key: Explicit API key (defaults to config value).
        max_pages: Hard cap on paginated playlist fetches to keep things simple.

    Raises:
        ValueError: when the channel does not exist or exposes no uploads
            playlist.
    """
    youtube = _ensure_youtube_client(api_key or CONFIG.youtube.api_key)

    # Resolve the channel's "uploads" playlist, which lists every video.
    channel_response = (
        youtube.channels().list(id=channel_id, part="contentDetails").execute()
    )
    channel_items = channel_response.get("items", [])
    if not channel_items:
        raise ValueError(f"Channel {channel_id} not found.")
    related = channel_items[0].get("contentDetails", {}).get("relatedPlaylists", {})
    playlist_id = related.get("uploads")
    if not playlist_id:
        raise ValueError(f"Channel {channel_id} missing uploads playlist.")

    # Walk the playlist page by page (50 items per page, the API maximum),
    # stopping after max_pages fetches or when pagination is exhausted.
    page_request = youtube.playlistItems().list(
        playlistId=playlist_id, part="snippet", maxResults=50
    )
    pages_fetched = 0
    while page_request and pages_fetched < max_pages:
        payload = page_request.execute()
        yield from payload.get("items", [])
        pages_fetched += 1
        page_request = youtube.playlistItems().list_next(page_request, payload)
|
||||
|
||||
|
||||
def fetch_transcript(
    video_id: str, *, languages: Optional[Iterable[str]] = None
) -> List[TranscriptSegment]:
    """Return transcript segments for a video, if available.

    Args:
        video_id: YouTube video ID.
        languages: Preference-ordered language codes. When omitted, the
            library's own default preference (English) is used.

    Returns:
        Parsed segments, or an empty list when no transcript can be fetched.
    """
    api = _ensure_transcript_api()
    try:
        # BUGFIX: only forward `languages` when the caller supplied it.
        # youtube-transcript-api iterates the languages preference list
        # (its default is ("en",)), so passing None raised inside the
        # library — and the broad except below swallowed that, silently
        # returning an empty transcript for every default call.
        if languages is None:
            transcripts = api.get_transcript(video_id)
        else:
            transcripts = api.get_transcript(video_id, languages=list(languages))
    except Exception as exc:  # broad catch: missing/disabled captions are expected
        LOGGER.warning("Transcript unavailable for %s: %s", video_id, exc)
        return []
    return [
        TranscriptSegment(
            start=entry.get("start", 0.0),
            duration=entry.get("duration", 0.0),
            text=entry.get("text", ""),
        )
        for entry in transcripts
    ]
|
||||
|
||||
|
||||
def collect_channel(
    channel_id: str,
    output_dir: Path,
    *,
    api_key: Optional[str] = None,
    max_pages: int = 2,
    languages: Optional[List[str]] = None,
) -> List[VideoRecord]:
    """
    Collect metadata + transcripts for a channel and store as JSON files.

    Args:
        channel_id: Target YouTube channel ID.
        output_dir: Directory receiving one ``<video_id>.json`` per video;
            created if it does not exist.
        api_key: Explicit API key (defaults to config value).
        max_pages: Cap on paginated playlist fetches (50 videos per page).
        languages: Preferred transcript languages, in order.

    Returns the in-memory list to make it easy to chain into ingestion.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    collected: List[VideoRecord] = []
    for playlist_item in iter_channel_videos(
        channel_id, api_key=api_key, max_pages=max_pages
    ):
        snippet = playlist_item.get("snippet", {})
        video_id = snippet.get("resourceId", {}).get("videoId")
        if not video_id:
            # Entries without a video ID (e.g. removed videos) are skipped.
            continue
        record = VideoRecord(
            video_id=video_id,
            channel_id=snippet.get("channelId", channel_id),
            channel_title=snippet.get("channelTitle", ""),
            title=snippet.get("title", ""),
            description=snippet.get("description", ""),
            published_at=snippet.get("publishedAt", ""),
            url=f"https://www.youtube.com/watch?v={video_id}",
            transcript=fetch_transcript(video_id, languages=languages),
        )
        collected.append(record)

        # Persist each record as it is collected so partial runs still
        # leave usable JSON dumps on disk.
        destination = output_dir / f"{video_id}.json"
        with destination.open("w", encoding="utf-8") as handle:
            json.dump(asdict(record), handle, ensure_ascii=False, indent=2)
        LOGGER.info("Saved %s", destination)
    return collected
|
||||
|
||||
|
||||
def _build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Collect channel transcripts into JSON files."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--channel",
|
||||
required=True,
|
||||
help="YouTube channel ID (e.g. UCXYZ).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
type=Path,
|
||||
default=Path("data/raw"),
|
||||
help="Directory to write per-video JSON files.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--max-pages",
|
||||
type=int,
|
||||
default=2,
|
||||
help="Number of paginated channel pages to pull (50 videos per page).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--language",
|
||||
dest="languages",
|
||||
action="append",
|
||||
help="Preferred transcript languages (can be repeated).",
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: configure logging, parse args, run the collection."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    namespace = _build_parser().parse_args()
    collect_channel(
        namespace.channel,
        namespace.output,
        max_pages=namespace.max_pages,
        languages=namespace.languages,
    )


if __name__ == "__main__":  # pragma: no cover
    main()
|
||||
|
||||
Reference in New Issue
Block a user