Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/experiment-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"@amplitude/experiment-core": "^0.11.0",
"@amplitude/ua-parser-js": "^0.7.31",
"base64-js": "1.5.1",
"eventsource": "^2",
"unfetch": "4.1.0"
},
"devDependencies": {
Expand Down
14 changes: 14 additions & 0 deletions packages/experiment-browser/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ export interface ExperimentConfig {
*/
flagConfigPollingIntervalMillis?: number;

/**
* If true, the client will stream updates for remote evaluation from the server.
* fetch() will update variants and initiate a connection to the server.
*/
streamVariants?: boolean;

/**
* The URL to stream remote evaluation updates from. This is only used if
* `streamVariants` is `true`.
*/
streamVariantsServerUrl?: string;

/**
* Explicitly enable or disable calling {@link fetch()} on {@link start()}:
*
Expand Down Expand Up @@ -190,6 +202,8 @@ export const Defaults: ExperimentConfig = {
automaticExposureTracking: true,
pollOnStart: true,
flagConfigPollingIntervalMillis: 300000,
streamVariants: false,
streamVariantsServerUrl: 'https://stream.lab.amplitude.com',
fetchOnStart: true,
automaticFetchOnAmplitudeIdentityChange: false,
userProvider: null,
Expand Down
149 changes: 54 additions & 95 deletions packages/experiment-browser/src/experimentClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import {
EvaluationApi,
EvaluationEngine,
EvaluationFlag,
EvaluationVariant,
FetchError,
FlagApi,
Poller,
SdkEvaluationApi,
SdkFlagApi,
SdkStreamEvaluationApi,
TimeoutError,
topologicalSort,
} from '@amplitude/experiment-core';
Expand All @@ -29,6 +31,7 @@ import {
import { LocalStorage } from './storage/local-storage';
import { SessionStorage } from './storage/session-storage';
import { FetchHttpClient, WrapperClient } from './transport/http';
import { defaultSseProvider } from './transport/stream';
import { exposureEvent } from './types/analytics';
import { Client, FetchOptions } from './types/client';
import { Exposure, ExposureTrackingProvider } from './types/exposure';
Expand All @@ -51,18 +54,22 @@ import {
} from './util/convert';
import { SessionAnalyticsProvider } from './util/sessionAnalyticsProvider';
import { SessionExposureTrackingProvider } from './util/sessionExposureTrackingProvider';
import {
VariantsFetchUpdater,
VariantsRetryAndFallbackWrapperUpdater,
VariantsStreamUpdater,
VariantUpdater,
} from './util/updaters';

// Configs which have been removed from the public API.
// May be added back in the future.
const fetchBackoffTimeout = 10000;
const fetchBackoffAttempts = 8;
const fetchBackoffMinMillis = 500;
const fetchBackoffMaxMillis = 10000;
const fetchBackoffScalar = 1.5;
const minFlagPollerIntervalMillis = 60000;
const streamConnectionTimeoutMillis = 3000;
const streamRetryIntervalMillis = 10 * 60 * 1000;

const euServerUrl = 'https://api.lab.eu.amplitude.com';
const euFlagsServerUrl = 'https://flag.lab.eu.amplitude.com';
const euStreamVariantsServerUrl = 'https://stream.lab.eu.amplitude.com';

/**
* The default {@link Client} used to fetch variations from Experiment's
Expand All @@ -76,7 +83,7 @@ export class ExperimentClient implements Client {
private readonly variants: LoadStoreCache<Variant>;
private readonly flags: LoadStoreCache<EvaluationFlag>;
private readonly flagApi: FlagApi;
private readonly evaluationApi: EvaluationApi;
private readonly variantUpdater: VariantUpdater;
private readonly engine: EvaluationEngine = new EvaluationEngine();
private user: ExperimentUser | undefined;
private userProvider: ExperimentUserProvider | undefined;
Expand Down Expand Up @@ -117,6 +124,11 @@ export class ExperimentClient implements Client {
(config?.serverZone?.toLowerCase() === 'eu'
? euFlagsServerUrl
: Defaults.flagsServerUrl),
streamVariantsServerUrl:
config?.streamVariantsServerUrl ||
(config?.serverZone?.toLowerCase() === 'eu'
? euStreamVariantsServerUrl
: Defaults.streamVariantsServerUrl),
// Force minimum flag config polling interval.
flagConfigPollingIntervalMillis:
config.flagConfigPollingIntervalMillis < minFlagPollerIntervalMillis
Expand Down Expand Up @@ -161,11 +173,27 @@ export class ExperimentClient implements Client {
this.config.flagsServerUrl,
httpClient,
);
this.evaluationApi = new SdkEvaluationApi(
const evaluationApi = new SdkEvaluationApi(
this.apiKey,
this.config.serverUrl,
httpClient,
);
this.variantUpdater = new VariantsFetchUpdater(evaluationApi);
if (config.streamVariants) {
const streamEvaluationApi = new SdkStreamEvaluationApi(
this.apiKey,
this.config.streamVariantsServerUrl,
defaultSseProvider,
streamConnectionTimeoutMillis,
evaluationApi,
);
const streamUpdater = new VariantsStreamUpdater(streamEvaluationApi);
this.variantUpdater = new VariantsRetryAndFallbackWrapperUpdater(
streamUpdater,
this.variantUpdater,
streamRetryIntervalMillis,
);
}
// Storage & Caching
let storage: Storage;
const storageInstanceName = internalInstanceName
Expand Down Expand Up @@ -233,6 +261,7 @@ export class ExperimentClient implements Client {
* Stop the local flag configuration poller.
*/
public stop() {
this.variantUpdater.stop();
if (!this.isRunning) {
return;
}
Expand Down Expand Up @@ -267,23 +296,26 @@ export class ExperimentClient implements Client {
user: ExperimentUser = this.user,
options?: FetchOptions,
): Promise<ExperimentClient> {
// Don't even try to fetch variants if API key is not set
if (!this.apiKey) {
throw Error('Experiment API key is empty');
}
this.setUser(user || {});
try {
await this.fetchInternal(
user,
this.config.fetchTimeoutMillis,
this.config.retryFetchOnFailure,
options,
);
} catch (e) {
if (this.config.debug) {
if (e instanceof TimeoutError) {
console.debug(e);
} else {
console.error(e);
user = await this.addContextOrWait(user);
user = this.cleanUserPropsForFetch(user);
await this.variantUpdater.start(
async (results: Record<string, EvaluationVariant>) => {
const variants: Variants = {};
for (const key of Object.keys(results)) {
variants[key] = convertEvaluationVariantToVariant(results[key]);
}
}
}
await this.storeVariants(variants, options);
},
(err) => {
console.error(err);
},
{ user, options, config: this.config },
);
return this;
}

Expand Down Expand Up @@ -665,63 +697,12 @@ export class ExperimentClient implements Client {
return defaultSourceVariant;
}

private async fetchInternal(
user: ExperimentUser,
timeoutMillis: number,
retry: boolean,
options?: FetchOptions,
): Promise<Variants> {
// Don't even try to fetch variants if API key is not set
if (!this.apiKey) {
throw Error('Experiment API key is empty');
}

this.debug(`[Experiment] Fetch all: retry=${retry}`);

// Proactively cancel retries if active in order to avoid unnecessary API
// requests. A new failure will restart the retries.
if (retry) {
this.stopRetries();
}

try {
const variants = await this.doFetch(user, timeoutMillis, options);
await this.storeVariants(variants, options);
return variants;
} catch (e) {
if (retry && this.shouldRetryFetch(e)) {
void this.startRetries(user, options);
}
throw e;
}
}

private cleanUserPropsForFetch(user: ExperimentUser): ExperimentUser {
const cleanedUser = { ...user };
delete cleanedUser.cookie;
return cleanedUser;
}

private async doFetch(
user: ExperimentUser,
timeoutMillis: number,
options?: FetchOptions,
): Promise<Variants> {
user = await this.addContextOrWait(user);
user = this.cleanUserPropsForFetch(user);
this.debug('[Experiment] Fetch variants for user: ', user);
const results = await this.evaluationApi.getVariants(user, {
timeoutMillis: timeoutMillis,
...options,
});
const variants: Variants = {};
for (const key of Object.keys(results)) {
variants[key] = convertEvaluationVariantToVariant(results[key]);
}
this.debug('[Experiment] Received variants: ', variants);
return variants;
}

private async doFlags(): Promise<void> {
try {
let user: ExperimentUser;
Expand Down Expand Up @@ -780,28 +761,6 @@ export class ExperimentClient implements Client {
this.debug('[Experiment] Stored variants: ', variants);
}

private async startRetries(
user: ExperimentUser,
options: FetchOptions,
): Promise<void> {
this.debug('[Experiment] Retry fetch');
this.retriesBackoff = new Backoff(
fetchBackoffAttempts,
fetchBackoffMinMillis,
fetchBackoffMaxMillis,
fetchBackoffScalar,
);
void this.retriesBackoff.start(async () => {
await this.fetchInternal(user, fetchBackoffTimeout, false, options);
});
}

private stopRetries(): void {
if (this.retriesBackoff) {
this.retriesBackoff.cancel();
}
}

private addContext(user: ExperimentUser): ExperimentUser {
const providedUser = this.userProvider?.getUser();
const integrationUser = this.integrationManager.getUser();
Expand Down
17 changes: 17 additions & 0 deletions packages/experiment-browser/src/transport/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* @packageDocumentation
* @internal
*/

import { SSE } from '@amplitude/experiment-core';
import EventSource from 'eventsource';

export const defaultSseProvider = (
url: string,
headers: Record<string, string>,
): SSE => {
const es = new EventSource(url, {
headers,
});
return es;
};
Loading
Loading