diff --git a/poetry.lock b/poetry.lock index 9e7646c..1bbb34a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,28 @@ # This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +[[package]] +name = "anyio" +version = "4.5.2" +description = "High level compatibility layer for multiple asynchronous event loop implementations" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "anyio-4.5.2-py3-none-any.whl", hash = "sha256:c011ee36bc1e8ba40e5a81cb9df91925c218fe9b778554e0b56a21e1b5d4716f"}, + {file = "anyio-4.5.2.tar.gz", hash = "sha256:23009af4ed04ce05991845451e11ef02fc7c5ed29179ac9a420e5ad0ac7ddc5b"}, +] + +[package.dependencies] +exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""} +idna = ">=2.8" +sniffio = ">=1.1" +typing-extensions = {version = ">=4.1", markers = "python_version < \"3.11\""} + +[package.extras] +doc = ["Sphinx (>=7.4,<8.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "truststore (>=0.9.1) ; python_version >= \"3.10\"", "uvloop (>=0.21.0b1) ; platform_python_implementation == \"CPython\" and platform_system != \"Windows\""] +trio = ["trio (>=0.26.1)"] + [[package]] name = "certifi" version = "2025.1.31" @@ -230,7 +253,7 @@ version = "1.2.2" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" -groups = ["test"] +groups = ["main", "test"] markers = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, @@ -240,6 +263,40 @@ files = [ [package.extras] test = ["pytest (>=6)"] +[[package]] +name = "h11" +version = "0.16.0" +description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" 
+optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"}, + {file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"}, +] + +[[package]] +name = "httpcore" +version = "1.0.9" +description = "A minimal low-level HTTP client." +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"}, + {file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"}, +] + +[package.dependencies] +certifi = "*" +h11 = ">=0.16" + +[package.extras] +asyncio = ["anyio (>=4.0,<5.0)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +trio = ["trio (>=0.22.0,<1.0)"] + [[package]] name = "httpretty" version = "1.0.5" @@ -251,6 +308,31 @@ files = [ {file = "httpretty-1.0.5.tar.gz", hash = "sha256:e53c927c4d3d781a0761727f1edfad64abef94e828718e12b672a678a8b3e0b5"}, ] +[[package]] +name = "httpx" +version = "0.28.1" +description = "The next generation HTTP client." 
+optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, + {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, +] + +[package.dependencies] +anyio = "*" +certifi = "*" +httpcore = "==1.*" +idna = "*" + +[package.extras] +brotli = ["brotli ; platform_python_implementation == \"CPython\"", "brotlicffi ; platform_python_implementation != \"CPython\""] +cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +zstd = ["zstandard (>=0.18.0)"] + [[package]] name = "idna" version = "3.10" @@ -379,6 +461,18 @@ files = [ {file = "ruff-0.6.9.tar.gz", hash = "sha256:b076ef717a8e5bc819514ee1d602bbdca5b4420ae13a9cf61a0c0a4f53a2baa2"}, ] +[[package]] +name = "sniffio" +version = "1.3.1" +description = "Sniff out which async library your code is running under" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, + {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, +] + [[package]] name = "tomli" version = "2.2.1" @@ -422,6 +516,19 @@ files = [ {file = "tomli-2.2.1.tar.gz", hash = "sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff"}, ] +[[package]] +name = "typing-extensions" +version = "4.13.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "python_version < \"3.11\"" +files = [ + {file = "typing_extensions-4.13.2-py3-none-any.whl", hash = "sha256:a439e7c04b49fec3e5d3e2beaa21755cadbbdc391694e28ccdd36ca4a1408f8c"}, + {file = "typing_extensions-4.13.2.tar.gz", hash = 
"sha256:e6c81219bd689f51865d9e372991c540bda33a0379d5573cddb9a3a23f7caaef"}, +] + [[package]] name = "urllib3" version = "2.2.3" @@ -443,4 +550,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.1" python-versions = ">=3.8,<3.14" -content-hash = "90a050a0b068935ce6452cab0e0fa30c93c1af7ed745896403524110ad47c69b" +content-hash = "79f4e64adc63cef19b42cfe57cb9f22c0079dc41867551eaf80e1aeb379e7363" diff --git a/pyproject.toml b/pyproject.toml index d236365..b2b4214 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ precommit.shell = "poe format && poe lint && poe coverage" python = ">=3.8,<3.14" requests = "*" defusedxml = "^0.7.1" +httpx = "^0.28.1" [tool.poetry.group.test] optional = true diff --git a/youtube_transcript_api/__init__.py b/youtube_transcript_api/__init__.py index 8e8d726..cb6e24d 100644 --- a/youtube_transcript_api/__init__.py +++ b/youtube_transcript_api/__init__.py @@ -1,5 +1,5 @@ # ruff: noqa: F401 -from ._api import YouTubeTranscriptApi +from ._api import YouTubeTranscriptApi, YoutubeTranscriptAsyncApi from ._transcripts import ( TranscriptList, Transcript, @@ -30,6 +30,7 @@ __all__ = [ "YouTubeTranscriptApi", + "YoutubeTranscriptAsyncApi", "TranscriptList", "Transcript", "FetchedTranscript", diff --git a/youtube_transcript_api/_api.py b/youtube_transcript_api/_api.py index b294493..17ee775 100644 --- a/youtube_transcript_api/_api.py +++ b/youtube_transcript_api/_api.py @@ -1,19 +1,20 @@ -from typing import Optional, Iterable - -from requests import Session -from requests.adapters import HTTPAdapter -from urllib3 import Retry - +from typing import Optional, Iterable, List from .proxies import ProxyConfig from ._transcripts import TranscriptListFetcher, FetchedTranscript, TranscriptList +from ._transcripts_async import ( + AsyncTranscriptHandler, + BulkFetchResults, +) + +from httpx import AsyncClient, AsyncHTTPTransport class YouTubeTranscriptApi: def __init__( self, proxy_config: Optional[ProxyConfig] = None, - 
http_client: Optional[Session] = None, + http_client: Optional[AsyncClient] = None, ): """ Note on thread-safety: As this class will initialize a `requests.Session` @@ -29,7 +30,7 @@ def __init__( manually want to share cookies between different instances of `YouTubeTranscriptApi`, overwrite defaults, specify SSL certificates, etc. """ - http_client = Session() if http_client is None else http_client + http_client = AsyncClient(timeout=20) if http_client is None else http_client http_client.headers.update({"Accept-Language": "en-US"}) # Cookie auth has been temporarily disabled, as it is not working properly with # YouTube's most recent changes. @@ -40,12 +41,10 @@ def __init__( if proxy_config.prevent_keeping_connections_alive: http_client.headers.update({"Connection": "close"}) if proxy_config.retries_when_blocked > 0: - retry_config = Retry( - total=proxy_config.retries_when_blocked, - status_forcelist=[429], + transport = AsyncHTTPTransport( + retries=proxy_config.retries_when_blocked ) - http_client.mount("http://", HTTPAdapter(max_retries=retry_config)) - http_client.mount("https://", HTTPAdapter(max_retries=retry_config)) + http_client._transport = transport self._fetcher = TranscriptListFetcher(http_client, proxy_config=proxy_config) def fetch( @@ -70,7 +69,7 @@ def fetch( return ( self.list(video_id) .find_transcript(languages) - .fetch(preserve_formatting=preserve_formatting) + .fetch_sync(preserve_formatting=preserve_formatting) ) def list( @@ -124,4 +123,127 @@ def list( :param video_id: the ID of the video you want to retrieve the transcript for. Make sure that this is the actual ID, NOT the full URL to the video! 
""" - return self._fetcher.fetch(video_id) + return self._fetcher.fetch_sync(video_id) + + +class YoutubeTranscriptAsyncApi: + def __init__( + self, + proxy_config: Optional[ProxyConfig] = None, + async_client: Optional[AsyncClient] = None, + ): + async_client = AsyncClient(timeout=20) if async_client is None else async_client + async_client.headers.update({"Accept-Language": "en-US"}) + + if proxy_config is not None: + async_client.proxies = proxy_config.to_requests_dict() + if proxy_config.prevent_keeping_connections_alive: + async_client.headers.update({"Connection": "close"}) + if proxy_config.retries_when_blocked > 0: + transport = AsyncHTTPTransport( + retries=proxy_config.retries_when_blocked + ) + async_client._transport = transport + + self._fetcher = TranscriptListFetcher( + async_client, proxy_config=proxy_config + ) + self._handler = AsyncTranscriptHandler(self._fetcher, proxy_config) + + async def fetch_single( + self, + video_id: str, + languages: Iterable[str] = ("en",), + preserve_formatting: bool = False, + ) -> FetchedTranscript: + """ + Retrieves the transcript for a single video. This is just a shortcut for + calling: + `YouTubeTranscriptApi().list(video_id).find_transcript(languages).fetch(preserve_formatting=preserve_formatting)` + + :param video_id: the ID of the video you want to retrieve the transcript for. + Make sure that this is the actual ID, NOT the full URL to the video! + :param languages: A list of language codes in a descending priority. For + example, if this is set to ["de", "en"] it will first try to fetch the + german transcript (de) and then fetch the english transcript (en) if + it fails to do so. This defaults to ["en"]. 
+ :param preserve_formatting: whether to keep select HTML text formatting + """ + + return await self._handler.fetch_single( + video_id, languages, preserve_formatting + ) + + async def fetch_all( + self, + video_ids: List[str], + languages: Iterable[str] = ("en",), + preserve_formatting: bool = False, + ) -> list[BulkFetchResults]: + """ + Asynchronously retrieves transcripts for a list of video IDs concurrently. + + :param video_ids: List of video IDs. + :param languages: List of language codes in descending priority (default: ["en"]). + :param preserve_formatting: Whether to keep HTML formatting. + Failed fetches never raise here; each error is captured and serialized + into its result entry as a dict with "type" and "message" keys. + :return: List of BulkFetchResults, one per video ID, in input order. + """ + transcripts = await self._handler.fetch_bulk( + video_ids, languages, preserve_formatting + ) + return transcripts + + async def list( + self, + video_id: str, + ) -> TranscriptList: + """ + Retrieves the list of transcripts which are available for a given video. It + returns a `TranscriptList` object which is iterable and provides methods to + filter the list of transcripts for specific languages. While iterating over + the `TranscriptList` the individual transcripts are represented by + `Transcript` objects, which provide metadata and can either be fetched by + calling `transcript.fetch()` or translated by calling `transcript.translate( + 'en')`. 
Example: + + ``` + ytt_api = YouTubeTranscriptApi() + + # retrieve the available transcripts + transcript_list = ytt_api.list('video_id') + + # iterate over all available transcripts + for transcript in transcript_list: + # the Transcript object provides metadata properties + print( + transcript.video_id, + transcript.language, + transcript.language_code, + # whether it has been manually created or generated by YouTube + transcript.is_generated, + # a list of languages the transcript can be translated to + transcript.translation_languages, + ) + + # fetch the actual transcript data + print(transcript.fetch()) + + # translating the transcript will return another transcript object + print(transcript.translate('en').fetch()) + + # you can also directly filter for the language you are looking for, using the transcript list + transcript = transcript_list.find_transcript(['de', 'en']) + + # or just filter for manually created transcripts + transcript = transcript_list.find_manually_created_transcript(['de', 'en']) + + # or automatically generated ones + transcript = transcript_list.find_generated_transcript(['de', 'en']) + ``` + + :param video_id: the ID of the video you want to retrieve the transcript for. + Make sure that this is the actual ID, NOT the full URL to the video! 
+ """ + return await self._fetcher.fetch(video_id) diff --git a/youtube_transcript_api/_transcripts.py b/youtube_transcript_api/_transcripts.py index 55baa42..3e02e7a 100644 --- a/youtube_transcript_api/_transcripts.py +++ b/youtube_transcript_api/_transcripts.py @@ -8,8 +8,10 @@ from defusedxml import ElementTree import re +import asyncio from requests import HTTPError, Session, Response +from httpx import AsyncClient from .proxies import ProxyConfig from ._settings import WATCH_URL, INNERTUBE_CONTEXT, INNERTUBE_API_URL @@ -103,7 +105,7 @@ def _raise_http_errors(response: Response, video_id: str) -> Response: class Transcript: def __init__( self, - http_client: Session, + http_client: AsyncClient, video_id: str, url: str, language: str, @@ -127,14 +129,14 @@ def __init__( for translation_language in translation_languages } - def fetch(self, preserve_formatting: bool = False) -> FetchedTranscript: + async def fetch(self, preserve_formatting: bool = False) -> FetchedTranscript: """ Loads the actual transcript data. 
:param preserve_formatting: whether to keep select HTML text formatting """ if "&exp=xpe" in self._url: raise PoTokenRequired(self.video_id) - response = self._http_client.get(self._url) + response = await self._http_client.get(self._url) snippets = _TranscriptParser(preserve_formatting=preserve_formatting).parse( _raise_http_errors(response, self.video_id).text, ) @@ -145,6 +147,10 @@ def fetch(self, preserve_formatting: bool = False) -> FetchedTranscript: language_code=self.language_code, is_generated=self.is_generated, ) + + def fetch_sync(self, preserve_formatting: bool = False) -> FetchedTranscript: + loop = asyncio.get_event_loop() + return loop.run_until_complete(self.fetch(preserve_formatting=preserve_formatting)) def __str__(self) -> str: return '{language_code} ("{language}"){translation_description}'.format( @@ -345,22 +351,26 @@ def _get_language_description(self, transcript_strings: Iterable[str]) -> str: class TranscriptListFetcher: - def __init__(self, http_client: Session, proxy_config: Optional[ProxyConfig]): + def __init__(self, http_client: AsyncClient, proxy_config: Optional[ProxyConfig]): self._http_client = http_client self._proxy_config = proxy_config - def fetch(self, video_id: str) -> TranscriptList: + async def fetch(self, video_id: str) -> TranscriptList: return TranscriptList.build( self._http_client, video_id, - self._fetch_captions_json(video_id), + await self._fetch_captions_json(video_id), ) + + def fetch_sync(self, video_id: str) -> TranscriptList: + loop = asyncio.get_event_loop() + return loop.run_until_complete(self.fetch(video_id)) - def _fetch_captions_json(self, video_id: str, try_number: int = 0) -> Dict: + async def _fetch_captions_json(self, video_id: str, try_number: int = 0) -> Dict: try: - html = self._fetch_video_html(video_id) + html = await self._fetch_video_html(video_id) api_key = self._extract_innertube_api_key(html, video_id) - innertube_data = self._fetch_innertube_data(video_id, api_key) + innertube_data = await 
self._fetch_innertube_data(video_id, api_key) return self._extract_captions_json(innertube_data, video_id) except RequestBlocked as exception: retries = ( @@ -429,21 +439,22 @@ def _create_consent_cookie(self, html: str, video_id: str) -> None: "CONSENT", "YES+" + match.group(1), domain=".youtube.com" ) - def _fetch_video_html(self, video_id: str) -> str: - html = self._fetch_html(video_id) + async def _fetch_video_html(self, video_id: str) -> str: + html = await self._fetch_html(video_id) if 'action="https://consent.youtube.com/s"' in html: self._create_consent_cookie(html, video_id) - html = self._fetch_html(video_id) + html = await self._fetch_html(video_id) if 'action="https://consent.youtube.com/s"' in html: raise FailedToCreateConsentCookie(video_id) return html - def _fetch_html(self, video_id: str) -> str: - response = self._http_client.get(WATCH_URL.format(video_id=video_id)) + async def _fetch_html(self, video_id: str) -> str: + + response = await self._http_client.get(WATCH_URL.format(video_id=video_id)) return unescape(_raise_http_errors(response, video_id).text) - def _fetch_innertube_data(self, video_id: str, api_key: str) -> Dict: - response = self._http_client.post( + async def _fetch_innertube_data(self, video_id: str, api_key: str) -> Dict: + response = await self._http_client.post( INNERTUBE_API_URL.format(api_key=api_key), json={ "context": INNERTUBE_CONTEXT, diff --git a/youtube_transcript_api/_transcripts_async.py b/youtube_transcript_api/_transcripts_async.py new file mode 100644 index 0000000..7774c46 --- /dev/null +++ b/youtube_transcript_api/_transcripts_async.py @@ -0,0 +1,141 @@ +from typing import List, Dict, Iterable, Optional, Union, Any +from dataclasses import dataclass, asdict +from .proxies import ProxyConfig +from ._transcripts import ( + FetchedTranscript, + TranscriptListFetcher, +) + +import asyncio + +@dataclass +class BulkFetchResults: + video_id: str + result: Union[FetchedTranscript, Dict[str, Any]] + + def 
to_raw_data(self): + return asdict(self) + + + class AsyncTranscriptHandler: + """ + An asynchronous handler for fetching YouTube transcripts concurrently. + + This class provides high-level methods for fetching transcripts for one + or more YouTube videos while handling concurrency limits, exceptions, + and optional proxy configuration. + + Features: + - Concurrency limiting with an asyncio.Semaphore. + - Fetching single or multiple transcripts concurrently. + - Built-in error handling with structured exception serialization. + - Proxy configuration support (optional). + + Attributes: + _fetcher (TranscriptListFetcher): + The transcript fetcher responsible for retrieving transcript lists. + _proxy_config (Optional[ProxyConfig]): + Proxy configuration used when making requests. + _semaphore (asyncio.Semaphore): + Semaphore to limit the number of concurrent requests. + + Example: + >>> handler = AsyncTranscriptHandler(fetcher, max_concurrent=5) + >>> results = await handler.fetch_bulk( + ... ["video_id_1", "video_id_2"], + ... ) + >>> for r in results: + ... print(r.video_id, r.result) + + Notes: + - `fetch_bulk` will always return a list of results in the same order + as the provided `video_ids`. + - If an exception occurs during fetching, the exception is captured + and serialized into a dictionary with `type` and `message`. 
+ """ + def __init__( + self, + fetcher: TranscriptListFetcher, + proxy_config: Optional[ProxyConfig] = None, + max_concurrent: int = 10, + ): + self._fetcher = fetcher + self._proxy_config = proxy_config + self._semaphore = asyncio.Semaphore(max_concurrent) + + async def fetch_single( + self, + video_id: str, + languages: Iterable[str] = ("en",), + preserve_formatting: bool = False, + ) -> FetchedTranscript: + """Fetch transcript for a single video""" + async with self._semaphore: + transcript_list = await self._fetcher.fetch(video_id) + transcript = transcript_list.find_transcript(languages) + return await transcript.fetch(preserve_formatting=preserve_formatting) + + async def fetch_bulk( + self, + video_ids: List[str], + languages: Iterable[str] = ("en",), + preserve_formatting: bool = False, + ) -> List[BulkFetchResults]: + """Fetch transcripts for multiple videos concurrently with error handling. + Args: + video_ids: List of YouTube video IDs. + languages: Languages to try in order. + preserve_formatting: Whether to preserve original transcript formatting. + + Returns: + A list of FetchResult objects, one per video_id. 
+ """ + + async def _safe_fetch(video_id: str) -> Union[FetchedTranscript, Exception]: + try: + return await self.fetch_single( + video_id, + languages=languages, + preserve_formatting=preserve_formatting, + ) + except Exception as e: + return e + + tasks = [_safe_fetch(video_id) for video_id in video_ids] + results = await asyncio.gather(*tasks, return_exceptions=True) + + return self._process_bulk_results(video_ids, results) + + def _serialize_exception(self, exc: BaseException) -> Dict[str, Any]: + """Convert exception to serializable dict""" + return { + "type": exc.__class__.__name__, + "message": str(exc), + **getattr(exc, "__dict__", {}), + } + + def _process_bulk_results( + self, + video_ids: List[str], + results: List[Union[FetchedTranscript, Exception]], + ) -> List[BulkFetchResults]: + """Process bulk fetch results with error handling""" + processed_results = [] + + for video_id, result in zip(video_ids, results): + if isinstance(result, Exception): + processed_results.append( + BulkFetchResults( + video_id=video_id, + result=self._serialize_exception(result) + ) + ) + else: + processed_results.append( + BulkFetchResults( + video_id=video_id, + result=result + ) + ) + + return processed_results