Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions cmd/lk/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,70 @@ var (
},
},
},
{
Name: "stress-test",
Usage: "Run stress tests against LiveKit with simulated publishers & subscribers",
Action: stressTest,
Flags: []cli.Flag{
&cli.IntFlag{
Name: "rooms",
Usage: "`NUMBER` of rooms to open",
},
&cli.StringFlag{
Name: "room-prefix",
Usage: "Room `PREFIX` of tester participants (defaults to a random prefix)",
},
&cli.DurationFlag{
Name: "duration",
Usage: "`TIME` duration to run, 1m, 1h (by default will run until canceled)",
Value: 0,
},
&cli.IntFlag{
Name: "video-publishers",
Aliases: []string{"publishers"},
Usage: "`NUMBER` of participants that would publish video tracks",
},
&cli.IntFlag{
Name: "audio-publishers",
Usage: "`NUMBER` of participants that would publish audio tracks",
},
&cli.IntFlag{
Name: "subscribers",
Usage: "`NUMBER` of participants that would subscribe to tracks",
},
&cli.StringFlag{
Name: "identity-prefix",
Usage: "Identity `PREFIX` of tester participants (defaults to a random prefix)",
},
&cli.StringFlag{
Name: "video-resolution",
Usage: "Resolution `QUALITY` of video to publish (\"high\", \"medium\", or \"low\")",
Value: "high",
},
&cli.StringFlag{
Name: "video-codec",
Usage: "`CODEC` \"h264\" or \"vp8\", both will be used when unset",
},
&cli.FloatFlag{
Name: "num-per-second",
Usage: "`NUMBER` of testers to start every second",
Value: 5,
},
&cli.StringFlag{
Name: "layout",
Usage: "`LAYOUT` to simulate, choose from \"speaker\", \"3x3\", \"4x4\", \"5x5\"",
Value: "speaker",
},
&cli.BoolFlag{
Name: "no-simulcast",
Usage: "Disables simulcast publishing (simulcast is enabled by default)",
},
&cli.BoolFlag{
Name: "simulate-speakers",
Usage: "Fire random speaker events to simulate speaker changes",
},
},
},
{
Name: "agent-load-test",
Usage: "Run load tests for a running agent",
Expand Down Expand Up @@ -177,6 +241,70 @@ var (
},
},
},
{
Name: "stress-test",
Usage: "Run stress tests against LiveKit with simulated publishers & subscribers",
Action: stressTest,
Flags: []cli.Flag{
&cli.IntFlag{
Name: "rooms",
Usage: "`NUMBER` of rooms to open",
},
&cli.StringFlag{
Name: "room-prefix",
Usage: "Room `PREFIX` of tester participants (defaults to a random prefix)",
},
&cli.DurationFlag{
Name: "duration",
Usage: "`TIME` duration to run, 1m, 1h (by default will run until canceled)",
Value: 0,
},
&cli.IntFlag{
Name: "video-publishers",
Aliases: []string{"publishers"},
Usage: "`NUMBER` of participants that would publish video tracks",
},
&cli.IntFlag{
Name: "audio-publishers",
Usage: "`NUMBER` of participants that would publish audio tracks",
},
&cli.IntFlag{
Name: "subscribers",
Usage: "`NUMBER` of participants that would subscribe to tracks",
},
&cli.StringFlag{
Name: "identity-prefix",
Usage: "Identity `PREFIX` of tester participants (defaults to a random prefix)",
},
&cli.StringFlag{
Name: "video-resolution",
Usage: "Resolution `QUALITY` of video to publish (\"high\", \"medium\", or \"low\")",
Value: "high",
},
&cli.StringFlag{
Name: "video-codec",
Usage: "`CODEC` \"h264\" or \"vp8\", both will be used when unset",
},
&cli.FloatFlag{
Name: "num-per-second",
Usage: "`NUMBER` of testers to start every second",
Value: 5,
},
&cli.StringFlag{
Name: "layout",
Usage: "`LAYOUT` to simulate, choose from \"speaker\", \"3x3\", \"4x4\", \"5x5\"",
Value: "speaker",
},
&cli.BoolFlag{
Name: "no-simulcast",
Usage: "Disables simulcast publishing (simulcast is enabled by default)",
},
&cli.BoolFlag{
Name: "simulate-speakers",
Usage: "Fire random speaker events to simulate speaker changes",
},
},
},
}
)

Expand Down Expand Up @@ -225,6 +353,40 @@ func loadTest(ctx context.Context, cmd *cli.Command) error {
return test.Run(ctx)
}

func stressTest(ctx context.Context, cmd *cli.Command) error {
pc, err := loadProjectDetails(cmd)
if err != nil {
return err
}
if !cmd.Bool("verbose") {
lksdk.SetLogger(logger.LogRLogger(logr.Discard()))
}
_ = raiseULimit()

params := loadtester.Params{
VideoResolution: cmd.String("video-resolution"),
VideoCodec: cmd.String("video-codec"),
Duration: cmd.Duration("duration"),
NumPerSecond: cmd.Float("num-per-second"),
Simulcast: !cmd.Bool("no-simulcast"),
SimulateSpeakers: cmd.Bool("simulate-speakers"),
TesterParams: loadtester.TesterParams{
URL: pc.URL,
APIKey: pc.APIKey,
APISecret: pc.APISecret,
IdentityPrefix: cmd.String("identity-prefix"),
Layout: loadtester.LayoutFromString(cmd.String("layout")),
},
}
params.Rooms = int(cmd.Int("rooms"))
params.RoomPrefix = cmd.String("room-prefix")
params.VideoPublishers = int(cmd.Int("video-publishers"))
params.AudioPublishers = int(cmd.Int("audio-publishers"))
params.Subscribers = int(cmd.Int("subscribers"))
test := loadtester.NewLoadTest(params)
return test.RunStress(ctx)
}

func agentLoadTest(ctx context.Context, cmd *cli.Command) error {
pc, err := loadProjectDetails(cmd)
if err != nil {
Expand Down
121 changes: 120 additions & 1 deletion pkg/loadtester/loadtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type LoadTest struct {
}

type Params struct {
Rooms int
VideoPublishers int
AudioPublishers int
Subscribers int
Expand Down Expand Up @@ -271,6 +272,124 @@ func (t *LoadTest) RunSuite(ctx context.Context) error {
return nil
}

func (t *LoadTest) RunStress(ctx context.Context) error {
parsedUrl, err := url.Parse(t.Params.URL)
if err != nil {
return err
}
if strings.HasSuffix(parsedUrl.Hostname(), ".livekit.cloud") {
if t.Params.VideoPublishers > 50 || t.Params.Subscribers > 50 || t.Params.AudioPublishers > 50 {
return errors.New("Unable to perform load test on LiveKit Cloud. Load testing is prohibited by our acceptable use policy: https://livekit.io/legal/acceptable-use-policy")
}
}

if t.Params.RoomPrefix == "" {
t.Params.RoomPrefix = randStringRunes(5)
}

roomsStats := make(map[string]map[string]*testerStats)
roomsStatsMutex := sync.Mutex{}

wg := sync.WaitGroup{}
wg.Add(t.Params.Rooms)
for i := 0; i < t.Params.Rooms; i++ {
roomName := fmt.Sprintf("%s_%d", t.Params.RoomPrefix, i)
params := t.Params
params.Room = roomName

go func() {
defer wg.Done()
stats, err := t.run(ctx, params)
if err != nil {
return
}
roomsStatsMutex.Lock()
defer roomsStatsMutex.Unlock()
roomsStats[roomName] = stats
}()
}
wg.Wait()

var totals summary
var summariesLen int64

summaryTable := util.CreateTable().
Headers("Room", "Tracks", "Bitrate", "Total Pkt. Loss", "Error").
StyleFunc(func(row, col int) lipgloss.Style {
if row == table.HeaderRow {
return util.FormHeaderStyle
}
return util.FormBaseStyle
})

for roomName, stats := range roomsStats {
// tester results
summaries := make(map[string]*summary)
names := make([]string, 0, len(stats))
for name := range stats {
if strings.HasPrefix(name, "Pub") {
continue
}
names = append(names, name)
}
sort.Strings(names)

for _, name := range names {
testerStats := stats[name]
summaries[name] = getTesterSummary(testerStats)
trackStatsSlice := make([]*trackStats, 0, len(testerStats.trackStats))
for _, ts := range testerStats.trackStats {
trackStatsSlice = append(trackStatsSlice, ts)
}
sort.Slice(trackStatsSlice, func(i, j int) bool {
return strings.Compare(
string(trackStatsSlice[i].kind),
string(trackStatsSlice[j].kind),
) < 0
})
}

if len(summaries) == 0 {
continue
}
{
// totals row
s := getTestSummary(summaries)
sDropped := formatLossRate(s.packets, s.dropped)
// avg bitrate per sub
sBitrate := fmt.Sprintf("%s (%s avg)",
formatBitrate(s.bytes, s.elapsed),
formatBitrate(s.bytes/int64(len(summaries)), s.elapsed),
)
summaryTable.Row(roomName, fmt.Sprintf("%d/%d", s.tracks, s.expected), sBitrate, sDropped, strconv.FormatInt(s.errCount, 10))

totals.tracks += s.tracks
totals.expected += s.expected
totals.packets += s.packets
totals.bytes += s.bytes
totals.dropped += s.dropped
totals.elapsed += s.elapsed
totals.errCount += s.errCount
summariesLen += int64(len(summaries))
}
}

{
sDropped := formatLossRate(totals.packets, totals.dropped)
sBitrate := fmt.Sprintf("%s (%s avg)",
formatBitrate(totals.bytes*int64(len(roomsStats)), totals.elapsed),
formatBitrate((totals.bytes*int64(len(roomsStats))/summariesLen), totals.elapsed),
)

summaryTable.Row("Total", fmt.Sprintf("%d/%d", totals.tracks, totals.expected), sBitrate, sDropped, strconv.FormatInt(totals.errCount, 10))
}

fmt.Println("\nRoom summaries:")
fmt.Println(summaryTable)

return nil
}

func (t *LoadTest) run(ctx context.Context, params Params) (map[string]*testerStats, error) {
if params.Room == "" {
params.Room = fmt.Sprintf("testroom%d", rand.Int31n(1000))
Expand Down Expand Up @@ -387,7 +506,7 @@ func (t *LoadTest) run(ctx context.Context, params Params) (map[string]*testerSt
// a really long time
duration = 1000 * time.Hour
}
fmt.Printf("Finished connecting to room, waiting %s\n", duration.String())
fmt.Printf("Finished connecting to room %s, waiting %s\n", params.Room, duration.String())

select {
case <-ctx.Done():
Expand Down
9 changes: 5 additions & 4 deletions pkg/loadtester/loadtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type TesterParams struct {
APIKey string
APISecret string
Room string
RoomPrefix string
IdentityPrefix string
Layout Layout
// true to subscribe to all published tracks
Expand Down Expand Up @@ -152,7 +153,7 @@ func (t *LoadTester) PublishAudioTrack(name string) (string, error) {
return "", nil
}

fmt.Println("publishing audio track -", t.room.LocalParticipant.Identity())
fmt.Println("publishing room", t.room.Name(), "audio track -", t.room.LocalParticipant.Identity())
audioLooper, err := provider2.CreateAudioLooper()
if err != nil {
return "", err
Expand All @@ -179,7 +180,7 @@ func (t *LoadTester) PublishVideoTrack(name, resolution, codec string) (string,
return "", nil
}

fmt.Println("publishing video track -", t.room.LocalParticipant.Identity())
fmt.Println("publishing room", t.room.Name(), "video track -", t.room.LocalParticipant.Identity())
loopers, err := provider2.CreateVideoLoopers(resolution, codec, false)
if err != nil {
return "", err
Expand All @@ -204,7 +205,7 @@ func (t *LoadTester) PublishVideoTrack(name, resolution, codec string) (string,
func (t *LoadTester) PublishSimulcastTrack(name, resolution, codec string) (string, error) {
var tracks []*lksdk.LocalTrack

fmt.Println("publishing simulcast video track -", t.room.LocalParticipant.Identity())
fmt.Println("publishing room", t.room.Name(), "simulcast video track -", t.room.LocalParticipant.Identity())
loopers, err := provider2.CreateVideoLoopers(resolution, codec, true)
if err != nil {
return "", err
Expand Down Expand Up @@ -317,7 +318,7 @@ func (t *LoadTester) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Rem
kind: pub.Kind(),
}
t.stats.Store(track.ID(), s)
fmt.Println("subscribed to track", t.room.LocalParticipant.Identity(), pub.SID(), pub.Kind(), fmt.Sprintf("%d/%d", numSubscribed, numTotal))
fmt.Println("subscribed to room", t.room.Name(), "- track", t.room.LocalParticipant.Identity(), pub.SID(), pub.Kind(), fmt.Sprintf("%d/%d", numSubscribed, numTotal))

// consume track
go t.consumeTrack(track, pub, rp)
Expand Down
Loading