Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5dadba0
Add subscriber timeout
pblazej Apr 29, 2025
e7b0fea
Send only to agents
pblazej Apr 30, 2025
5230985
Idempotence?
pblazej Apr 30, 2025
1198a88
Don't set participant attr
pblazej Apr 30, 2025
5801fa0
WIP: Pass recorder's track
pblazej Apr 30, 2025
d4ab705
WIP: Pass trackId
pblazej May 5, 2025
7377a4b
WIP: Move timeout, handle empty data
pblazej May 5, 2025
9c67eeb
Proto v1.37.1
pblazej May 5, 2025
c0124c1
Track prop
pblazej May 5, 2025
2bc4f73
Size
pblazej May 5, 2025
39d1569
Move methods and override mic state
pblazej May 5, 2025
21dcfcb
Old Swift, log
pblazej May 5, 2025
7d566b0
Move agent checks
pblazej May 5, 2025
39002d3
Remove attr test
pblazej May 5, 2025
ec18881
Move Room, Sendable
pblazej May 5, 2025
1dba385
Expose stop
pblazej May 5, 2025
f8f2bf3
Check track features
pblazej May 6, 2025
9e1f56b
Alternative API
pblazej May 6, 2025
a4d0b2a
Log duration
pblazej May 6, 2025
2a9bd37
Cmt
pblazej May 6, 2025
c27b7bd
Move preConnect to publish options
pblazej May 6, 2025
9e26dbe
Inject recorder
pblazej May 6, 2025
05674b9
Enable only when recording
pblazej May 8, 2025
a121762
Don't persist track across recordings
pblazej May 9, 2025
5d939e0
FIx publish options set/send order
pblazej May 9, 2025
4bb7100
Total size
pblazej May 9, 2025
8e4cfa9
WIP: Move from track subscription to active participant state
pblazej May 9, 2025
8f47930
Room cmts
pblazej May 12, 2025
8f06f40
Handle no recorder case
pblazej May 12, 2025
bbb8c08
Handle conversion fails
pblazej May 12, 2025
cbd7dde
Move timeout
pblazej May 12, 2025
d090956
Cancel timeout
pblazej May 12, 2025
a8d0c3f
Expose participant state
pblazej May 13, 2025
333fb14
Bring back RoomDelegate
pblazej May 13, 2025
14b811b
Fix test
pblazej May 13, 2025
51ceff1
Cmt
pblazej May 13, 2025
960b04c
Use subscription to stop audio
pblazej May 13, 2025
b958cf1
Stop
pblazej May 13, 2025
0585068
Constants
pblazej May 13, 2025
a65ac15
Timeout comments
pblazej May 13, 2025
6351d70
Fix error handler
pblazej May 13, 2025
6101c52
Revert "Stop"
pblazej May 13, 2025
90b1b29
Simplify
pblazej May 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 55 additions & 42 deletions Sources/LiveKit/Core/PreConnectAudioBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,43 @@ import Foundation
/// A buffer that captures audio before connecting to the server,
/// and sends it on certain ``RoomDelegate`` events.
@objc
public final class PreConnectAudioBuffer: NSObject, Loggable {
/// The default participant attribute key used to indicate that the audio buffer is active.
@objc
public static let attributeKey = "lk.agent.pre-connect-audio"

public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable {
/// The default data topic used to send the audio buffer.
@objc
public static let dataTopic = "lk.agent.pre-connect-audio-buffer"

/// The room instance to listen for events.
@objc
public let room: Room?
public var room: Room? { state.room }

/// The audio recorder instance.
@objc
public let recorder: LocalAudioTrackRecorder

private let state = StateSync<State>(State())
private struct State {
weak var room: Room?
var audioStream: LocalAudioTrackRecorder.Stream?
var timeout: TimeInterval = 10
}

/// Initialize the audio buffer with a room instance.
/// - Parameters:
/// - room: The room instance to listen for events.
/// - recorder: The audio recorder to use for capturing.
/// - recorder: The audio recorder instance to use.
@objc
public init(room: Room?,
recorder: LocalAudioTrackRecorder = LocalAudioTrackRecorder(
track: LocalAudioTrack.createTrack(),
format: .pcmFormatInt16, // supported by agent plugins
sampleRate: 24000, // supported by agent plugins
maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB
))
{
self.room = room
self.recorder = recorder
public init(room: Room?, recorder: LocalAudioTrackRecorder? = nil) {
state.mutate { $0.room = room }

let roomOptions = room?._state.roomOptions
self.recorder = recorder ?? LocalAudioTrackRecorder(
track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions,
reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false),
format: .pcmFormatInt16, // supported by agent plugins
sampleRate: 24000, // supported by agent plugins
maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB
)

super.init()
}

Expand All @@ -66,25 +66,33 @@ public final class PreConnectAudioBuffer: NSObject, Loggable {
}

/// Start capturing audio and listening to ``RoomDelegate`` events.
/// - Parameters:
/// - timeout: The timeout for the remote participant to subscribe to the audio track.
@objc
public func startRecording() async throws {
room?.add(delegate: self)

public func startRecording(timeout: TimeInterval = 10) async throws {
let stream = try await recorder.start()
log("Started capturing audio", .info)

state.mutate { state in
state.audioStream = stream
state.timeout = timeout
}

room?.add(delegate: self)
}

/// Stop capturing audio.
/// - Parameters:
/// - flush: If `true`, the audio stream will be flushed immediately without sending.
@objc
public func stopRecording(flush: Bool = false) {
guard recorder.isRecording else { return }

recorder.stop()
log("Stopped capturing audio", .info)

if flush, let stream = state.audioStream {
log("Flushing audio stream", .info)
Task {
for await _ in stream {}
}
Expand All @@ -95,53 +103,58 @@ public final class PreConnectAudioBuffer: NSObject, Loggable {
// MARK: - RoomDelegate

extension PreConnectAudioBuffer: RoomDelegate {
public func roomDidConnect(_ room: Room) {
public func roomDidConnect(_: Room) {
Task {
try? await setParticipantAttribute(room: room)
try? await Task.sleep(nanoseconds: UInt64(state.timeout) * NSEC_PER_SEC)
stopRecording(flush: true)
}
}

public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) {
public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack publication: LocalTrackPublication) {
stopRecording()
Task {
try? await sendAudioData(to: room)
do {
try await sendAudioData(to: room, track: publication.sid)
} catch {
log("Unable to send audio: \(error)", .error)
}
}
}

/// Set the participant attribute to indicate that the audio buffer is active.
/// - Parameters:
/// - key: The key to set the attribute.
/// - room: The room instance to set the attribute.
@objc
public func setParticipantAttribute(key _: String = attributeKey, room: Room) async throws {
var attributes = room.localParticipant.attributes
attributes[Self.attributeKey] = "true"
try await room.localParticipant.set(attributes: attributes)
log("Set participant attribute", .info)
}

/// Send the audio data to the room.
/// - Parameters:
/// - room: The room instance to send the audio data.
/// - topic: The topic to send the audio data.
@objc
public func sendAudioData(to room: Room, on topic: String = dataTopic) async throws {
public func sendAudioData(to room: Room, track: Track.Sid, on topic: String = dataTopic) async throws {
let agentIdentities = room.remoteParticipants.filter { _, value in value.kind == .agent }.map(\.key)
guard !agentIdentities.isEmpty else { return }

guard let audioStream = state.audioStream else {
throw LiveKitError(.invalidState, message: "Audio stream is nil")
}

let audioData = try await audioStream.collect()
guard audioData.count > 1024 else {
throw LiveKitError(.unknown, message: "Audio data size too small, nothing to send")
}

defer {
room.remove(delegate: self)
}

let streamOptions = StreamByteOptions(
topic: topic,
attributes: [
"sampleRate": "\(recorder.sampleRate)",
"channels": "\(recorder.channels)",
]
"trackId": track.stringValue,
],
destinationIdentities: agentIdentities
)
let writer = try await room.localParticipant.streamBytes(options: streamOptions)
try await writer.write(audioStream.collect())
try await writer.write(audioData)
try await writer.close()
log("Sent audio data", .info)

room.remove(delegate: self)
log("Sent \(recorder.duration(audioData.count))s = \(audioData.count / 1024)KB of audio data to \(agentIdentities.count) agent(s) ", .info)
}
}
45 changes: 38 additions & 7 deletions Sources/LiveKit/Core/Room+PreConnect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,44 @@
import Foundation

public extension Room {
/// Start capturing audio before connecting to the server,
/// so that it's not lost when the connection is established.
/// It will be automatically sent via data stream to the other participant
/// using the `PreConnectAudioBuffer.dataTopic` when the local track is subscribed.
/// Starts a pre-connect audio sequence that will automatically be cleaned up
/// when the operation fails.
///
/// - Parameters:
/// - timeout: The timeout for the remote participant to subscribe to the audio track.
/// - operation: The operation to perform while audio is being captured.
/// - Returns: The result of the operation.
///
/// - Example:
/// ```swift
/// try await room.withPreConnectAudio {
/// // Audio is being captured automatically
/// // Perform any other (async) setup here
/// guard let connectionDetails = try await tokenService.fetchConnectionDetails(roomName: roomName, participantName: participantName) else {
/// return
/// }
/// try await room.connect(url: connectionDetails.serverUrl, token: connectionDetails.participantToken)
/// }
/// ```
///
/// - See: ``PreConnectAudioBuffer``
/// - Note: Use ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` to request microphone permissions early.
func startCapturingBeforeConnecting() async throws {
try await preConnectBuffer.startRecording()
/// - Important: Call ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` during app launch sequence to request microphone permissions early.
///
func withPreConnectAudio<T>(timeout: TimeInterval = 10,
_ operation: @Sendable @escaping () async throws -> T) async throws -> T
{
try await preConnectBuffer.startRecording(timeout: timeout)

do {
return try await operation()
} catch {
preConnectBuffer.stopRecording(flush: true)
throw error
}
}

@available(*, deprecated, message: "Use withPreConnectAudio instead")
func startCapturingBeforeConnecting(timeout: TimeInterval = 10) async throws {
try await preConnectBuffer.startRecording(timeout: timeout)
}
}
30 changes: 18 additions & 12 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -354,20 +354,30 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
let enableMicrophone = _state.connectOptions.enableMicrophone
log("Concurrent enable microphone mode: \(enableMicrophone)")

let createMicrophoneTrackTask: Task<LocalTrack, any Error>? = enableMicrophone ? Task {
let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions,
reportStatistics: _state.roomOptions.reportRemoteTrackStatistics)
// Initializes AudioDeviceModule's recording
try await localTrack.start()
return localTrack
} : nil
let createMicrophoneTrackTask: Task<LocalTrack, any Error>? = {
if preConnectBuffer.recorder.isRecording {
return Task {
preConnectBuffer.recorder.track
}
} else if enableMicrophone {
return Task {
let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions,
reportStatistics: _state.roomOptions.reportRemoteTrackStatistics)
// Initializes AudioDeviceModule's recording
try await localTrack.start()
return localTrack
}
} else {
return nil
}
}()

do {
try await fullConnectSequence(url, token)

if let createMicrophoneTrackTask, !createMicrophoneTrackTask.isCancelled {
let track = try await createMicrophoneTrackTask.value
try await localParticipant._publish(track: track)
try await localParticipant._publish(track: track, options: _state.roomOptions.defaultAudioPublishOptions.withPreconnect())
}

// Connect sequence successful
Expand Down Expand Up @@ -437,10 +447,6 @@ extension Room {
e2eeManager.cleanUp()
}

if disconnectError != nil {
preConnectBuffer.stopRecording(flush: true)
}

// Reset state
_state.mutate {
// if isFullReconnect, keep connection related states
Expand Down
13 changes: 13 additions & 0 deletions Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,16 @@ public extension AVAudioPCMBuffer {
}
}
}

extension AVAudioCommonFormat {
var bytesPerSample: Int {
switch self {
case .pcmFormatInt16: return 2
case .pcmFormatInt32: return 4
case .pcmFormatFloat32: return 4
case .pcmFormatFloat64: return 8
case .otherFormat: return 0
@unknown default: return 0
}
}
}
13 changes: 11 additions & 2 deletions Sources/LiveKit/Participant/LocalParticipant.swift
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ public class LocalParticipant: Participant, @unchecked Sendable {
return didUpdate
}

override public func isMicrophoneEnabled() -> Bool {
if let room = _room, room.preConnectBuffer.recorder.isRecording {
return true
} else {
return super.isMicrophoneEnabled()
}
}

// MARK: - Broadcast Activation

#if os(iOS)
Expand Down Expand Up @@ -499,9 +507,9 @@ extension [Livekit_SubscribedQuality] {

// MARK: - Private

private extension LocalParticipant {
extension LocalParticipant {
@discardableResult
internal func _publish(track: LocalTrack, options: TrackPublishOptions? = nil) async throws -> LocalTrackPublication {
func _publish(track: LocalTrack, options: TrackPublishOptions? = nil) async throws -> LocalTrackPublication {
log("[publish] \(track) options: \(String(describing: options ?? nil))...", .info)

try checkPermissions(toPublish: track)
Expand Down Expand Up @@ -610,6 +618,7 @@ private extension LocalParticipant {
populatorFunc = { populator in
populator.disableDtx = !audioPublishOptions.dtx
populator.disableRed = !audioPublishOptions.red
populator.audioFeatures = Array(audioPublishOptions.toFeatures())

if let streamName = options?.streamName {
// Set stream name if specified in options
Expand Down
18 changes: 9 additions & 9 deletions Sources/LiveKit/Participant/Participant.swift
Original file line number Diff line number Diff line change
Expand Up @@ -259,29 +259,29 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga

return true
}
}

// MARK: - Simplified API

public extension Participant {
func isCameraEnabled() -> Bool {
public func isCameraEnabled() -> Bool {
!(getTrackPublication(source: .camera)?.isMuted ?? true)
}

func isMicrophoneEnabled() -> Bool {
public func isMicrophoneEnabled() -> Bool {
!(getTrackPublication(source: .microphone)?.isMuted ?? true)
}

func isScreenShareEnabled() -> Bool {
public func isScreenShareEnabled() -> Bool {
!(getTrackPublication(source: .screenShareVideo)?.isMuted ?? true)
}
}

internal func getTrackPublication(name: String) -> TrackPublication? {
// MARK: - Simplified API

extension Participant {
func getTrackPublication(name: String) -> TrackPublication? {
_state.trackPublications.values.first(where: { $0.name == name })
}

/// find the first publication matching `source` or any compatible.
internal func getTrackPublication(source: Track.Source) -> TrackPublication? {
func getTrackPublication(source: Track.Source) -> TrackPublication? {
// if source is unknown return nil
guard source != .unknown else { return nil }
// try to find a Publication with matching source
Expand Down
Loading
Loading