Skip to content

Commit 6e53e5a

Browse files
committed
minor adjustments
1 parent 1938168 commit 6e53e5a

File tree

6 files changed

+103
-44
lines changed

6 files changed

+103
-44
lines changed

lib/src/token_source/caching.dart

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import 'dart:async';
1616

17+
import '../support/completer_manager.dart';
1718
import 'jwt.dart';
1819
import 'token_source.dart';
1920

@@ -23,11 +24,11 @@ import 'token_source.dart';
2324
/// return `true` if the cached credentials are still valid for the given request.
2425
///
2526
/// The default validator checks JWT expiration using [isResponseExpired].
26-
typedef TokenValidator = bool Function(TokenRequestOptions? options, TokenSourceResponse response);
27+
typedef TokenValidator = bool Function(TokenRequestOptions options, TokenSourceResponse response);
2728

2829
/// A tuple containing the request options and response that were cached.
2930
class TokenStoreItem {
30-
final TokenRequestOptions? options;
31+
final TokenRequestOptions options;
3132
final TokenSourceResponse response;
3233

3334
const TokenStoreItem({
@@ -44,7 +45,7 @@ abstract class TokenStore {
4445
/// Store credentials in the store.
4546
///
4647
/// This replaces any existing cached credentials with the new ones.
47-
Future<void> store(TokenRequestOptions? options, TokenSourceResponse response);
48+
Future<void> store(TokenRequestOptions options, TokenSourceResponse response);
4849

4950
/// Retrieve the cached credentials.
5051
///
@@ -63,7 +64,7 @@ class InMemoryTokenStore implements TokenStore {
6364
TokenStoreItem? _cached;
6465

6566
@override
66-
Future<void> store(TokenRequestOptions? options, TokenSourceResponse response) async {
67+
Future<void> store(TokenRequestOptions options, TokenSourceResponse response) async {
6768
_cached = TokenStoreItem(options: options, response: response);
6869
}
6970

@@ -79,7 +80,7 @@ class InMemoryTokenStore implements TokenStore {
7980
}
8081

8182
/// Default validator that checks JWT expiration using [isResponseExpired].
82-
bool _defaultValidator(TokenRequestOptions? options, TokenSourceResponse response) {
83+
bool _defaultValidator(TokenRequestOptions options, TokenSourceResponse response) {
8384
return !isResponseExpired(response);
8485
}
8586

@@ -96,7 +97,7 @@ class CachingTokenSource implements TokenSourceConfigurable {
9697
final TokenSourceConfigurable _wrapped;
9798
final TokenStore _store;
9899
final TokenValidator _validator;
99-
Completer<TokenSourceResponse>? _fetchInProgress;
100+
final Map<TokenRequestOptions, CompleterManager<TokenSourceResponse>> _inflightRequests = {};
100101

101102
/// Initialize a caching wrapper around any token source.
102103
///
@@ -112,31 +113,32 @@ class CachingTokenSource implements TokenSourceConfigurable {
112113
_validator = validator ?? _defaultValidator;
113114

114115
@override
115-
Future<TokenSourceResponse> fetch([TokenRequestOptions? options]) async {
116-
if (_fetchInProgress != null) {
117-
return _fetchInProgress!.future;
116+
Future<TokenSourceResponse> fetch(TokenRequestOptions options) async {
117+
final existingManager = _inflightRequests[options];
118+
if (existingManager != null && existingManager.isActive) {
119+
return existingManager.future;
118120
}
119121

120-
_fetchInProgress = Completer<TokenSourceResponse>();
122+
final manager = existingManager ?? CompleterManager<TokenSourceResponse>();
123+
_inflightRequests[options] = manager;
124+
final resultFuture = manager.future;
121125

122126
try {
123-
// Check if we have a valid cached token
124127
final cached = await _store.retrieve();
125128
if (cached != null && cached.options == options && _validator(cached.options, cached.response)) {
126-
_fetchInProgress!.complete(cached.response);
127-
return cached.response;
129+
manager.complete(cached.response);
130+
return resultFuture;
128131
}
129132

130-
final requestOptions = options ?? const TokenRequestOptions();
131-
final response = await _wrapped.fetch(requestOptions);
133+
final response = await _wrapped.fetch(options);
132134
await _store.store(options, response);
133-
_fetchInProgress!.complete(response);
134-
return response;
135-
} catch (e) {
136-
_fetchInProgress!.completeError(e);
135+
manager.complete(response);
136+
return resultFuture;
137+
} catch (e, stackTrace) {
138+
manager.completeError(e, stackTrace);
137139
rethrow;
138140
} finally {
139-
_fetchInProgress = null;
141+
_inflightRequests.remove(options);
140142
}
141143
}
142144

lib/src/token_source/endpoint.dart

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import 'token_source.dart';
2525
/// - Encodes the request parameters as [TokenRequestOptions] JSON in the request body
2626
/// - Includes any custom headers specified via [headers]
2727
/// - Expects the response to be decoded as [TokenSourceResponse] JSON
28-
/// - Validates HTTP status codes (200) and throws appropriate errors for failures
28+
/// - Validates HTTP status codes (200-299) and throws appropriate errors for failures
2929
class EndpointTokenSource implements TokenSourceConfigurable {
3030
/// The URL endpoint for token generation.
3131
/// This should point to your backend service that generates LiveKit tokens.
@@ -54,21 +54,31 @@ class EndpointTokenSource implements TokenSourceConfigurable {
5454
});
5555

5656
@override
57-
Future<TokenSourceResponse> fetch([TokenRequestOptions? options]) async {
58-
final requestOptions = options ?? const TokenRequestOptions();
59-
final requestBody = jsonEncode(requestOptions.toRequest().toJson());
57+
Future<TokenSourceResponse> fetch(TokenRequestOptions options) async {
58+
final requestBody = jsonEncode(options.toRequest().toJson());
6059
final uri = Uri.parse(url);
6160
final requestHeaders = {
6261
'Content-Type': 'application/json',
6362
...headers,
6463
};
6564

6665
final httpClient = client ?? http.Client();
67-
final response = method.toUpperCase() == 'GET'
68-
? await httpClient.get(uri, headers: requestHeaders)
69-
: await httpClient.post(uri, headers: requestHeaders, body: requestBody);
66+
final shouldCloseClient = client == null;
67+
late final http.Response response;
7068

71-
if (response.statusCode != 200) {
69+
try {
70+
final request = http.Request(method, uri);
71+
request.headers.addAll(requestHeaders);
72+
request.body = requestBody;
73+
final streamedResponse = await httpClient.send(request);
74+
response = await http.Response.fromStream(streamedResponse);
75+
} finally {
76+
if (shouldCloseClient) {
77+
httpClient.close();
78+
}
79+
}
80+
81+
if (response.statusCode < 200 || response.statusCode >= 300) {
7282
throw Exception('Error generating token from endpoint $url: received ${response.statusCode} / ${response.body}');
7383
}
7484

lib/src/token_source/token_source.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,5 +175,5 @@ abstract class TokenSourceFixed {
175175
/// - [EndpointTokenSource]: For custom backend endpoints using LiveKit's JSON format
176176
/// - [CachingTokenSource]: For caching credentials (or use the `.cached()` extension method)
177177
abstract class TokenSourceConfigurable {
178-
Future<TokenSourceResponse> fetch([TokenRequestOptions? options]);
178+
Future<TokenSourceResponse> fetch(TokenRequestOptions options);
179179
}

test/token/caching_token_source_test.dart

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ void main() {
3434
final cachingSource = CachingTokenSource(mockSource);
3535

3636
// First fetch
37-
final result1 = await cachingSource.fetch();
37+
final result1 = await cachingSource.fetch(const TokenRequestOptions());
3838
expect(fetchCount, 1);
3939

4040
// Second fetch should use cache
41-
final result2 = await cachingSource.fetch();
41+
final result2 = await cachingSource.fetch(const TokenRequestOptions());
4242
expect(fetchCount, 1);
4343
expect(result2.participantToken, result1.participantToken);
4444
});
@@ -57,11 +57,11 @@ void main() {
5757
final cachingSource = CachingTokenSource(mockSource);
5858

5959
// First fetch with expired token
60-
await cachingSource.fetch();
60+
await cachingSource.fetch(const TokenRequestOptions());
6161
expect(fetchCount, 1);
6262

6363
// Second fetch should refetch due to expiration
64-
await cachingSource.fetch();
64+
await cachingSource.fetch(const TokenRequestOptions());
6565
expect(fetchCount, 2);
6666
});
6767

@@ -190,14 +190,37 @@ void main() {
190190
final cachingSource = CachingTokenSource(mockSource);
191191

192192
// Start multiple concurrent fetches
193-
final futures = List.generate(5, (_) => cachingSource.fetch());
193+
final futures = List.generate(5, (_) => cachingSource.fetch(const TokenRequestOptions()));
194194
final results = await Future.wait(futures);
195195

196196
// Should only fetch once despite concurrent requests
197197
expect(fetchCount, 1);
198198
expect(results.every((r) => r.participantToken == results.first.participantToken), isTrue);
199199
});
200200

201+
test('concurrent fetches with different options fetch independently', () async {
202+
var fetchCount = 0;
203+
final mockSource = _MockTokenSource((options) async {
204+
fetchCount++;
205+
await Future.delayed(Duration(milliseconds: 50));
206+
final token = _generateValidToken();
207+
return TokenSourceResponse(
208+
serverUrl: 'https://test.livekit.io',
209+
participantToken: '$token-${options.roomName}',
210+
);
211+
});
212+
213+
final cachingSource = CachingTokenSource(mockSource);
214+
215+
final futureOne = cachingSource.fetch(const TokenRequestOptions(roomName: 'room-a'));
216+
final futureTwo = cachingSource.fetch(const TokenRequestOptions(roomName: 'room-b'));
217+
218+
final responses = await Future.wait([futureOne, futureTwo]);
219+
220+
expect(fetchCount, 2);
221+
expect(responses[0].participantToken == responses[1].participantToken, isFalse);
222+
});
223+
201224
test('invalidate clears cache', () async {
202225
var fetchCount = 0;
203226
final mockSource = _MockTokenSource((options) async {
@@ -211,14 +234,14 @@ void main() {
211234

212235
final cachingSource = CachingTokenSource(mockSource);
213236

214-
await cachingSource.fetch();
237+
await cachingSource.fetch(const TokenRequestOptions());
215238
expect(fetchCount, 1);
216239

217240
// Invalidate cache
218241
await cachingSource.invalidate();
219242

220243
// Should refetch after invalidation
221-
await cachingSource.fetch();
244+
await cachingSource.fetch(const TokenRequestOptions());
222245
expect(fetchCount, 2);
223246
});
224247

@@ -235,7 +258,7 @@ void main() {
235258

236259
expect(await cachingSource.cachedResponse(), isNull);
237260

238-
final response = await cachingSource.fetch();
261+
final response = await cachingSource.fetch(const TokenRequestOptions());
239262
expect(await cachingSource.cachedResponse(), isNotNull);
240263
expect((await cachingSource.cachedResponse())?.participantToken, response.participantToken);
241264

test/token/endpoint_token_source_test.dart

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ void main() {
8484
expect(agents[0]['metadata'], 'agent-metadata');
8585
});
8686

87-
test('GET endpoint', () async {
87+
test('GET endpoint with body', () async {
8888
http.Request? capturedRequest;
8989

9090
final mockClient = MockClient((request) async {
@@ -104,13 +104,37 @@ void main() {
104104
client: mockClient,
105105
);
106106

107-
final response = await source.fetch();
107+
final response = await source.fetch(const TokenRequestOptions());
108108

109109
expect(response.serverUrl, 'wss://www.example.com');
110110
expect(response.participantToken, 'token');
111111

112112
expect(capturedRequest, isNotNull);
113113
expect(capturedRequest!.method, 'GET');
114+
// Body is always sent even for GET requests
115+
expect(capturedRequest!.body, '{}');
116+
});
117+
118+
test('accepts non-200 success responses', () async {
119+
final mockClient = MockClient((request) async {
120+
return http.Response(
121+
jsonEncode({
122+
'server_url': 'wss://www.example.com',
123+
'participant_token': 'token',
124+
}),
125+
201,
126+
);
127+
});
128+
129+
final source = EndpointTokenSource(
130+
url: 'https://example.com/token',
131+
client: mockClient,
132+
);
133+
134+
final response = await source.fetch(const TokenRequestOptions());
135+
136+
expect(response.serverUrl, 'wss://www.example.com');
137+
expect(response.participantToken, 'token');
114138
});
115139

116140
test('camelCase backward compatibility', () async {
@@ -131,7 +155,7 @@ void main() {
131155
client: mockClient,
132156
);
133157

134-
final response = await source.fetch();
158+
final response = await source.fetch(const TokenRequestOptions());
135159

136160
expect(response.serverUrl, 'wss://www.example.com');
137161
expect(response.participantToken, 'token');
@@ -155,7 +179,7 @@ void main() {
155179
client: mockClient,
156180
);
157181

158-
final response = await source.fetch();
182+
final response = await source.fetch(const TokenRequestOptions());
159183

160184
expect(response.serverUrl, 'wss://www.example.com');
161185
expect(response.participantToken, 'token');
@@ -174,7 +198,7 @@ void main() {
174198
);
175199

176200
expect(
177-
() => source.fetch(),
201+
() => source.fetch(const TokenRequestOptions()),
178202
throwsA(isA<Exception>().having(
179203
(e) => e.toString(),
180204
'message',
@@ -194,7 +218,7 @@ void main() {
194218
);
195219

196220
expect(
197-
() => source.fetch(),
221+
() => source.fetch(const TokenRequestOptions()),
198222
throwsA(isA<Exception>().having(
199223
(e) => e.toString(),
200224
'message',

test/token/token_source_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ void main() {
160160
}
161161

162162
final source = CustomTokenSource(customFunction);
163-
final result = await source.fetch();
163+
final result = await source.fetch(const TokenRequestOptions());
164164

165165
expect(result.serverUrl, 'https://custom.livekit.io');
166166
expect(result.participantToken, 'custom-token');

0 commit comments

Comments
 (0)