Skip to content
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5dadba0
Add subscriber timeout
pblazej Apr 29, 2025
e7b0fea
Send only to agents
pblazej Apr 30, 2025
5230985
Idempotence?
pblazej Apr 30, 2025
1198a88
Don't set participant attr
pblazej Apr 30, 2025
5801fa0
WIP: Pass recorder's track
pblazej Apr 30, 2025
d4ab705
WIP: Pass trackId
pblazej May 5, 2025
7377a4b
WIP: Move timeout, handle empty data
pblazej May 5, 2025
9c67eeb
Proto v1.37.1
pblazej May 5, 2025
c0124c1
Track prop
pblazej May 5, 2025
2bc4f73
Size
pblazej May 5, 2025
39d1569
Move methods and override mic state
pblazej May 5, 2025
21dcfcb
Old Swift, log
pblazej May 5, 2025
7d566b0
Move agent checks
pblazej May 5, 2025
39002d3
Remove attr test
pblazej May 5, 2025
ec18881
Move Room, Sendable
pblazej May 5, 2025
1dba385
Expose stop
pblazej May 5, 2025
f8f2bf3
Check track features
pblazej May 6, 2025
9e1f56b
Alternative API
pblazej May 6, 2025
a4d0b2a
Log duration
pblazej May 6, 2025
2a9bd37
Cmt
pblazej May 6, 2025
c27b7bd
Move preConnect to publish options
pblazej May 6, 2025
9e26dbe
Inject recorder
pblazej May 6, 2025
05674b9
Enable only when recording
pblazej May 8, 2025
a121762
Don't persist track across recordings
pblazej May 9, 2025
5d939e0
FIx publish options set/send order
pblazej May 9, 2025
4bb7100
Total size
pblazej May 9, 2025
8e4cfa9
WIP: Move from track subscription to active participant state
pblazej May 9, 2025
8f47930
Room cmts
pblazej May 12, 2025
8f06f40
Handle no recorder case
pblazej May 12, 2025
bbb8c08
Handle conversion fails
pblazej May 12, 2025
cbd7dde
Move timeout
pblazej May 12, 2025
d090956
Cancel timeout
pblazej May 12, 2025
a8d0c3f
Expose participant state
pblazej May 13, 2025
333fb14
Bring back RoomDelegate
pblazej May 13, 2025
14b811b
Fix test
pblazej May 13, 2025
51ceff1
Cmt
pblazej May 13, 2025
960b04c
Use subscription to stop audio
pblazej May 13, 2025
b958cf1
Stop
pblazej May 13, 2025
0585068
Constants
pblazej May 13, 2025
a65ac15
Timeout comments
pblazej May 13, 2025
6351d70
Fix error handler
pblazej May 13, 2025
6101c52
Revert "Stop"
pblazej May 13, 2025
90b1b29
Simplify
pblazej May 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 103 additions & 59 deletions Sources/LiveKit/Core/PreConnectAudioBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,63 +17,93 @@
import AVFAudio
import Foundation

/// A buffer that captures audio before connecting to the server,
/// and sends it on certain ``RoomDelegate`` events.
/// A buffer that captures audio before connecting to the server.
@objc
public final class PreConnectAudioBuffer: NSObject, Loggable {
/// The default participant attribute key used to indicate that the audio buffer is active.
@objc
public static let attributeKey = "lk.agent.pre-connect-audio"
public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable {
public typealias OnError = @Sendable (Error) -> Void

/// Default values used by the pre-connect audio buffer.
public enum Constants {
    /// Maximum recording size in bytes (10MB); passed as `maxSize` to the default recorder.
    public static let maxSize: Int = 10 << 20
    /// Sample rate passed to the default recorder.
    public static let sampleRate: Int = 24_000
    /// Default value of the `timeout` parameter of `startRecording(timeout:recorder:)`.
    public static let timeout: TimeInterval = 10.0
}

/// The default data topic used to send the audio buffer.
@objc
public static let dataTopic = "lk.agent.pre-connect-audio-buffer"

/// The room instance to listen for events.
/// The room instance to send the audio buffer to.
@objc
public let room: Room?
public var room: Room? { state.room }

/// The audio recorder instance.
@objc
public let recorder: LocalAudioTrackRecorder
public var recorder: LocalAudioTrackRecorder? { state.recorder }

private let state = StateSync<State>(State())
/// Mutable state of the buffer; only accessed through the `state` wrapper.
private struct State {
    /// Room the audio is sent to. Held weakly.
    weak var room: Room?
    /// Recorder of the current recording, assigned in `startRecording`.
    var recorder: LocalAudioTrackRecorder?
    /// Stream returned by the recorder's `start()`.
    var audioStream: LocalAudioTrackRecorder.Stream?
    /// Task that stops and flushes the recording once the timeout elapses.
    var timeoutTask: Task<Void, Error>?
    /// Set by `sendAudioData` and reset by `startRecording`.
    var sent = false
    /// Handler invoked when sending the audio buffer fails.
    var onError: OnError?
}

/// Initialize the audio buffer with a room instance.
/// - Parameters:
/// - room: The room instance to listen for events.
/// - recorder: The audio recorder to use for capturing.
/// - room: The room instance to send the audio buffer to.
/// - onError: The error handler to call when an error occurs while sending the audio buffer.
@objc
public init(room: Room?,
recorder: LocalAudioTrackRecorder = LocalAudioTrackRecorder(
track: LocalAudioTrack.createTrack(),
format: .pcmFormatInt16, // supported by agent plugins
sampleRate: 24000, // supported by agent plugins
maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB
))
{
self.room = room
self.recorder = recorder
public init(room: Room?, onError: OnError? = nil) {
    // Store both values in a single `mutate` call before handing over to NSObject.
    state.mutate { current in
        current.room = room
        current.onError = onError
    }
    super.init()
}

deinit {
    // Stop any recording still in progress (without flushing).
    stopRecording()
    // Unregister from the room's delegates, if the room is still alive.
    room?.remove(delegate: self)
}

/// Start capturing audio and listening to ``RoomDelegate`` events.
@objc
public func startRecording() async throws {
public func setErrorHandler(_ onError: OnError?) {
    // Replaces the stored handler; passing `nil` clears it.
    state.mutate { current in
        current.onError = onError
    }
}

/// Start capturing audio.
/// - Parameters:
/// - timeout: The timeout for the remote participant to subscribe to the audio track.
/// The room connection needs to be established and the remote participant needs to subscribe to the audio track
/// before the timeout is reached. Otherwise, the audio stream will be flushed without sending.
/// - recorder: Optional custom recorder instance. If not provided, a new one will be created.
@objc
public func startRecording(timeout: TimeInterval = Constants.timeout, recorder: LocalAudioTrackRecorder? = nil) async throws {
    room?.add(delegate: self)

    // Use the injected recorder when given; otherwise create one on a new local track
    // configured from the room options (when a room is available).
    let roomOptions = room?._state.roomOptions
    let newRecorder = recorder ?? LocalAudioTrackRecorder(
        track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions,
                                           reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false),
        format: .pcmFormatInt16,
        sampleRate: Constants.sampleRate,
        maxSize: Constants.maxSize
    )

    let stream = try await newRecorder.start()
    log("Started capturing audio", .info)

    // Convert seconds to nanoseconds in floating point so that fractional timeouts
    // (e.g. 0.5s) are not truncated to zero, and clamp the result so that negative,
    // NaN or very large values cannot trap in the `UInt64` conversion.
    let requestedNanoseconds = timeout * Double(NSEC_PER_SEC)
    let timeoutNanoseconds: UInt64
    if requestedNanoseconds.isNaN || requestedNanoseconds <= 0 {
        timeoutNanoseconds = 0
    } else if requestedNanoseconds >= Double(UInt64.max) {
        timeoutNanoseconds = .max
    } else {
        timeoutNanoseconds = UInt64(requestedNanoseconds)
    }

    // A previous recording's timeout must not fire against the new one.
    state.timeoutTask?.cancel()
    state.mutate { state in
        state.recorder = newRecorder
        state.audioStream = stream
        state.timeoutTask = Task { [weak self] in
            try await Task.sleep(nanoseconds: timeoutNanoseconds)
            try Task.checkCancellation()
            // Timed out: stop and discard the captured audio.
            self?.stopRecording(flush: true)
        }
        state.sent = false
    }
}

Expand All @@ -82,66 +112,80 @@ public final class PreConnectAudioBuffer: NSObject, Loggable {
/// - flush: If `true`, the audio stream will be flushed immediately without sending.
@objc
public func stopRecording(flush: Bool = false) {
guard let recorder, recorder.isRecording else { return }

recorder.stop()
log("Stopped capturing audio", .info)

if flush, let stream = state.audioStream {
log("Flushing audio stream", .info)
Task {
for await _ in stream {}
}
}
}
}

// MARK: - RoomDelegate

extension PreConnectAudioBuffer: RoomDelegate {
public func roomDidConnect(_ room: Room) {
Task {
try? await setParticipantAttribute(room: room)
room?.remove(delegate: self)
}
}

public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) {
stopRecording()
Task {
try? await sendAudioData(to: room)
}
}

/// Set the participant attribute to indicate that the audio buffer is active.
/// - Parameters:
/// - key: The key to set the attribute.
/// - room: The room instance to set the attribute.
@objc
public func setParticipantAttribute(key _: String = attributeKey, room: Room) async throws {
var attributes = room.localParticipant.attributes
attributes[Self.attributeKey] = "true"
try await room.localParticipant.set(attributes: attributes)
log("Set participant attribute", .info)
}

/// Send the audio data to the room.
/// - Parameters:
/// - room: The room instance to send the audio data.
/// - agents: The agents to send the audio data to.
/// - topic: The topic to send the audio data.
@objc
public func sendAudioData(to room: Room, on topic: String = dataTopic) async throws {
public func sendAudioData(to room: Room, agents: [Participant.Identity], on topic: String = dataTopic) async throws {
    // Nothing to do without recipients; `sent` is left untouched in that case.
    guard !agents.isEmpty else { return }

    // Read and set the `sent` flag inside a single `mutate` call, so that two
    // concurrent callers cannot both observe `false` and both consume the stream.
    var alreadySent = false
    state.mutate {
        alreadySent = $0.sent
        $0.sent = true
    }
    guard !alreadySent else { return }

    guard let recorder else {
        throw LiveKitError(.invalidState, message: "Recorder is nil")
    }

    guard let audioStream = state.audioStream else {
        throw LiveKitError(.invalidState, message: "Audio stream is nil")
    }

    // Collect the recorded audio up front so that its total size can be announced.
    let audioData = try await audioStream.collect()
    guard audioData.count > 1024 else {
        throw LiveKitError(.unknown, message: "Audio data size too small, nothing to send")
    }

    let streamOptions = StreamByteOptions(
        topic: topic,
        attributes: [
            "sampleRate": "\(recorder.sampleRate)",
            "channels": "\(recorder.channels)",
            "trackId": recorder.track.sid?.stringValue ?? "",
        ],
        destinationIdentities: agents,
        totalSize: audioData.count
    )
    let writer = try await room.localParticipant.streamBytes(options: streamOptions)
    try await writer.write(audioData)
    try await writer.close()
    log("Sent \(recorder.duration(audioData.count))s = \(audioData.count / 1024)KB of audio data to \(agents.count) agent(s) \(agents)", .info)
}
}

extension PreConnectAudioBuffer: RoomDelegate {
/// Stops capturing as soon as a remote participant subscribes to a local track publication.
public func room(_: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) {
    log("Subscribed by remote participant, stopping audio", .info)
    // No flush here: the captured audio stays in the stream.
    stopRecording()
}

room.remove(delegate: self)
/// Sends the captured audio to an agent participant once it becomes active.
public func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) {
    // Only react to agents that have just become active and have an identity.
    guard participant.kind == .agent else { return }
    guard state == .active else { return }
    guard let agent = participant.identity else { return }

    log("Detected active agent participant: \(agent), sending audio", .info)
    stopRecording()

    Task {
        do {
            try await sendAudioData(to: room, agents: [agent])
        } catch {
            log("Unable to send preconnect audio: \(error)", .error)
            // `self.state` is the buffer's own state; the bare `state` is the delegate parameter.
            self.state.onError?(error)
        }
    }
}
}
52 changes: 45 additions & 7 deletions Sources/LiveKit/Core/Room+PreConnect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,51 @@
import Foundation

public extension Room {
/// Start capturing audio before connecting to the server,
/// so that it's not lost when the connection is established.
/// It will be automatically sent via data stream to the other participant
/// using the `PreConnectAudioBuffer.dataTopic` when the local track is subscribed.
/// Starts a pre-connect audio sequence that will automatically be cleaned up
/// when the operation fails.
///
/// - Parameters:
/// - timeout: The timeout for the remote participant to subscribe to the audio track.
/// The room connection needs to be established and the remote participant needs to subscribe to the audio track
/// before the timeout is reached. Otherwise, the audio stream will be flushed without sending.
/// - operation: The operation to perform while audio is being captured.
/// - onError: The error handler to call when an error occurs while sending the audio buffer.
/// - Returns: The result of the operation.
///
/// - Example:
/// ```swift
/// try await room.withPreConnectAudio {
/// // Audio is being captured automatically
/// // Perform any other (async) setup here
/// guard let connectionDetails = try await tokenService.fetchConnectionDetails(roomName: roomName, participantName: participantName) else {
/// return
/// }
/// try await room.connect(url: connectionDetails.serverUrl, token: connectionDetails.participantToken)
/// } onError: { error in
/// print("Error sending audio buffer: \(error)")
/// }
/// ```
///
/// - See: ``PreConnectAudioBuffer``
/// - Note: Use ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` to request microphone permissions early.
func startCapturingBeforeConnecting() async throws {
try await preConnectBuffer.startRecording()
/// - Important: Call ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` during app launch sequence to request microphone permissions early.
///
func withPreConnectAudio<T>(timeout: TimeInterval = PreConnectAudioBuffer.Constants.timeout,
                            _ operation: @Sendable @escaping () async throws -> T,
                            onError: PreConnectAudioBuffer.OnError? = nil) async throws -> T
{
    // Install the handler before recording starts.
    preConnectBuffer.setErrorHandler(onError)
    try await preConnectBuffer.startRecording(timeout: timeout)

    do {
        return try await operation()
    } catch {
        // The operation failed: stop capturing and discard the audio, then rethrow.
        preConnectBuffer.stopRecording(flush: true)
        throw error
    }
}

@available(*, deprecated, message: "Use withPreConnectAudio instead")
func startCapturingBeforeConnecting(timeout: TimeInterval = PreConnectAudioBuffer.Constants.timeout) async throws {
    // Thin wrapper kept for source compatibility; forwards to the buffer.
    try await preConnectBuffer.startRecording(timeout: timeout)
}
}
30 changes: 18 additions & 12 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -354,20 +354,30 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
let enableMicrophone = _state.connectOptions.enableMicrophone
log("Concurrent enable microphone mode: \(enableMicrophone)")

let createMicrophoneTrackTask: Task<LocalTrack, any Error>? = enableMicrophone ? Task {
let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions,
reportStatistics: _state.roomOptions.reportRemoteTrackStatistics)
// Initializes AudioDeviceModule's recording
try await localTrack.start()
return localTrack
} : nil
let createMicrophoneTrackTask: Task<LocalTrack, any Error>? = {
if let recorder = preConnectBuffer.recorder, recorder.isRecording {
return Task {
recorder.track
}
} else if enableMicrophone {
return Task {
let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions,
reportStatistics: _state.roomOptions.reportRemoteTrackStatistics)
// Initializes AudioDeviceModule's recording
try await localTrack.start()
return localTrack
}
} else {
return nil
}
}()

do {
try await fullConnectSequence(url, token)

if let createMicrophoneTrackTask, !createMicrophoneTrackTask.isCancelled {
let track = try await createMicrophoneTrackTask.value
try await localParticipant._publish(track: track)
try await localParticipant._publish(track: track, options: _state.roomOptions.defaultAudioPublishOptions.withPreconnect(preConnectBuffer.recorder?.isRecording ?? false))
}

// Connect sequence successful
Expand Down Expand Up @@ -437,10 +447,6 @@ extension Room {
e2eeManager.cleanUp()
}

if disconnectError != nil {
preConnectBuffer.stopRecording(flush: true)
}

// Reset state
_state.mutate {
// if isFullReconnect, keep connection related states
Expand Down
13 changes: 13 additions & 0 deletions Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,16 @@ public extension AVAudioPCMBuffer {
}
}
}

extension AVAudioCommonFormat {
    /// Size in bytes of a single sample in this format.
    /// Returns `0` for `.otherFormat` and for any case not known at compile time.
    var bytesPerSample: Int {
        switch self {
        case .pcmFormatInt16:
            return MemoryLayout<Int16>.size
        case .pcmFormatInt32:
            return MemoryLayout<Int32>.size
        case .pcmFormatFloat32:
            return MemoryLayout<Float32>.size
        case .pcmFormatFloat64:
            return MemoryLayout<Float64>.size
        case .otherFormat:
            return 0
        @unknown default:
            return 0
        }
    }
}
Loading
Loading