diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 14fd4e74e5d5c..5b6c26e046e46 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -51,6 +51,8 @@ services: path /public/webhooks/* path /public/m/ path /public/m/* + path /public/links/ + path /public/links/* } handle @capture { diff --git a/docker-compose.dev-minimal.yml b/docker-compose.dev-minimal.yml index 693269941ff78..1636115843e95 100644 --- a/docker-compose.dev-minimal.yml +++ b/docker-compose.dev-minimal.yml @@ -49,6 +49,8 @@ services: path /public/webhooks/* path /public/m/ path /public/m/* + path /public/links/ + path /public/links/* } handle @capture { diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 98307511b2642..55c76aca8be18 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -48,6 +48,8 @@ services: path /public/webhooks/* path /public/m path /public/m/* + path /public/links/ + path /public/links/* } handle @capture { diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index 92de0f890a618..6155b8b26e96c 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -22,11 +22,13 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin cdpCyclotronWorkerDelay: true, cdpBehaviouralEvents: true, cdpApi: true, + linksApi: true, } case PluginServerMode.local_cdp: return { ingestionV2: true, + linksApi: true, cdpProcessedEvents: true, cdpPersonUpdates: true, cdpInternalEvents: true, @@ -95,5 +97,10 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin // NOTE: This is temporary until we have removed plugins appManagementSingleton: true, } + + case PluginServerMode.links_api: + return { + linksApi: true, + } } } diff --git a/plugin-server/src/cdp/cdp-api.ts b/plugin-server/src/cdp/api/cdp-api.ts similarity index 93% rename from plugin-server/src/cdp/cdp-api.ts rename to plugin-server/src/cdp/api/cdp-api.ts index 9ce2131a578ad..97cd8b73f9294 100644 --- a/plugin-server/src/cdp/cdp-api.ts +++ b/plugin-server/src/cdp/api/cdp-api.ts @@ -5,34 +5,34 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { ModifiedRequest } from '~/api/router' -import { HealthCheckResult, HealthCheckResultError, HealthCheckResultOk, Hub, PluginServerService } from '../types' -import { logger } from '../utils/logger' -import { UUID, UUIDT, delay } from '../utils/utils' +import { HealthCheckResult, HealthCheckResultError, HealthCheckResultOk, Hub, PluginServerService } from '../../types' +import { logger } from '../../utils/logger' +import { UUID, UUIDT, delay } from '../../utils/utils' import { CdpSourceWebhooksConsumer, HogFunctionWebhookResult, SourceWebhookError, -} from './consumers/cdp-source-webhooks.consumer' -import { HogTransformerService } from './hog-transformations/hog-transformer.service' -import { createCdpRedisPool } from './redis' -import { HogExecutorExecuteAsyncOptions, HogExecutorService, MAX_ASYNC_STEPS } from './services/hog-executor.service' -import { HogFlowExecutorService, createHogFlowInvocation } from './services/hogflows/hogflow-executor.service' -import { HogFlowFunctionsService } from './services/hogflows/hogflow-functions.service' -import { HogFlowManagerService } from './services/hogflows/hogflow-manager.service' -import { HogFunctionManagerService } from './services/managers/hog-function-manager.service' -import { HogFunctionTemplateManagerService } from './services/managers/hog-function-template-manager.service' -import { RecipientsManagerService } from './services/managers/recipients-manager.service' -import { EmailTrackingService } from './services/messaging/email-tracking.service' -import { RecipientPreferencesService } from './services/messaging/recipient-preferences.service' -import { RecipientTokensService } from './services/messaging/recipient-tokens.service' -import { HogFunctionMonitoringService } from './services/monitoring/hog-function-monitoring.service' -import { HogWatcherService, HogWatcherState } from './services/monitoring/hog-watcher.service' -import { NativeDestinationExecutorService } from './services/native-destination-executor.service' -import { SegmentDestinationExecutorService } from './services/segment-destination-executor.service' -import { HOG_FUNCTION_TEMPLATES } from './templates' -import { HogFunctionInvocationGlobals, HogFunctionType, MinimalLogEntry } from './types' -import { convertToHogFunctionInvocationGlobals, isNativeHogFunction, isSegmentPluginHogFunction } from './utils' -import { convertToHogFunctionFilterGlobal } from './utils/hog-function-filtering' +} from '../consumers/cdp-source-webhooks.consumer' +import { HogTransformerService } from '../hog-transformations/hog-transformer.service' +import { createCdpRedisPool } from '../redis' +import { HogExecutorExecuteAsyncOptions, HogExecutorService, MAX_ASYNC_STEPS } from '../services/hog-executor.service' +import { HogFlowExecutorService, createHogFlowInvocation } from '../services/hogflows/hogflow-executor.service' +import { HogFlowFunctionsService } from '../services/hogflows/hogflow-functions.service' +import { HogFlowManagerService } from '../services/hogflows/hogflow-manager.service' +import { HogFunctionManagerService } from '../services/managers/hog-function-manager.service' +import { HogFunctionTemplateManagerService } from '../services/managers/hog-function-template-manager.service' +import { RecipientsManagerService } from '../services/managers/recipients-manager.service' +import { EmailTrackingService } from '../services/messaging/email-tracking.service' +import { RecipientPreferencesService } from '../services/messaging/recipient-preferences.service' +import { RecipientTokensService } from '../services/messaging/recipient-tokens.service' +import { HogFunctionMonitoringService } from '../services/monitoring/hog-function-monitoring.service' +import { HogWatcherService, HogWatcherState } from '../services/monitoring/hog-watcher.service' +import { NativeDestinationExecutorService } from '../services/native-destination-executor.service' +import { SegmentDestinationExecutorService } from '../services/segment-destination-executor.service' +import { HOG_FUNCTION_TEMPLATES } from '../templates' +import { HogFunctionInvocationGlobals, HogFunctionType, MinimalLogEntry } from '../types' +import { convertToHogFunctionInvocationGlobals, isNativeHogFunction, isSegmentPluginHogFunction } from '../utils' +import { convertToHogFunctionFilterGlobal } from '../utils/hog-function-filtering' export class CdpApi { private hogExecutor: HogExecutorService diff --git a/plugin-server/src/cdp/api/links-api.ts b/plugin-server/src/cdp/api/links-api.ts new file mode 100644 index 0000000000000..b80f652da9591 --- /dev/null +++ b/plugin-server/src/cdp/api/links-api.ts @@ -0,0 +1,58 @@ +import express from 'ultimate-express' + +import { ModifiedRequest } from '~/api/router' + +import { HealthCheckResult, HealthCheckResultError, HealthCheckResultOk, Hub, PluginServerService } from '../../types' +import { LinksService } from '../services/links/links.service' + +export class LinksApi { + private linksService: LinksService + + constructor(private hub: Hub) { + this.linksService = new LinksService(hub) + } + + public get service(): PluginServerService { + return { + id: 'links-api', + onShutdown: async () => await this.stop(), + healthcheck: () => this.isHealthy() ?? new HealthCheckResultError('Links API is not healthy', {}), + } + } + + async start(): Promise { + // await this. + } + + async stop(): Promise { + await this.linksService.stop() + } + + isHealthy(): HealthCheckResult { + // NOTE: There isn't really anything to check for here so we are just always healthy + return new HealthCheckResultOk() + } + + router(): express.Router { + const router = express.Router() + + const asyncHandler = + (fn: (req: ModifiedRequest, res: express.Response) => Promise) => + (req: ModifiedRequest, res: express.Response, next: express.NextFunction): Promise => + fn(req, res).catch(next) + + router.get('/public/links/:id', asyncHandler(this.handleLink())) + + return router + } + + private handleLink = + () => + async (req: ModifiedRequest, res: express.Response): Promise => { + try { + await this.linksService.processLink(req, res) + } catch (error) { + return res.status(500).json({ error: 'Internal error' }) + } + } +} diff --git a/plugin-server/src/cdp/cdp-api.test.ts b/plugin-server/src/cdp/cdp-api.test.ts index b73d9d1c2c7f5..44978d20ae2cd 100644 --- a/plugin-server/src/cdp/cdp-api.test.ts +++ b/plugin-server/src/cdp/cdp-api.test.ts @@ -14,7 +14,7 @@ import { closeHub, createHub } from '../utils/db/hub' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './_tests/examples' import { insertHogFunction as _insertHogFunction, createHogFunction } from './_tests/fixtures' import { deleteKeysWithPrefix } from './_tests/redis' -import { CdpApi } from './cdp-api' +import { CdpApi } from './api/cdp-api' import { posthogFilterOutPlugin } from './legacy-plugins/_transformations/posthog-filter-out-plugin/template' import { createCdpRedisPool } from './redis' import { BASE_REDIS_KEY, HogWatcherState } from './services/monitoring/hog-watcher.service' diff --git a/plugin-server/src/cdp/consumers/cdp-source-webhooks.consumer.test.ts b/plugin-server/src/cdp/consumers/cdp-source-webhooks.consumer.test.ts index 16169d4dc70c3..2580d451b7ad1 100644 --- a/plugin-server/src/cdp/consumers/cdp-source-webhooks.consumer.test.ts +++ b/plugin-server/src/cdp/consumers/cdp-source-webhooks.consumer.test.ts @@ -8,7 +8,7 @@ import express from 'ultimate-express' import { setupExpressApp } from '~/api/router' import { insertHogFunction, insertHogFunctionTemplate } from '~/cdp/_tests/fixtures' -import { CdpApi } from '~/cdp/cdp-api' +import { CdpApi } from '~/cdp/api/cdp-api' import { template as pixelTemplate } from '~/cdp/templates/_sources/pixel/pixel.template' import { template as incomingWebhookTemplate } from '~/cdp/templates/_sources/webhook/incoming_webhook.template' import { HogFunctionType } from '~/cdp/types' diff --git a/plugin-server/src/cdp/services/links/links.service.ts b/plugin-server/src/cdp/services/links/links.service.ts new file mode 100644 index 0000000000000..a5fc39fcfa920 --- /dev/null +++ b/plugin-server/src/cdp/services/links/links.service.ts @@ -0,0 +1,78 @@ +import express from 'ultimate-express' + +import { ModifiedRequest } from '~/api/router' +import { PromiseScheduler } from '~/utils/promise-scheduler' + +import { Hub } from '../../../types' +import { PostgresUse } from '../../../utils/db/postgres' +import { LazyLoader } from '../../../utils/lazy-loader' +import { logger } from '../../../utils/logger' +import { HogFunctionMonitoringService } from '../monitoring/hog-function-monitoring.service' + +export type LinkType = { + id: string + team_id: number + short_link: string + redirect_url: string +} + +// TODO: Add redis based bloom filter as first step to decide whether to lookup fully or not... +export class LinksService { + private lazyLoader: LazyLoader + private promises: PromiseScheduler + private hogFunctionMonitoringService: HogFunctionMonitoringService + + constructor(private hub: Hub) { + this.promises = new PromiseScheduler() + this.lazyLoader = new LazyLoader({ + name: 'link_manager', + loader: async (ids) => await this.fetchLinks(ids), + }) + this.hogFunctionMonitoringService = new HogFunctionMonitoringService(hub) + } + + async stop() { + await this.promises.waitForAllSettled() + } + + async processLink(req: ModifiedRequest, res: express.Response): Promise { + const id = req.params.id + const domain = this.hub.SHORT_LINKS_DOMAIN + const shortLink = `${domain}/${id}` + const link = await this.lazyLoader.get(shortLink) + + if (!link) { + res.status(404).send('Not found') + return + } + + this.hogFunctionMonitoringService.queueAppMetric( + { + app_source_id: link.id, + metric_kind: 'success', + metric_name: 'succeeded', + team_id: link.team_id, + count: 1, + }, + 'link' + ) + void this.promises.schedule(this.hogFunctionMonitoringService.flush()) + + res.redirect(link.redirect_url) + } + + private async fetchLinks(ids: string[]): Promise> { + logger.debug('[LinkManager]', 'Fetching links', { ids }) + + const response = await this.hub.postgres.query( + PostgresUse.COMMON_READ, + `select id, team_id, redirect_url from posthog_link WHERE concat(short_link_domain, '/', short_code) = ANY($1)`, + [ids], + 'fetchShortLinks' + ) + return response.rows.reduce>((acc, link) => { + acc[link.short_link] = link + return acc + }, {}) + } +} diff --git a/plugin-server/src/cdp/services/messaging/email-tracking.service.test.ts b/plugin-server/src/cdp/services/messaging/email-tracking.service.test.ts index 5490faef8b3a6..22fe86145822e 100644 --- a/plugin-server/src/cdp/services/messaging/email-tracking.service.test.ts +++ b/plugin-server/src/cdp/services/messaging/email-tracking.service.test.ts @@ -9,7 +9,7 @@ import { setupExpressApp } from '~/api/router' import { FixtureHogFlowBuilder } from '~/cdp/_tests/builders/hogflow.builder' import { insertHogFunction } from '~/cdp/_tests/fixtures' import { insertHogFlow } from '~/cdp/_tests/fixtures-hogflows' -import { CdpApi } from '~/cdp/cdp-api' +import { CdpApi } from '~/cdp/api/cdp-api' import { HogFunctionType } from '~/cdp/types' import { KAFKA_APP_METRICS_2 } from '~/config/kafka-topics' import { HogFlow } from '~/schema/hogflow' diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts index 4a733dea53a54..3333564b13460 100644 --- a/plugin-server/src/cdp/types.ts +++ b/plugin-server/src/cdp/types.ts @@ -161,7 +161,7 @@ export type HogFunctionFilterGlobals = { } } -export type MetricLogSource = 'hog_function' | 'hog_flow' +export type MetricLogSource = 'hog_function' | 'hog_flow' | 'link' export type LogEntryLevel = 'debug' | 'info' | 'warn' | 'error' diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 2a4665033fc96..cd322db3af19d 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -324,6 +324,9 @@ export function getDefaultConfig(): PluginsServerConfig { SES_ACCESS_KEY_ID: isTestEnv() || isDevEnv() ? 'test' : '', SES_SECRET_ACCESS_KEY: isTestEnv() || isDevEnv() ? 'test' : '', SES_REGION: 'us-east-1', + + // Links + SHORT_LINKS_DOMAIN: 'phog.gg', } } diff --git a/plugin-server/src/server.ts b/plugin-server/src/server.ts index 61efcf8a461ff..c5dac57aada37 100644 --- a/plugin-server/src/server.ts +++ b/plugin-server/src/server.ts @@ -8,7 +8,8 @@ import express from 'ultimate-express' import { setupCommonRoutes, setupExpressApp } from './api/router' import { getPluginServerCapabilities } from './capabilities' -import { CdpApi } from './cdp/cdp-api' +import { CdpApi } from './cdp/api/cdp-api' +import { LinksApi } from './cdp/api/links-api' import { CdpBehaviouralEventsConsumer } from './cdp/consumers/cdp-behavioural-events.consumer' import { CdpCyclotronDelayConsumer } from './cdp/consumers/cdp-cyclotron-delay.consumer' import { CdpCyclotronWorkerHogFlow } from './cdp/consumers/cdp-cyclotron-worker-hogflow.consumer' @@ -199,6 +200,15 @@ export class PluginServer { }) } + if (capabilities.linksApi) { + serviceLoaders.push(async () => { + const api = new LinksApi(hub) + this.expressApp.use('/', api.router()) + await api.start() + return api.service + }) + } + if (capabilities.cdpCyclotronWorker) { await initPlugins() serviceLoaders.push(async () => { diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 002ba8d4e5676..847d9c76dfce6 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -76,6 +76,7 @@ export enum PluginServerMode { async_webhooks = 'async-webhooks', recordings_blob_ingestion_v2 = 'recordings-blob-ingestion-v2', recordings_blob_ingestion_v2_overflow = 'recordings-blob-ingestion-v2-overflow', + links_api = 'links-api', cdp_processed_events = 'cdp-processed-events', cdp_person_updates = 'cdp-person-updates', cdp_internal_events = 'cdp-internal-events', @@ -466,6 +467,9 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig SES_ACCESS_KEY_ID: string SES_SECRET_ACCESS_KEY: string SES_REGION: string + + // Links + SHORT_LINKS_DOMAIN: string } export interface Hub extends PluginsServerConfig { @@ -525,6 +529,7 @@ export interface PluginServerCapabilities { processAsyncWebhooksHandlers?: boolean sessionRecordingBlobIngestionV2?: boolean sessionRecordingBlobIngestionV2Overflow?: boolean + linksApi?: boolean cdpProcessedEvents?: boolean cdpPersonUpdates?: boolean cdpInternalEvents?: boolean