diff --git a/tests/async/conftest.py b/tests/async/conftest.py
index 65a963507..a007d55ac 100644
--- a/tests/async/conftest.py
+++ b/tests/async/conftest.py
@@ -13,19 +13,25 @@
# limitations under the License.
import asyncio
+from contextlib import asynccontextmanager
+from pathlib import Path
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Generator
import pytest
+from playwright._impl._driver import compute_driver_executable
from playwright.async_api import (
Browser,
BrowserContext,
BrowserType,
+ FrameLocator,
+ Locator,
Page,
Playwright,
Selectors,
async_playwright,
)
+from tests.server import HTTPServer
from .utils import Utils
from .utils import utils as utils_object
@@ -131,3 +137,77 @@ async def page(context: BrowserContext) -> AsyncGenerator[Page, None]:
@pytest.fixture(scope="session")
def selectors(playwright: Playwright) -> Selectors:
return playwright.selectors
+
+
+class TraceViewerPage:
+ def __init__(self, page: Page):
+ self.page = page
+
+ @property
+ def actions_tree(self) -> Locator:
+ return self.page.get_by_test_id("actions-tree")
+
+ @property
+ def action_titles(self) -> Locator:
+ return self.page.locator(".action-title")
+
+ @property
+ def stack_frames(self) -> Locator:
+ return self.page.get_by_test_id("stack-trace-list").locator(".list-view-entry")
+
+ async def select_action(self, title: str, ordinal: int = 0) -> None:
+ await self.page.locator(f'.action-title:has-text("{title}")').nth(
+ ordinal
+ ).click()
+
+ async def select_snapshot(self, name: str) -> None:
+ await self.page.click(
+ f'.snapshot-tab .tabbed-pane-tab-label:has-text("{name}")'
+ )
+
+ async def snapshot_frame(
+ self, action_name: str, ordinal: int = 0, has_subframe: bool = False
+ ) -> FrameLocator:
+ await self.select_action(action_name, ordinal)
+ expected_frames = 4 if has_subframe else 3
+ while len(self.page.frames) < expected_frames:
+ await self.page.wait_for_event("frameattached")
+ return self.page.frame_locator("iframe.snapshot-visible[name=snapshot]")
+
+ async def show_source_tab(self) -> None:
+ await self.page.click("text='Source'")
+
+ async def expand_action(self, title: str, ordinal: int = 0) -> None:
+ await self.actions_tree.locator(".tree-view-entry", has_text=title).nth(
+ ordinal
+ ).locator(".codicon-chevron-right").click()
+
+
+@pytest.fixture
+async def show_trace_viewer(browser: Browser) -> AsyncGenerator[Callable, None]:
+ """Fixture that provides a function to show trace viewer for a trace file."""
+
+ @asynccontextmanager
+ async def _show_trace_viewer(
+ trace_path: Path,
+ ) -> AsyncGenerator[TraceViewerPage, None]:
+ trace_viewer_path = (
+ Path(compute_driver_executable()[0]) / "../package/lib/vite/traceViewer"
+ ).resolve()
+
+ server = HTTPServer()
+ server.start(trace_viewer_path)
+ server.set_route("/trace.zip", lambda request: request.serve_file(trace_path))
+
+ page = await browser.new_page()
+
+ try:
+ await page.goto(
+ f"{server.PREFIX}/index.html?trace={server.PREFIX}/trace.zip"
+ )
+ yield TraceViewerPage(page)
+ finally:
+ await page.close()
+ server.stop()
+
+ yield _show_trace_viewer
diff --git a/tests/async/test_browsertype_connect.py b/tests/async/test_browsertype_connect.py
index 8295a6960..d2eca4628 100644
--- a/tests/async/test_browsertype_connect.py
+++ b/tests/async/test_browsertype_connect.py
@@ -16,14 +16,16 @@
import os
import re
from pathlib import Path
-from typing import Callable
+from typing import AsyncContextManager, Callable
import pytest
-from playwright.async_api import BrowserType, Error, Playwright, Route
+from playwright.async_api import BrowserType, Error, Playwright, Route, expect
from tests.conftest import RemoteServer
from tests.server import Server, TestServerRequest, WebSocketProtocol
-from tests.utils import chromium_version_less_than, parse_trace
+from tests.utils import chromium_version_less_than
+
+from .conftest import TraceViewerPage
async def test_should_print_custom_ws_close_error(
@@ -325,6 +327,7 @@ async def test_should_record_trace_with_source(
server: Server,
tmp_path: Path,
browser_type: BrowserType,
+ show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]],
) -> None:
remote = launch_server()
browser = await browser_type.connect(remote.ws_endpoint)
@@ -341,14 +344,28 @@ async def test_should_record_trace_with_source(
await context.close()
await browser.close()
- (resources, events) = parse_trace(path)
- current_file_content = Path(__file__).read_bytes()
- found_current_file = False
- for name, resource in resources.items():
- if resource == current_file_content:
- found_current_file = True
- break
- assert found_current_file
+ async with show_trace_viewer(path) as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile("Page.goto"),
+ re.compile("Page.set_content"),
+ re.compile("Page.click"),
+ ]
+ )
+ await trace_viewer.show_source_tab()
+ await expect(trace_viewer.stack_frames).to_contain_text(
+ [
+ re.compile(r"test_should_record_trace_with_source"),
+ ]
+ )
+ await trace_viewer.select_action("Page.set_content")
+ # Check that the source file is shown
+ await expect(
+ trace_viewer.page.locator(".source-tab-file-name")
+ ).to_have_attribute("title", re.compile(r".*test_browsertype_connect\.py"))
+ await expect(trace_viewer.page.locator(".source-line-running")).to_contain_text(
+ 'page.set_content("")'
+ )
async def test_should_record_trace_with_relative_trace_path(
diff --git a/tests/async/test_tracing.py b/tests/async/test_tracing.py
index 6b0c557f2..270bbfb80 100644
--- a/tests/async/test_tracing.py
+++ b/tests/async/test_tracing.py
@@ -15,11 +15,19 @@
import asyncio
import re
from pathlib import Path
-from typing import Dict, List
-
-from playwright.async_api import Browser, BrowserContext, BrowserType, Page, Response
+from typing import AsyncContextManager, Callable
+
+from playwright.async_api import (
+ Browser,
+ BrowserContext,
+ BrowserType,
+ Page,
+ Response,
+ expect,
+)
from tests.server import Server
-from tests.utils import get_trace_actions, parse_trace
+
+from .conftest import TraceViewerPage
async def test_browser_context_output_trace(
@@ -41,7 +49,7 @@ async def test_start_stop(browser: Browser) -> None:
async def test_browser_context_should_not_throw_when_stopping_without_start_but_not_exporting(
- context: BrowserContext, server: Server, tmp_path: Path
+ context: BrowserContext,
) -> None:
await context.tracing.stop()
@@ -67,27 +75,60 @@ async def test_browser_context_output_trace_chunk(
async def test_should_collect_sources(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]],
) -> None:
await context.tracing.start(sources=True)
await page.goto(server.EMPTY_PAGE)
await page.set_content("")
- await page.click("button")
+
+ async def my_method_outer() -> None:
+ async def my_method_inner() -> None:
+ await page.get_by_text("Click").click()
+
+ await my_method_inner()
+
+ await my_method_outer()
path = tmp_path / "trace.zip"
await context.tracing.stop(path=path)
- (resources, events) = parse_trace(path)
- current_file_content = Path(__file__).read_bytes()
- found_current_file = False
- for name, resource in resources.items():
- if resource == current_file_content:
- found_current_file = True
- break
- assert found_current_file
+ async with show_trace_viewer(path) as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.goto"),
+ re.compile(r"Page.set_content"),
+ re.compile(r"Locator.click"),
+ ]
+ )
+ await trace_viewer.show_source_tab()
+ # Check that stack frames are shown (they might be anonymous in Python)
+ await expect(trace_viewer.stack_frames).to_contain_text(
+ [
+ re.compile(r"my_method_inner"),
+ re.compile(r"my_method_outer"),
+ re.compile(r"test_should_collect_sources"),
+ ]
+ )
+
+ await trace_viewer.select_action("Page.set_content")
+ # Check that the source file is shown
+ await expect(
+ trace_viewer.page.locator(".source-tab-file-name")
+ ).to_have_attribute("title", re.compile(r".*test_.*\.py"))
+ await expect(trace_viewer.page.locator(".source-line-running")).to_contain_text(
+ 'page.set_content("")'
+ )
async def test_should_collect_trace_with_resources_but_no_js(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]],
) -> None:
await context.tracing.start(screenshots=True, snapshots=True)
await page.goto(server.PREFIX + "/frames/frame.html")
@@ -108,46 +149,37 @@ async def test_should_collect_trace_with_resources_but_no_js(
trace_file_path = tmp_path / "trace.zip"
await context.tracing.stop(path=trace_file_path)
- (_, events) = parse_trace(trace_file_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.set_content",
- "Page.click",
- "Mouse.move",
- "Mouse.dblclick",
- "Keyboard.insert_text",
- "Page.wait_for_timeout",
- "Page.route",
- "Page.goto",
- "Page.goto",
- "Page.close",
- ]
-
- assert len(list(filter(lambda e: e["type"] == "frame-snapshot", events))) >= 1
- assert len(list(filter(lambda e: e["type"] == "screencast-frame", events))) >= 1
- style = list(
- filter(
- lambda e: e["type"] == "resource-snapshot"
- and e["snapshot"]["request"]["url"].endswith("style.css"),
- events,
- )
- )[0]
- assert style
- assert style["snapshot"]["response"]["content"]["_sha1"]
- script = list(
- filter(
- lambda e: e["type"] == "resource-snapshot"
- and e["snapshot"]["request"]["url"].endswith("script.js"),
- events,
+ async with show_trace_viewer(trace_file_path) as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile("Page.goto"),
+ re.compile("Page.set_content"),
+ re.compile("Page.click"),
+ re.compile("Mouse.move"),
+ re.compile("Mouse.dblclick"),
+ re.compile("Keyboard.insert_text"),
+ re.compile("Page.wait_for_timeout"),
+ re.compile("Page.route"),
+ re.compile("Page.goto"),
+ re.compile("Page.goto"),
+ re.compile("Page.close"),
+ ]
)
- )[0]
- assert script
- assert script["snapshot"]["response"]["content"].get("_sha1") is None
+
+ await trace_viewer.select_action("Page.set_content")
+ await expect(
+ trace_viewer.page.locator(".browser-frame-address-bar")
+ ).to_have_text(server.PREFIX + "/frames/frame.html")
+ frame = await trace_viewer.snapshot_frame("Page.set_content", 0, False)
+ await expect(frame.locator("button")).to_have_text("Click")
async def test_should_correctly_determine_sync_apiname(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable,
) -> None:
await context.tracing.start(screenshots=True, snapshots=True)
@@ -165,16 +197,21 @@ async def _handle_response(response: Response) -> None:
trace_file_path = tmp_path / "trace.zip"
await context.tracing.stop(path=trace_file_path)
- (_, events) = parse_trace(trace_file_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.close",
- ]
+ async with show_trace_viewer(trace_file_path) as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.goto"),
+ re.compile(r"Page.close"),
+ ]
+ )
async def test_should_collect_two_traces(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]],
) -> None:
await context.tracing.start(screenshots=True, snapshots=True)
await page.goto(server.EMPTY_PAGE)
@@ -189,27 +226,30 @@ async def test_should_collect_two_traces(
tracing2_path = tmp_path / "trace2.zip"
await context.tracing.stop(path=tracing2_path)
- (_, events) = parse_trace(tracing1_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.set_content",
- "Page.click",
- ]
-
- (_, events) = parse_trace(tracing2_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == ["Page.dblclick", "Page.close"]
-
+ async with show_trace_viewer(tracing1_path) as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile("Page.goto"),
+ re.compile("Page.set_content"),
+ re.compile("Page.click"),
+ ]
+ )
-async def test_should_not_throw_when_stopping_without_start_but_not_exporting(
- context: BrowserContext,
-) -> None:
- await context.tracing.stop()
+ async with show_trace_viewer(tracing2_path) as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.dblclick"),
+ re.compile(r"Page.close"),
+ ]
+ )
async def test_should_work_with_playwright_context_managers(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]],
) -> None:
await context.tracing.start(screenshots=True, snapshots=True)
await page.goto(server.EMPTY_PAGE)
@@ -224,21 +264,26 @@ async def test_should_work_with_playwright_context_managers(
trace_file_path = tmp_path / "trace.zip"
await context.tracing.stop(path=trace_file_path)
- (_, events) = parse_trace(trace_file_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.set_content",
- "Page.expect_console_message",
- "Page.evaluate",
- "Page.click",
- "Page.expect_popup",
- "Page.evaluate",
- ]
+ async with show_trace_viewer(trace_file_path) as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile("Page.goto"),
+ re.compile("Page.set_content"),
+ re.compile("Page.expect_console_message"),
+ re.compile("Page.evaluate"),
+ re.compile("Page.click"),
+ re.compile("Page.expect_popup"),
+ re.compile("Page.evaluate"),
+ ]
+ )
async def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]],
) -> None:
await context.tracing.start(screenshots=True, snapshots=True)
@@ -249,19 +294,22 @@ async def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it(
trace_file_path = tmp_path / "trace.zip"
await context.tracing.stop(path=trace_file_path)
- (_, events) = parse_trace(trace_file_path)
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.wait_for_load_state",
- "Page.wait_for_load_state",
- ]
+ async with show_trace_viewer(trace_file_path) as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.goto"),
+ re.compile(r"Page.wait_for_load_state"),
+ re.compile(r"Page.wait_for_load_state"),
+ ]
+ )
async def test_should_respect_traces_dir_and_name(
browser_type: BrowserType,
server: Server,
tmp_path: Path,
- launch_arguments: Dict,
+ launch_arguments: dict,
+ show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]],
) -> None:
traces_dir = tmp_path / "traces"
browser = await browser_type.launch(traces_dir=traces_dir, **launch_arguments)
@@ -282,38 +330,35 @@ async def test_should_respect_traces_dir_and_name(
await browser.close()
- def resource_names(resources: Dict[str, bytes]) -> List[str]:
- return sorted(
+ async with show_trace_viewer(tmp_path / "trace1.zip") as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
[
- re.sub(r"^resources/.*\.(html|css)$", r"resources/XXX.\g<1>", file)
- for file in resources.keys()
+ re.compile(r"Page.goto"),
]
)
+ frame = await trace_viewer.snapshot_frame("Page.goto", 0, False)
+ await expect(frame.locator("body")).to_have_css(
+ "background-color", "rgb(255, 192, 203)"
+ )
+ await expect(frame.locator("body")).to_have_text("hello, world!")
- (resources, events) = parse_trace(tmp_path / "trace1.zip")
- assert get_trace_actions(events) == ["Page.goto"]
- assert resource_names(resources) == [
- "resources/XXX.css",
- "resources/XXX.html",
- "trace.network",
- "trace.stacks",
- "trace.trace",
- ]
-
- (resources, events) = parse_trace(tmp_path / "trace2.zip")
- assert get_trace_actions(events) == ["Page.goto"]
- assert resource_names(resources) == [
- "resources/XXX.css",
- "resources/XXX.html",
- "resources/XXX.html",
- "trace.network",
- "trace.stacks",
- "trace.trace",
- ]
+ async with show_trace_viewer(tmp_path / "trace2.zip") as trace_viewer:
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.goto"),
+ ]
+ )
+ frame = await trace_viewer.snapshot_frame("Page.goto", 0, False)
+ await expect(frame.locator("body")).to_have_css(
+ "background-color", "rgb(255, 192, 203)"
+ )
+ await expect(frame.locator("body")).to_have_text("hello, world!")
async def test_should_show_tracing_group_in_action_list(
- context: BrowserContext, tmp_path: Path
+ context: BrowserContext,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]],
) -> None:
await context.tracing.start()
page = await context.new_page()
@@ -331,15 +376,16 @@ async def test_should_show_tracing_group_in_action_list(
trace_path = tmp_path / "trace.zip"
await context.tracing.stop(path=trace_path)
- (resources, events) = parse_trace(trace_path)
- actions = get_trace_actions(events)
-
- assert actions == [
- "BrowserContext.new_page",
- "outer group",
- "Page.goto",
- "inner group 1",
- "Locator.click",
- "inner group 2",
- "Locator.is_visible",
- ]
+ async with show_trace_viewer(trace_path) as trace_viewer:
+ await trace_viewer.expand_action("inner group 1")
+ await expect(trace_viewer.action_titles).to_have_text(
+ [
+ "BrowserContext.new_page",
+ "outer group",
+ re.compile("Page.goto"),
+ "inner group 1",
+ re.compile("Locator.click"),
+ "inner group 2",
+ re.compile("Locator.is_visible"),
+ ]
+ )
diff --git a/tests/conftest.py b/tests/conftest.py
index 15505c30c..2b533f15f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -57,10 +57,12 @@ def headless(pytestconfig: pytest.Config) -> bool:
@pytest.fixture(scope="session")
def launch_arguments(pytestconfig: pytest.Config, headless: bool) -> Dict:
- return {
+ args: Dict = {
"headless": headless,
- "channel": pytestconfig.getoption("--browser-channel"),
}
+ if pytestconfig.getoption("--browser-channel"):
+ args["channel"] = pytestconfig.getoption("--browser-channel")
+ return args
@pytest.fixture
diff --git a/tests/server.py b/tests/server.py
index cc8145317..d69176950 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -17,6 +17,7 @@
import contextlib
import gzip
import mimetypes
+import pathlib
import socket
import threading
from contextlib import closing
@@ -85,8 +86,7 @@ def process(self) -> None:
self.content.seek(0, 0)
else:
self.post_body = None
- uri = urlparse(self.uri.decode())
- path = uri.path
+ path = urlparse(self.uri.decode()).path
request_subscriber = server.request_subscribers.get(path)
if request_subscriber:
@@ -118,15 +118,23 @@ def process(self) -> None:
if server.routes.get(path):
server.routes[path](self)
return
+
+ self._serve_file(server.static_path / path[1:], path)
+
+ def serve_file(self, path: pathlib.Path) -> None:
+ return self._serve_file(path, urlparse(self.uri.decode()).path)
+
+ def _serve_file(self, path: pathlib.Path, request_path: str) -> None:
+ server = self.channel.factory.server_instance
file_content = None
try:
- file_content = (server.static_path / path[1:]).read_bytes()
+ file_content = path.read_bytes()
content_type = mimetypes.guess_type(path)[0]
if content_type and content_type.startswith("text/"):
content_type += "; charset=utf-8"
self.setHeader(b"Content-Type", content_type)
self.setHeader(b"Cache-Control", "no-cache, no-store")
- if path in server.gzip_routes:
+ if request_path in server.gzip_routes:
self.setHeader("Content-Encoding", "gzip")
self.write(gzip.compress(file_content))
else:
@@ -173,7 +181,7 @@ def __repr__(self) -> str:
def listen(self, factory: TestServerFactory) -> None:
pass
- def start(self) -> None:
+ def start(self, static_path: pathlib.Path = _dirname / "assets") -> None:
request_subscribers: Dict[str, asyncio.Future] = {}
auth: Dict[str, Tuple[str, str]] = {}
csp: Dict[str, str] = {}
@@ -185,7 +193,7 @@ def start(self) -> None:
self.routes = routes
self._ws_handlers: List[Callable[["WebSocketProtocol"], None]] = []
self.gzip_routes = gzip_routes
- self.static_path = _dirname / "assets"
+ self.static_path = static_path
factory = TestServerFactory()
factory.server_instance = self
@@ -276,17 +284,34 @@ def once_web_socket_connection(
class HTTPServer(Server):
+ def __init__(self) -> None:
+ self._listeners: list[Any] = []
+ super().__init__()
+
def listen(self, factory: http.HTTPFactory) -> None:
- reactor.listenTCP(self.PORT, factory, interface="127.0.0.1")
+ self._listeners.append(
+ reactor.listenTCP(self.PORT, factory, interface="127.0.0.1")
+ )
try:
- reactor.listenTCP(self.PORT, factory, interface="::1")
+ self._listeners.append(
+ reactor.listenTCP(self.PORT, factory, interface="::1")
+ )
except Exception:
pass
+ def stop(self) -> None:
+ for listener in self._listeners:
+ listener.stopListening()
+ self._listeners.clear()
+
class HTTPSServer(Server):
protocol = "https"
+ def __init__(self) -> None:
+ self._listeners: list[Any] = []
+ super().__init__()
+
def listen(self, factory: http.HTTPFactory) -> None:
cert = ssl.PrivateCertificate.fromCertificateAndKeyPair(
ssl.Certificate.loadPEM(
@@ -297,12 +322,21 @@ def listen(self, factory: http.HTTPFactory) -> None:
),
)
contextFactory = cert.options()
- reactor.listenSSL(self.PORT, factory, contextFactory, interface="127.0.0.1")
+ self._listeners.append(
+ reactor.listenSSL(self.PORT, factory, contextFactory, interface="127.0.0.1")
+ )
try:
- reactor.listenSSL(self.PORT, factory, contextFactory, interface="::1")
+ self._listeners.append(
+ reactor.listenSSL(self.PORT, factory, contextFactory, interface="::1")
+ )
except Exception:
pass
+ def stop(self) -> None:
+ for listener in self._listeners:
+ listener.stopListening()
+ self._listeners.clear()
+
class WebSocketProtocol(WebSocketServerProtocol):
def __init__(self, *args: Any, **kwargs: Any) -> None:
diff --git a/tests/sync/conftest.py b/tests/sync/conftest.py
index b825ca2fe..46bf86239 100644
--- a/tests/sync/conftest.py
+++ b/tests/sync/conftest.py
@@ -14,20 +14,26 @@
import asyncio
+from contextlib import contextmanager
+from pathlib import Path
from typing import Any, Callable, Dict, Generator, List
import pytest
from greenlet import greenlet
+from playwright._impl._driver import compute_driver_executable
from playwright.sync_api import (
Browser,
BrowserContext,
BrowserType,
+ FrameLocator,
+ Locator,
Page,
Playwright,
Selectors,
sync_playwright,
)
+from tests.server import HTTPServer
from .utils import Utils
from .utils import utils as utils_object
@@ -121,3 +127,71 @@ async def task() -> None:
return list(map(lambda action: results[action], actions))
yield _sync_gather_impl
+
+
+class TraceViewerPage:
+ def __init__(self, page: Page):
+ self.page = page
+
+ @property
+ def actions_tree(self) -> Locator:
+ return self.page.get_by_test_id("actions-tree")
+
+ @property
+ def action_titles(self) -> Locator:
+ return self.page.locator(".action-title")
+
+ @property
+ def stack_frames(self) -> Locator:
+ return self.page.get_by_test_id("stack-trace-list").locator(".list-view-entry")
+
+ def select_action(self, title: str, ordinal: int = 0) -> None:
+ self.page.locator(f'.action-title:has-text("{title}")').nth(ordinal).click()
+
+ def select_snapshot(self, name: str) -> None:
+ self.page.click(f'.snapshot-tab .tabbed-pane-tab-label:has-text("{name}")')
+
+ def snapshot_frame(
+ self, action_name: str, ordinal: int = 0, has_subframe: bool = False
+ ) -> FrameLocator:
+ self.select_action(action_name, ordinal)
+ expected_frames = 4 if has_subframe else 3
+ while len(self.page.frames) < expected_frames:
+ self.page.wait_for_event("frameattached")
+ return self.page.frame_locator("iframe.snapshot-visible[name=snapshot]")
+
+ def show_source_tab(self) -> None:
+ self.page.click("text='Source'")
+
+ def expand_action(self, title: str, ordinal: int = 0) -> None:
+ self.actions_tree.locator(".tree-view-entry", has_text=title).nth(
+ ordinal
+ ).locator(".codicon-chevron-right").click()
+
+
+@pytest.fixture
+def show_trace_viewer(browser: Browser) -> Generator[Callable, None, None]:
+ """Fixture that provides a function to show trace viewer for a trace file."""
+
+ @contextmanager
+ def _show_trace_viewer(
+ trace_path: Path,
+ ) -> Generator[TraceViewerPage, None, None]:
+ trace_viewer_path = (
+ Path(compute_driver_executable()[0]) / "../package/lib/vite/traceViewer"
+ ).resolve()
+
+ server = HTTPServer()
+ server.start(trace_viewer_path)
+ server.set_route("/trace.zip", lambda request: request.serve_file(trace_path))
+
+ page = browser.new_page()
+
+ try:
+ page.goto(f"{server.PREFIX}/index.html?trace={server.PREFIX}/trace.zip")
+ yield TraceViewerPage(page)
+ finally:
+ page.close()
+ server.stop()
+
+ yield _show_trace_viewer
diff --git a/tests/sync/test_tracing.py b/tests/sync/test_tracing.py
index cf08ac0c6..43e875b16 100644
--- a/tests/sync/test_tracing.py
+++ b/tests/sync/test_tracing.py
@@ -15,11 +15,19 @@
import re
import threading
from pathlib import Path
-from typing import Any, Dict, List
-
-from playwright.sync_api import Browser, BrowserContext, BrowserType, Page, Response
+from typing import Callable, ContextManager
+
+from playwright.sync_api import (
+ Browser,
+ BrowserContext,
+ BrowserType,
+ Page,
+ Response,
+ expect,
+)
from tests.server import Server
-from tests.utils import get_trace_actions, parse_trace
+
+from .conftest import TraceViewerPage
def test_browser_context_output_trace(
@@ -33,6 +41,13 @@ def test_browser_context_output_trace(
assert Path(tmp_path / "trace.zip").exists()
+def test_start_stop(browser: Browser) -> None:
+ context = browser.new_context()
+ context.tracing.start()
+ context.tracing.stop()
+ context.close()
+
+
def test_browser_context_should_not_throw_when_stopping_without_start_but_not_exporting(
context: BrowserContext,
) -> None:
@@ -60,27 +75,60 @@ def test_browser_context_output_trace_chunk(
def test_should_collect_sources(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]],
) -> None:
context.tracing.start(sources=True)
page.goto(server.EMPTY_PAGE)
page.set_content("")
- page.click("button")
+
+ def my_method_outer() -> None:
+ def my_method_inner() -> None:
+ page.get_by_text("Click").click()
+
+ my_method_inner()
+
+ my_method_outer()
path = tmp_path / "trace.zip"
context.tracing.stop(path=path)
- (resources, events) = parse_trace(path)
- current_file_content = Path(__file__).read_bytes()
- found_current_file = False
- for name, resource in resources.items():
- if resource == current_file_content:
- found_current_file = True
- break
- assert found_current_file
+ with show_trace_viewer(path) as trace_viewer:
+ expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.goto"),
+ re.compile(r"Page.set_content"),
+ re.compile(r"Locator.click"),
+ ]
+ )
+ trace_viewer.show_source_tab()
+ # Check that stack frames are shown (they might be anonymous in Python)
+ expect(trace_viewer.stack_frames).to_contain_text(
+ [
+ re.compile(r"my_method_inner"),
+ re.compile(r"my_method_outer"),
+ re.compile(r"test_should_collect_sources"),
+ ]
+ )
+
+ trace_viewer.select_action("Page.set_content")
+ # Check that the source file is shown
+ expect(trace_viewer.page.locator(".source-tab-file-name")).to_have_attribute(
+ "title", re.compile(r".*test_.*\.py")
+ )
+ expect(trace_viewer.page.locator(".source-line-running")).to_contain_text(
+ 'page.set_content("")'
+ )
def test_should_collect_trace_with_resources_but_no_js(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]],
) -> None:
context.tracing.start(screenshots=True, snapshots=True)
page.goto(server.PREFIX + "/frames/frame.html")
@@ -101,48 +149,40 @@ def test_should_collect_trace_with_resources_but_no_js(
trace_file_path = tmp_path / "trace.zip"
context.tracing.stop(path=trace_file_path)
- (_, events) = parse_trace(trace_file_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.set_content",
- "Page.click",
- "Mouse.move",
- "Mouse.dblclick",
- "Keyboard.insert_text",
- "Page.wait_for_timeout",
- "Page.route",
- "Page.goto",
- "Page.goto",
- "Page.close",
- ]
-
- assert len(list(filter(lambda e: e["type"] == "frame-snapshot", events))) >= 1
- assert len(list(filter(lambda e: e["type"] == "screencast-frame", events))) >= 1
- style = list(
- filter(
- lambda e: e["type"] == "resource-snapshot"
- and e["snapshot"]["request"]["url"].endswith("style.css"),
- events,
+ with show_trace_viewer(trace_file_path) as trace_viewer:
+ expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile("Page.goto"),
+ re.compile("Page.set_content"),
+ re.compile("Page.click"),
+ re.compile("Mouse.move"),
+ re.compile("Mouse.dblclick"),
+ re.compile("Keyboard.insert_text"),
+ re.compile("Page.wait_for_timeout"),
+ re.compile("Page.route"),
+ re.compile("Page.goto"),
+ re.compile("Page.goto"),
+ re.compile("Page.close"),
+ ]
)
- )[0]
- assert style
- assert style["snapshot"]["response"]["content"]["_sha1"]
- script = list(
- filter(
- lambda e: e["type"] == "resource-snapshot"
- and e["snapshot"]["request"]["url"].endswith("script.js"),
- events,
+
+ trace_viewer.select_action("Page.set_content")
+ expect(trace_viewer.page.locator(".browser-frame-address-bar")).to_have_text(
+ server.PREFIX + "/frames/frame.html"
)
- )[0]
- assert script
- assert script["snapshot"]["response"]["content"].get("_sha1") is None
+ frame = trace_viewer.snapshot_frame("Page.set_content", 0, False)
+ expect(frame.locator("button")).to_have_text("Click")
def test_should_correctly_determine_sync_apiname(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable,
) -> None:
context.tracing.start(screenshots=True, snapshots=True)
+
received_response = threading.Event()
def _handle_response(response: Response) -> None:
@@ -158,16 +198,21 @@ def _handle_response(response: Response) -> None:
trace_file_path = tmp_path / "trace.zip"
context.tracing.stop(path=trace_file_path)
- (_, events) = parse_trace(trace_file_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.close",
- ]
+ with show_trace_viewer(trace_file_path) as trace_viewer:
+ expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.goto"),
+ re.compile(r"Page.close"),
+ ]
+ )
def test_should_collect_two_traces(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]],
) -> None:
context.tracing.start(screenshots=True, snapshots=True)
page.goto(server.EMPTY_PAGE)
@@ -182,27 +227,30 @@ def test_should_collect_two_traces(
tracing2_path = tmp_path / "trace2.zip"
context.tracing.stop(path=tracing2_path)
- (_, events) = parse_trace(tracing1_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.set_content",
- "Page.click",
- ]
-
- (_, events) = parse_trace(tracing2_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == ["Page.dblclick", "Page.close"]
-
+ with show_trace_viewer(tracing1_path) as trace_viewer:
+ expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile("Page.goto"),
+ re.compile("Page.set_content"),
+ re.compile("Page.click"),
+ ]
+ )
-def test_should_not_throw_when_stopping_without_start_but_not_exporting(
- context: BrowserContext,
-) -> None:
- context.tracing.stop()
+ with show_trace_viewer(tracing2_path) as trace_viewer:
+ expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.dblclick"),
+ re.compile(r"Page.close"),
+ ]
+ )
def test_should_work_with_playwright_context_managers(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]],
) -> None:
context.tracing.start(screenshots=True, snapshots=True)
page.goto(server.EMPTY_PAGE)
@@ -217,21 +265,26 @@ def test_should_work_with_playwright_context_managers(
trace_file_path = tmp_path / "trace.zip"
context.tracing.stop(path=trace_file_path)
- (_, events) = parse_trace(trace_file_path)
- assert events[0]["type"] == "context-options"
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.set_content",
- "Page.expect_console_message",
- "Page.evaluate",
- "Page.click",
- "Page.expect_popup",
- "Page.evaluate",
- ]
+ with show_trace_viewer(trace_file_path) as trace_viewer:
+ expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile("Page.goto"),
+ re.compile("Page.set_content"),
+ re.compile("Page.expect_console_message"),
+ re.compile("Page.evaluate"),
+ re.compile("Page.click"),
+ re.compile("Page.expect_popup"),
+ re.compile("Page.evaluate"),
+ ]
+ )
def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it(
- context: BrowserContext, page: Page, server: Server, tmp_path: Path
+ context: BrowserContext,
+ page: Page,
+ server: Server,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]],
) -> None:
context.tracing.start(screenshots=True, snapshots=True)
@@ -242,19 +295,22 @@ def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it(
trace_file_path = tmp_path / "trace.zip"
context.tracing.stop(path=trace_file_path)
- (_, events) = parse_trace(trace_file_path)
- assert get_trace_actions(events) == [
- "Page.goto",
- "Page.wait_for_load_state",
- "Page.wait_for_load_state",
- ]
+ with show_trace_viewer(trace_file_path) as trace_viewer:
+ expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.goto"),
+ re.compile(r"Page.wait_for_load_state"),
+ re.compile(r"Page.wait_for_load_state"),
+ ]
+ )
def test_should_respect_traces_dir_and_name(
browser_type: BrowserType,
server: Server,
tmp_path: Path,
- launch_arguments: Any,
+ launch_arguments: dict,
+ show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]],
) -> None:
traces_dir = tmp_path / "traces"
browser = browser_type.launch(traces_dir=traces_dir, **launch_arguments)
@@ -275,38 +331,35 @@ def test_should_respect_traces_dir_and_name(
browser.close()
- def resource_names(resources: Dict[str, bytes]) -> List[str]:
- return sorted(
+ with show_trace_viewer(tmp_path / "trace1.zip") as trace_viewer:
+ expect(trace_viewer.action_titles).to_have_text(
[
- re.sub(r"^resources/.*\.(html|css)$", r"resources/XXX.\g<1>", file)
- for file in resources.keys()
+ re.compile(r"Page.goto"),
]
)
+ frame = trace_viewer.snapshot_frame("Page.goto", 0, False)
+ expect(frame.locator("body")).to_have_css(
+ "background-color", "rgb(255, 192, 203)"
+ )
+ expect(frame.locator("body")).to_have_text("hello, world!")
- (resources, events) = parse_trace(tmp_path / "trace1.zip")
- assert get_trace_actions(events) == ["Page.goto"]
- assert resource_names(resources) == [
- "resources/XXX.css",
- "resources/XXX.html",
- "trace.network",
- "trace.stacks",
- "trace.trace",
- ]
-
- (resources, events) = parse_trace(tmp_path / "trace2.zip")
- assert get_trace_actions(events) == ["Page.goto"]
- assert resource_names(resources) == [
- "resources/XXX.css",
- "resources/XXX.html",
- "resources/XXX.html",
- "trace.network",
- "trace.stacks",
- "trace.trace",
- ]
+ with show_trace_viewer(tmp_path / "trace2.zip") as trace_viewer:
+ expect(trace_viewer.action_titles).to_have_text(
+ [
+ re.compile(r"Page.goto"),
+ ]
+ )
+ frame = trace_viewer.snapshot_frame("Page.goto", 0, False)
+ expect(frame.locator("body")).to_have_css(
+ "background-color", "rgb(255, 192, 203)"
+ )
+ expect(frame.locator("body")).to_have_text("hello, world!")
def test_should_show_tracing_group_in_action_list(
- context: BrowserContext, tmp_path: Path
+ context: BrowserContext,
+ tmp_path: Path,
+ show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]],
) -> None:
context.tracing.start()
page = context.new_page()
@@ -324,15 +377,16 @@ def test_should_show_tracing_group_in_action_list(
trace_path = tmp_path / "trace.zip"
context.tracing.stop(path=trace_path)
- (resources, events) = parse_trace(trace_path)
- actions = get_trace_actions(events)
-
- assert actions == [
- "BrowserContext.new_page",
- "outer group",
- "Page.goto",
- "inner group 1",
- "Locator.click",
- "inner group 2",
- "Locator.is_visible",
- ]
+ with show_trace_viewer(trace_path) as trace_viewer:
+ trace_viewer.expand_action("inner group 1")
+ expect(trace_viewer.action_titles).to_have_text(
+ [
+ "BrowserContext.new_page",
+ "outer group",
+ re.compile("Page.goto"),
+ "inner group 1",
+ re.compile("Locator.click"),
+ "inner group 2",
+ re.compile("Locator.is_visible"),
+ ]
+ )
diff --git a/tests/utils.py b/tests/utils.py
index c6c10a810..6a78eefaf 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -12,50 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
-import zipfile
-from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple, TypeVar
-
-
-def parse_trace(path: Path) -> Tuple[Dict[str, bytes], List[Any]]:
- resources: Dict[str, bytes] = {}
- with zipfile.ZipFile(path, "r") as zip:
- for name in zip.namelist():
- resources[name] = zip.read(name)
- action_map: Dict[str, Any] = {}
- events: List[Any] = []
- for name in ["trace.trace", "trace.network"]:
- for line in resources[name].decode().splitlines():
- if not line:
- continue
- event = json.loads(line)
- if event["type"] == "before":
- event["type"] = "action"
- action_map[event["callId"]] = event
- events.append(event)
- elif event["type"] == "input":
- pass
- elif event["type"] == "after":
- existing = action_map[event["callId"]]
- existing["error"] = event.get("error", None)
- else:
- events.append(event)
- return (resources, events)
-
-
-def get_trace_actions(events: List[Any]) -> List[str]:
- action_events = sorted(
- list(
- filter(
- lambda e: e["type"] == "action",
- events,
- )
- ),
- key=lambda e: e["startTime"],
- )
- return [e["apiName"] for e in action_events]
-
+from typing import Optional, TypeVar
TARGET_CLOSED_ERROR_MESSAGE = "Target page, context or browser has been closed"