Skip to content

Commit cbdf85c

Browse files
mralephCommit Queue
authored andcommitted
[vm] Create perf_witness package
This package provides universal observability functionality for Dart SDK CLI tooling. Any tool can opt-in into observability by starting PerfWitnessServer. This will create a control socket in a fixed location which can be then discovered by perf_witness recorder. TEST=pkg/perf_witness/test Change-Id: I698617a66fed42c6629c348d53964dae3f2148df Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/413683 Reviewed-by: Martin Kustermann <[email protected]> Commit-Queue: Slava Egorov <[email protected]>
1 parent 95df731 commit cbdf85c

File tree

14 files changed

+1636
-0
lines changed

14 files changed

+1636
-0
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
include: package:lints/recommended.yaml
2+
3+
analyzer:
4+
exclude:
5+
- lib/src/assets/**
6+
7+
linter:
8+
rules:
9+
- directives_ordering
10+
- prefer_final_locals
11+
- sort_pub_dependencies

pkg/perf_witness/bin/recorder.dart

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'package:perf_witness/recorder.dart' as recorder;
6+
7+
Future<void> main(List<String> args) async {
8+
await recorder.record(recorder.PerfWitnessRecorderConfig.fromArgs(args));
9+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
// Simple binary which continuously does some busy work and generates
6+
// timeline events.
7+
8+
import 'dart:async';
9+
import 'dart:developer';
10+
11+
import 'package:perf_witness/server.dart';
12+
import 'package:perf_witness/src/async_span.dart';
13+
14+
int fib(int i) {
15+
if (i < 2) return 1;
16+
return fib(i - 1) + fib(i - 2);
17+
}
18+
19+
Future<void> task(int id) async {
20+
await AsyncSpan.run('task#$id', () async {
21+
for (var i = 0; i < 10; i++) {
22+
Timeline.timeSync('fib', () {
23+
final sw = Stopwatch()..start();
24+
while (sw.elapsedMilliseconds < 100) {
25+
fib(10);
26+
}
27+
});
28+
await Future.delayed(Duration(milliseconds: 100));
29+
}
30+
});
31+
}
32+
33+
void main() async {
34+
await PerfWitnessServer.start();
35+
var id = 0;
36+
while (true) {
37+
await AsyncSpan.run('task-group-${id ~/ 2}', () async {
38+
await Future.wait([task(id++), task(id++)]);
39+
await Future.delayed(Duration(milliseconds: 100));
40+
});
41+
}
42+
}

pkg/perf_witness/lib/recorder.dart

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'dart:developer';
7+
import 'dart:io' as io;
8+
9+
import 'package:args/args.dart';
10+
import 'package:path/path.dart' as p;
11+
12+
import 'src/common.dart';
13+
import 'src/json_rpc.dart';
14+
import 'src/process_info.dart';
15+
16+
class PerfWitnessRecorderConfig {
17+
final String? outputDir;
18+
final String? tag;
19+
final bool recordNewProcesses;
20+
final bool enableAsyncSpans;
21+
final bool enableProfiler;
22+
final List<String> streams;
23+
24+
PerfWitnessRecorderConfig({
25+
this.outputDir,
26+
this.tag,
27+
this.recordNewProcesses = false,
28+
this.enableAsyncSpans = false,
29+
this.enableProfiler = true,
30+
this.streams = const [],
31+
});
32+
33+
factory PerfWitnessRecorderConfig.fromParsedArgs(ArgResults args) {
34+
var streams = args['streams'] as List<String>;
35+
if (streams.contains('all')) {
36+
streams = TimelineStream.values.map((s) => s.name).toList();
37+
}
38+
return PerfWitnessRecorderConfig(
39+
outputDir: args['output-dir'] as String?,
40+
tag: args['tag'] as String?,
41+
recordNewProcesses: args['record-new-processes'] as bool,
42+
enableAsyncSpans: args['enable-async-spans'] as bool,
43+
enableProfiler: args['enable-profiler'] as bool,
44+
streams: streams,
45+
);
46+
}
47+
48+
factory PerfWitnessRecorderConfig.fromArgs(List<String> args) {
49+
final parsedArgs = configureArgParser().parse(args);
50+
return PerfWitnessRecorderConfig.fromParsedArgs(parsedArgs);
51+
}
52+
53+
static ArgParser configureArgParser([ArgParser? parser]) {
54+
return (parser ?? ArgParser())
55+
..addOption('output-dir', abbr: 'o')
56+
..addOption('tag', help: 'Tag to filter processes by.')
57+
..addFlag(
58+
'record-new-processes',
59+
help: 'Record processes that start after the recorder.',
60+
negatable: false,
61+
)
62+
..addFlag(
63+
'enable-async-spans',
64+
help: 'Enable async spans.',
65+
negatable: false,
66+
)
67+
..addFlag(
68+
'enable-profiler',
69+
help: 'Enable profiler.',
70+
negatable: true,
71+
defaultsTo: true,
72+
)
73+
..addMultiOption(
74+
'streams',
75+
help: 'Streams to record.',
76+
allowed: [...TimelineStream.values.map((s) => s.name), 'all'],
77+
defaultsTo: [TimelineStream.gc.name, TimelineStream.dart.name],
78+
);
79+
}
80+
}
81+
82+
Future<void> record(PerfWitnessRecorderConfig config) async {
83+
final io.Directory outputDir;
84+
if (config.outputDir case final String outputDirPath) {
85+
outputDir = io.Directory(outputDirPath);
86+
} else {
87+
outputDir = io.Directory.systemTemp.createTempSync('recording');
88+
}
89+
90+
final sockets = getAllControlSockets();
91+
final connections = (await Future.wait([
92+
for (var s in sockets) Connection._tryConnectTo(s.socketPath),
93+
])).nonNulls.toList(growable: false);
94+
95+
print('Found ${connections.length} processes:');
96+
for (final c in connections) {
97+
print(' ${c.info}');
98+
}
99+
100+
final matchedConnections = _closeNotMatching(connections, config.tag);
101+
if (config.tag != null) {
102+
print('Tag ${config.tag} matched ${matchedConnections.length} processes.');
103+
}
104+
105+
print('... data will be written to $outputDir');
106+
107+
final sw = Stopwatch()..start();
108+
await Future.wait([
109+
for (var conn in matchedConnections)
110+
conn.startRecording(outputDir.path, config: config),
111+
]);
112+
113+
bool recording = true;
114+
115+
JsonRpcServer? newProcessServer;
116+
if (config.recordNewProcesses) {
117+
if (recorderSocketPath case final path?) {
118+
if (io.FileSystemEntity.typeSync(path) ==
119+
io.FileSystemEntityType.unixDomainSock) {
120+
print(
121+
'Warning: Control socket $path already exists '
122+
'(another recorder might be running).',
123+
);
124+
} else {
125+
newProcessServer = JsonRpcServer(await UnixDomainSocket.bind(path), {
126+
'process.announce': (requestor, params) async {
127+
if (!recording) {
128+
return null;
129+
}
130+
131+
final info = ProcessInfo.fromJson(params as Map<String, Object?>);
132+
print('New process announced: $info');
133+
if (config.tag == null || info.tag == config.tag) {
134+
try {
135+
final conn = Connection._(info, requestor);
136+
matchedConnections.add(conn);
137+
await conn.startRecording(outputDir.path, config: config);
138+
} catch (e) {
139+
print('Failed to start recording: $e');
140+
}
141+
}
142+
return null;
143+
},
144+
});
145+
print('Listening for new processes on $path');
146+
}
147+
} else {
148+
print(
149+
'Warning: Unable to listen for new processes '
150+
'(path to the control socket is null).',
151+
);
152+
}
153+
}
154+
155+
if (matchedConnections.isNotEmpty || config.recordNewProcesses) {
156+
await io.ProcessSignal.sigint.watch().first;
157+
recording = false;
158+
await Future.wait([
159+
for (var conn in matchedConnections)
160+
conn.stopRecording().catchError((e) {
161+
print('Failed to stop recording of process ${conn.info.pid}: $e');
162+
}),
163+
]);
164+
print('Recorded for ${sw.elapsed}');
165+
}
166+
167+
for (final conn in matchedConnections) {
168+
conn.disconnect();
169+
}
170+
await newProcessServer?.close();
171+
}
172+
173+
class Connection {
174+
final ProcessInfo info;
175+
final JsonRpcPeer _endpoint;
176+
177+
Connection._(this.info, this._endpoint);
178+
179+
Future<void> startRecording(
180+
String outputDir, {
181+
required PerfWitnessRecorderConfig config,
182+
}) async {
183+
await _endpoint.sendRequest('timeline.streamTo', {
184+
'recorder': 'perfetto',
185+
'path': p.join(outputDir, '${info.pid}.timeline'),
186+
'enableProfiler': config.enableProfiler,
187+
'enableAsyncSpans': config.enableAsyncSpans,
188+
'streams': config.streams,
189+
});
190+
}
191+
192+
Future<void> stopRecording() async {
193+
await _endpoint.sendRequest('timeline.stopStreaming');
194+
}
195+
196+
void disconnect() async {
197+
try {
198+
await _endpoint.close();
199+
} catch (_) {
200+
// Ignore exceptions
201+
}
202+
}
203+
204+
static Future<Connection> connectTo(String controlSocketPath) async {
205+
final client = jsonRpcPeerFromSocket(
206+
await UnixDomainSocket.connect(controlSocketPath),
207+
);
208+
final info = ProcessInfo.fromJson(
209+
await client.sendRequest('process.getInfo') as Map<String, Object?>,
210+
);
211+
return Connection._(info, client);
212+
}
213+
214+
static Future<Connection?> _tryConnectTo(io.File controlSocket) async {
215+
try {
216+
return await Connection.connectTo(controlSocket.path);
217+
} catch (_) {
218+
try {
219+
controlSocket.deleteSync(); // Likely stale file. Purge it.
220+
} catch (_) {}
221+
return null;
222+
}
223+
}
224+
}
225+
226+
List<Connection> _closeNotMatching(List<Connection> v, String? tag) {
227+
if (tag == null) {
228+
return v.toList(growable: true);
229+
}
230+
231+
final open = <Connection>[];
232+
for (final c in v) {
233+
if (c.info.tag == tag) {
234+
open.add(c);
235+
continue;
236+
}
237+
c.disconnect();
238+
}
239+
return open;
240+
}

0 commit comments

Comments
 (0)