Skip to content

Commit b91e821

Browse files
authored
Merge pull request #628 from GetStream/mark-read-throttle
fix: throttle mark read API requests
2 parents 164b676 + 0a29d99 commit b91e821

File tree

4 files changed

+160
-8
lines changed

4 files changed

+160
-8
lines changed

projects/stream-chat-angular/src/lib/channel.service.spec.ts

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
1+
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
22
import { Subject } from 'rxjs';
33
import { first, take } from 'rxjs/operators';
44
import {
@@ -208,6 +208,7 @@ describe('ChannelService', () => {
208208
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
209209

210210
tick();
211+
flush();
211212

212213
expect(spy).toHaveBeenCalledWith(channels);
213214
expect(activeChannelSpy).toHaveBeenCalledWith(channels[0]);
@@ -577,6 +578,10 @@ describe('ChannelService', () => {
577578

578579
it('should watch for new message events', async () => {
579580
await init();
581+
// wait for mark read throttle time
582+
await new Promise((resolve) => {
583+
setTimeout(resolve, service['markReadThrottleTime']);
584+
});
580585
const spy = jasmine.createSpy();
581586
service.activeChannelMessages$.subscribe(spy);
582587
const prevCount = (spy.calls.mostRecent().args[0] as Channel[]).length;
@@ -991,6 +996,7 @@ describe('ChannelService', () => {
991996

992997
it('should add the new channel to the top of the list, and start watching it, if user is added to a channel', fakeAsync(async () => {
993998
await init();
999+
flush();
9941000
const newChannel = generateMockChannels()[0];
9951001
newChannel.cid = 'newchannel';
9961002
newChannel.id = 'newchannel';
@@ -1030,6 +1036,7 @@ describe('ChannelService', () => {
10301036
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
10311037
});
10321038
tick();
1039+
flush();
10331040

10341041
const channels = spy.calls.mostRecent().args[0] as Channel[];
10351042
const firstChannel = channels[0];
@@ -1059,6 +1066,7 @@ describe('ChannelService', () => {
10591066
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
10601067
});
10611068
tick();
1069+
flush();
10621070

10631071
const channels = spy.calls.mostRecent().args[0] as Channel[];
10641072

@@ -2242,6 +2250,7 @@ describe('ChannelService', () => {
22422250

22432251
it('should relaod active channel if active channel is not present after state reconnect', fakeAsync(async () => {
22442252
await init();
2253+
flush();
22452254
let activeChannel!: Channel<DefaultStreamChatGenerics>;
22462255
service.activeChannel$.subscribe((c) => (activeChannel = c!));
22472256
let channels!: Channel<DefaultStreamChatGenerics>[];
@@ -2251,6 +2260,7 @@ describe('ChannelService', () => {
22512260
mockChatClient.queryChannels.and.resolveTo(channels);
22522261
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
22532262
tick();
2263+
flush();
22542264
const spy = jasmine.createSpy();
22552265
service.activeChannel$.subscribe(spy);
22562266

@@ -2276,6 +2286,7 @@ describe('ChannelService', () => {
22762286
activeChannel.state.messages.push(newMessage);
22772287
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
22782288
tick();
2289+
flush();
22792290

22802291
expect(spy).not.toHaveBeenCalled();
22812292
expect(service.deselectActiveChannel).not.toHaveBeenCalled();
@@ -2354,6 +2365,22 @@ describe('ChannelService', () => {
23542365
expect(service.activeChannelUnreadCount).toBe(0);
23552366
});
23562367

2368+
it(`should set last read message id to undefined if unread count is 0`, () => {
2369+
const activeChannel = generateMockChannels()[0];
2370+
activeChannel.id = 'next-active-channel';
2371+
activeChannel.state.read[user.id] = {
2372+
last_read: new Date(),
2373+
last_read_message_id: 'last-read-message-id',
2374+
unread_messages: 0,
2375+
user: user,
2376+
};
2377+
2378+
service.setAsActiveChannel(activeChannel);
2379+
2380+
expect(service.activeChannelLastReadMessageId).toBe(undefined);
2381+
expect(service.activeChannelUnreadCount).toBe(0);
2382+
});
2383+
23572384
it('should be able to select empty channel as active channel', () => {
23582385
const channel = generateMockChannels()[0];
23592386
channel.id = 'new-empty-channel';
@@ -2558,6 +2585,18 @@ describe('ChannelService', () => {
25582585

25592586
expect(service.activeChannelLastReadMessageId).toBe('last-read-message');
25602587
expect(service.activeChannelUnreadCount).toBe(12);
2588+
2589+
events$.next({
2590+
eventType: 'notification.mark_unread',
2591+
event: {
2592+
channel_id: service.activeChannel?.id,
2593+
unread_messages: 0,
2594+
last_read_message_id: 'last-read-message',
2595+
} as Event<DefaultStreamChatGenerics>,
2596+
});
2597+
2598+
expect(service.activeChannelLastReadMessageId).toBe(undefined);
2599+
expect(service.activeChannelUnreadCount).toBe(0);
25612600
});
25622601

25632602
it('should halt marking the channel as read if an unread call was made in that session', async () => {
@@ -2639,4 +2678,76 @@ describe('ChannelService', () => {
26392678
expect(customQuery).toHaveBeenCalledWith('next-page');
26402679
expect(hasMoreSpy).toHaveBeenCalledWith(false);
26412680
});
2681+
2682+
it('should throttle mark read API calls', async () => {
2683+
await init();
2684+
// wait for mark read throttle time
2685+
await new Promise((resolve) => {
2686+
setTimeout(resolve, service['markReadThrottleTime']);
2687+
});
2688+
2689+
const activeChannel = service.activeChannel!;
2690+
spyOn(activeChannel, 'markRead');
2691+
2692+
(activeChannel as MockChannel).handleEvent('message.new', mockMessage());
2693+
2694+
expect(activeChannel.markRead).toHaveBeenCalledTimes(1);
2695+
2696+
(activeChannel as MockChannel).handleEvent('message.new', mockMessage());
2697+
2698+
expect(activeChannel.markRead).toHaveBeenCalledTimes(1);
2699+
2700+
// wait for mark read throttle time
2701+
await new Promise((resolve) => {
2702+
setTimeout(resolve, service['markReadThrottleTime']);
2703+
});
2704+
2705+
expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
2706+
});
2707+
2708+
it('should throttle mark read API calls - channel change', async () => {
2709+
await init();
2710+
// wait for mark read throttle time
2711+
await new Promise((resolve) => {
2712+
setTimeout(resolve, service['markReadThrottleTime']);
2713+
});
2714+
2715+
const activeChannel = service.activeChannel!;
2716+
spyOn(activeChannel, 'markRead');
2717+
2718+
(activeChannel as MockChannel).handleEvent('message.new', mockMessage());
2719+
2720+
expect(activeChannel.markRead).toHaveBeenCalledTimes(1);
2721+
2722+
(activeChannel as MockChannel).handleEvent('message.new', mockMessage());
2723+
2724+
expect(activeChannel.markRead).toHaveBeenCalledTimes(1);
2725+
2726+
service.setAsActiveChannel(service.channels[1]);
2727+
2728+
expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
2729+
});
2730+
2731+
it('should throttle mark read API calls - reset', async () => {
2732+
await init();
2733+
// wait for mark read throttle time
2734+
await new Promise((resolve) => {
2735+
setTimeout(resolve, service['markReadThrottleTime']);
2736+
});
2737+
2738+
const activeChannel = service.activeChannel!;
2739+
spyOn(activeChannel, 'markRead');
2740+
2741+
(activeChannel as MockChannel).handleEvent('message.new', mockMessage());
2742+
2743+
expect(activeChannel.markRead).toHaveBeenCalledTimes(1);
2744+
2745+
(activeChannel as MockChannel).handleEvent('message.new', mockMessage());
2746+
2747+
expect(activeChannel.markRead).toHaveBeenCalledTimes(1);
2748+
2749+
service.reset();
2750+
2751+
expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
2752+
});
26422753
});

projects/stream-chat-angular/src/lib/channel.service.thread.spec.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
1+
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
22
import { Subject } from 'rxjs';
33
import { first } from 'rxjs/operators';
44
import {
@@ -235,6 +235,7 @@ describe('ChannelService - threads', () => {
235235
spy.calls.reset();
236236
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
237237
tick();
238+
flush();
238239

239240
expect(spy).toHaveBeenCalledWith(undefined);
240241
}));
@@ -314,6 +315,10 @@ describe('ChannelService - threads', () => {
314315

315316
it('should watch for new message events', async () => {
316317
await init();
318+
// wait for mark read throttle time
319+
await new Promise((resolve) => {
320+
setTimeout(resolve, service['markReadThrottleTime']);
321+
});
317322
const spy = jasmine.createSpy();
318323
const parentMessage = mockMessage();
319324
await service.setAsActiveParentMessage(parentMessage);

projects/stream-chat-angular/src/lib/channel.service.ts

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,9 @@ export class ChannelService<
428428
};
429429
private dismissErrorNotification?: () => void;
430430
private areReadEventsPaused = false;
431+
private markReadThrottleTime = 1050;
432+
private markReadTimeout?: ReturnType<typeof setTimeout>;
433+
private scheduledMarkReadRequest?: () => void;
431434

432435
constructor(
433436
private chatClientService: ChatClientService<T>,
@@ -563,17 +566,19 @@ export class ChannelService<
563566
return;
564567
}
565568
this.stopWatchForActiveChannelEvents(prevActiveChannel);
569+
this.flushMarkReadQueue();
566570
this.areReadEventsPaused = false;
567571
const readState =
568572
channel.state.read[this.chatClientService.chatClient.user?.id || ''];
569573
this.activeChannelLastReadMessageId = readState?.last_read_message_id;
574+
this.activeChannelUnreadCount = readState?.unread_messages || 0;
570575
if (
571576
channel.state.latestMessages[channel.state.latestMessages.length - 1]
572-
?.id === this.activeChannelLastReadMessageId
577+
?.id === this.activeChannelLastReadMessageId ||
578+
this.activeChannelUnreadCount === 0
573579
) {
574580
this.activeChannelLastReadMessageId = undefined;
575581
}
576-
this.activeChannelUnreadCount = readState?.unread_messages || 0;
577582
this.watchForActiveChannelEvents(channel);
578583
this.addChannel(channel);
579584
this.activeChannelSubject.next(channel);
@@ -595,6 +600,7 @@ export class ChannelService<
595600
return;
596601
}
597602
this.stopWatchForActiveChannelEvents(activeChannel);
603+
this.flushMarkReadQueue();
598604
this.activeChannelMessagesSubject.next([]);
599605
this.activeChannelSubject.next(undefined);
600606
this.activeParentMessageIdSubject.next(undefined);
@@ -1567,6 +1573,9 @@ export class ChannelService<
15671573
this.ngZone.run(() => {
15681574
this.activeChannelLastReadMessageId = e.last_read_message_id;
15691575
this.activeChannelUnreadCount = e.unread_messages;
1576+
if (this.activeChannelUnreadCount === 0) {
1577+
this.activeChannelLastReadMessageId = undefined;
1578+
}
15701579
this.activeChannelSubject.next(this.activeChannel);
15711580
});
15721581
})
@@ -2220,16 +2229,43 @@ export class ChannelService<
22202229
this.usersTypingInThreadSubject.next([]);
22212230
}
22222231

2223-
private markRead(channel: Channel<T>) {
2232+
private markRead(channel: Channel<T>, isThrottled = true) {
22242233
if (
22252234
this.canSendReadEvents &&
22262235
this.shouldMarkActiveChannelAsRead &&
2227-
!this.areReadEventsPaused
2236+
!this.areReadEventsPaused &&
2237+
channel.countUnread() > 0
22282238
) {
2229-
void channel.markRead();
2239+
if (isThrottled) {
2240+
this.markReadThrottled(channel);
2241+
} else {
2242+
void channel.markRead();
2243+
}
22302244
}
22312245
}
22322246

2247+
private markReadThrottled(channel: Channel<T>) {
2248+
if (!this.markReadTimeout) {
2249+
this.markRead(channel, false);
2250+
this.markReadTimeout = setTimeout(() => {
2251+
this.flushMarkReadQueue();
2252+
}, this.markReadThrottleTime);
2253+
} else {
2254+
clearTimeout(this.markReadTimeout);
2255+
this.scheduledMarkReadRequest = () => this.markRead(channel, false);
2256+
this.markReadTimeout = setTimeout(() => {
2257+
this.flushMarkReadQueue();
2258+
}, this.markReadThrottleTime);
2259+
}
2260+
}
2261+
2262+
private flushMarkReadQueue() {
2263+
this.scheduledMarkReadRequest?.();
2264+
this.scheduledMarkReadRequest = undefined;
2265+
clearTimeout(this.markReadTimeout);
2266+
this.markReadTimeout = undefined;
2267+
}
2268+
22332269
private async _init(settings: {
22342270
shouldSetActiveChannel: boolean;
22352271
messagePageSize: number;

projects/stream-chat-angular/src/lib/mocks/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ export const generateMockChannels = (length = 25) => {
9898
sendAction: () => {},
9999
deleteImage: () => {},
100100
deleteFile: () => {},
101-
countUnread: () => {},
101+
countUnread: () => 3,
102102
markRead: () => {},
103103
getReplies: () => {},
104104
keystroke: () => {},

0 commit comments

Comments
 (0)