Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker-compose.base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ services:
path /public/webhooks/*
path /public/m/
path /public/m/*
path /public/links/
path /public/links/*
}

handle @capture {
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.dev-minimal.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ services:
path /public/webhooks/*
path /public/m/
path /public/m/*
path /public/links/
path /public/links/*
}

handle @capture {
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ services:
path /public/webhooks/*
path /public/m
path /public/m/*
path /public/links/
path /public/links/*
}

handle @capture {
Expand Down
7 changes: 7 additions & 0 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpCyclotronWorkerDelay: true,
cdpBehaviouralEvents: true,
cdpApi: true,
linksApi: true,
}

case PluginServerMode.local_cdp:
return {
ingestionV2: true,
linksApi: true,
cdpProcessedEvents: true,
cdpPersonUpdates: true,
cdpInternalEvents: true,
Expand Down Expand Up @@ -95,5 +97,10 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
// NOTE: This is temporary until we have removed plugins
appManagementSingleton: true,
}

case PluginServerMode.links_api:
return {
linksApi: true,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,34 @@ import { PluginEvent } from '@posthog/plugin-scaffold'

import { ModifiedRequest } from '~/api/router'

import { HealthCheckResult, HealthCheckResultError, HealthCheckResultOk, Hub, PluginServerService } from '../types'
import { logger } from '../utils/logger'
import { UUID, UUIDT, delay } from '../utils/utils'
import { HealthCheckResult, HealthCheckResultError, HealthCheckResultOk, Hub, PluginServerService } from '../../types'
import { logger } from '../../utils/logger'
import { UUID, UUIDT, delay } from '../../utils/utils'
import {
CdpSourceWebhooksConsumer,
HogFunctionWebhookResult,
SourceWebhookError,
} from './consumers/cdp-source-webhooks.consumer'
import { HogTransformerService } from './hog-transformations/hog-transformer.service'
import { createCdpRedisPool } from './redis'
import { HogExecutorExecuteAsyncOptions, HogExecutorService, MAX_ASYNC_STEPS } from './services/hog-executor.service'
import { HogFlowExecutorService, createHogFlowInvocation } from './services/hogflows/hogflow-executor.service'
import { HogFlowFunctionsService } from './services/hogflows/hogflow-functions.service'
import { HogFlowManagerService } from './services/hogflows/hogflow-manager.service'
import { HogFunctionManagerService } from './services/managers/hog-function-manager.service'
import { HogFunctionTemplateManagerService } from './services/managers/hog-function-template-manager.service'
import { RecipientsManagerService } from './services/managers/recipients-manager.service'
import { EmailTrackingService } from './services/messaging/email-tracking.service'
import { RecipientPreferencesService } from './services/messaging/recipient-preferences.service'
import { RecipientTokensService } from './services/messaging/recipient-tokens.service'
import { HogFunctionMonitoringService } from './services/monitoring/hog-function-monitoring.service'
import { HogWatcherService, HogWatcherState } from './services/monitoring/hog-watcher.service'
import { NativeDestinationExecutorService } from './services/native-destination-executor.service'
import { SegmentDestinationExecutorService } from './services/segment-destination-executor.service'
import { HOG_FUNCTION_TEMPLATES } from './templates'
import { HogFunctionInvocationGlobals, HogFunctionType, MinimalLogEntry } from './types'
import { convertToHogFunctionInvocationGlobals, isNativeHogFunction, isSegmentPluginHogFunction } from './utils'
import { convertToHogFunctionFilterGlobal } from './utils/hog-function-filtering'
} from '../consumers/cdp-source-webhooks.consumer'
import { HogTransformerService } from '../hog-transformations/hog-transformer.service'
import { createCdpRedisPool } from '../redis'
import { HogExecutorExecuteAsyncOptions, HogExecutorService, MAX_ASYNC_STEPS } from '../services/hog-executor.service'
import { HogFlowExecutorService, createHogFlowInvocation } from '../services/hogflows/hogflow-executor.service'
import { HogFlowFunctionsService } from '../services/hogflows/hogflow-functions.service'
import { HogFlowManagerService } from '../services/hogflows/hogflow-manager.service'
import { HogFunctionManagerService } from '../services/managers/hog-function-manager.service'
import { HogFunctionTemplateManagerService } from '../services/managers/hog-function-template-manager.service'
import { RecipientsManagerService } from '../services/managers/recipients-manager.service'
import { EmailTrackingService } from '../services/messaging/email-tracking.service'
import { RecipientPreferencesService } from '../services/messaging/recipient-preferences.service'
import { RecipientTokensService } from '../services/messaging/recipient-tokens.service'
import { HogFunctionMonitoringService } from '../services/monitoring/hog-function-monitoring.service'
import { HogWatcherService, HogWatcherState } from '../services/monitoring/hog-watcher.service'
import { NativeDestinationExecutorService } from '../services/native-destination-executor.service'
import { SegmentDestinationExecutorService } from '../services/segment-destination-executor.service'
import { HOG_FUNCTION_TEMPLATES } from '../templates'
import { HogFunctionInvocationGlobals, HogFunctionType, MinimalLogEntry } from '../types'
import { convertToHogFunctionInvocationGlobals, isNativeHogFunction, isSegmentPluginHogFunction } from '../utils'
import { convertToHogFunctionFilterGlobal } from '../utils/hog-function-filtering'

export class CdpApi {
private hogExecutor: HogExecutorService
Expand Down
58 changes: 58 additions & 0 deletions plugin-server/src/cdp/api/links-api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import express from 'ultimate-express'

import { ModifiedRequest } from '~/api/router'

import { HealthCheckResult, HealthCheckResultError, HealthCheckResultOk, Hub, PluginServerService } from '../../types'
import { LinksService } from '../services/links/links.service'

export class LinksApi {
private linksService: LinksService

constructor(private hub: Hub) {
this.linksService = new LinksService(hub)
}

public get service(): PluginServerService {
return {
id: 'links-api',
onShutdown: async () => await this.stop(),
healthcheck: () => this.isHealthy() ?? new HealthCheckResultError('Links API is not healthy', {}),
}
}

async start(): Promise<void> {
// await this.
}

async stop(): Promise<void> {
await this.linksService.stop()
}

isHealthy(): HealthCheckResult {
// NOTE: There isn't really anything to check for here so we are just always healthy
return new HealthCheckResultOk()
}

router(): express.Router {
const router = express.Router()

const asyncHandler =
(fn: (req: ModifiedRequest, res: express.Response) => Promise<void>) =>
(req: ModifiedRequest, res: express.Response, next: express.NextFunction): Promise<void> =>
fn(req, res).catch(next)

router.get('/public/links/:id', asyncHandler(this.handleLink()))

return router
}

private handleLink =
() =>
async (req: ModifiedRequest, res: express.Response): Promise<any> => {
try {
await this.linksService.processLink(req, res)
} catch (error) {
return res.status(500).json({ error: 'Internal error' })
}
}
}
2 changes: 1 addition & 1 deletion plugin-server/src/cdp/cdp-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { closeHub, createHub } from '../utils/db/hub'
import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './_tests/examples'
import { insertHogFunction as _insertHogFunction, createHogFunction } from './_tests/fixtures'
import { deleteKeysWithPrefix } from './_tests/redis'
import { CdpApi } from './cdp-api'
import { CdpApi } from './api/cdp-api'
import { posthogFilterOutPlugin } from './legacy-plugins/_transformations/posthog-filter-out-plugin/template'
import { createCdpRedisPool } from './redis'
import { BASE_REDIS_KEY, HogWatcherState } from './services/monitoring/hog-watcher.service'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import express from 'ultimate-express'

import { setupExpressApp } from '~/api/router'
import { insertHogFunction, insertHogFunctionTemplate } from '~/cdp/_tests/fixtures'
import { CdpApi } from '~/cdp/cdp-api'
import { CdpApi } from '~/cdp/api/cdp-api'
import { template as pixelTemplate } from '~/cdp/templates/_sources/pixel/pixel.template'
import { template as incomingWebhookTemplate } from '~/cdp/templates/_sources/webhook/incoming_webhook.template'
import { HogFunctionType } from '~/cdp/types'
Expand Down
78 changes: 78 additions & 0 deletions plugin-server/src/cdp/services/links/links.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import express from 'ultimate-express'

import { ModifiedRequest } from '~/api/router'
import { PromiseScheduler } from '~/utils/promise-scheduler'

import { Hub } from '../../../types'
import { PostgresUse } from '../../../utils/db/postgres'
import { LazyLoader } from '../../../utils/lazy-loader'
import { logger } from '../../../utils/logger'
import { HogFunctionMonitoringService } from '../monitoring/hog-function-monitoring.service'

export type LinkType = {
id: string
team_id: number
short_link: string
redirect_url: string
}

// TODO: Add redis based bloom filter as first step to decide whether to lookup fully or not...
export class LinksService {
private lazyLoader: LazyLoader<LinkType>
private promises: PromiseScheduler
private hogFunctionMonitoringService: HogFunctionMonitoringService

constructor(private hub: Hub) {
this.promises = new PromiseScheduler()
this.lazyLoader = new LazyLoader({
name: 'link_manager',
loader: async (ids) => await this.fetchLinks(ids),
})
this.hogFunctionMonitoringService = new HogFunctionMonitoringService(hub)
}

async stop() {
await this.promises.waitForAllSettled()
}

async processLink(req: ModifiedRequest, res: express.Response): Promise<void> {
const id = req.params.id
const domain = this.hub.SHORT_LINKS_DOMAIN
const shortLink = `${domain}/${id}`
const link = await this.lazyLoader.get(shortLink)

if (!link) {
res.status(404).send('Not found')
return
}

this.hogFunctionMonitoringService.queueAppMetric(
{
app_source_id: link.id,
metric_kind: 'success',
metric_name: 'succeeded',
team_id: link.team_id,
count: 1,
},
'link'
)
void this.promises.schedule(this.hogFunctionMonitoringService.flush())

res.redirect(link.redirect_url)
}

private async fetchLinks(ids: string[]): Promise<Record<string, LinkType | undefined>> {
logger.debug('[LinkManager]', 'Fetching links', { ids })

const response = await this.hub.postgres.query<LinkType>(
PostgresUse.COMMON_READ,
`select id, team_id, redirect_url from posthog_link WHERE concat(short_link_domain, '/', short_code) = ANY($1)`,
[ids],
'fetchShortLinks'
)
return response.rows.reduce<Record<string, LinkType | undefined>>((acc, link) => {
acc[link.short_link] = link
return acc
}, {})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { setupExpressApp } from '~/api/router'
import { FixtureHogFlowBuilder } from '~/cdp/_tests/builders/hogflow.builder'
import { insertHogFunction } from '~/cdp/_tests/fixtures'
import { insertHogFlow } from '~/cdp/_tests/fixtures-hogflows'
import { CdpApi } from '~/cdp/cdp-api'
import { CdpApi } from '~/cdp/api/cdp-api'
import { HogFunctionType } from '~/cdp/types'
import { KAFKA_APP_METRICS_2 } from '~/config/kafka-topics'
import { HogFlow } from '~/schema/hogflow'
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/cdp/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ export type HogFunctionFilterGlobals = {
}
}

export type MetricLogSource = 'hog_function' | 'hog_flow'
export type MetricLogSource = 'hog_function' | 'hog_flow' | 'link'

export type LogEntryLevel = 'debug' | 'info' | 'warn' | 'error'

Expand Down
3 changes: 3 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ export function getDefaultConfig(): PluginsServerConfig {
SES_ACCESS_KEY_ID: isTestEnv() || isDevEnv() ? 'test' : '',
SES_SECRET_ACCESS_KEY: isTestEnv() || isDevEnv() ? 'test' : '',
SES_REGION: 'us-east-1',

// Links
SHORT_LINKS_DOMAIN: 'phog.gg',
}
}

Expand Down
12 changes: 11 additions & 1 deletion plugin-server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import express from 'ultimate-express'

import { setupCommonRoutes, setupExpressApp } from './api/router'
import { getPluginServerCapabilities } from './capabilities'
import { CdpApi } from './cdp/cdp-api'
import { CdpApi } from './cdp/api/cdp-api'
import { LinksApi } from './cdp/api/links-api'
import { CdpBehaviouralEventsConsumer } from './cdp/consumers/cdp-behavioural-events.consumer'
import { CdpCyclotronDelayConsumer } from './cdp/consumers/cdp-cyclotron-delay.consumer'
import { CdpCyclotronWorkerHogFlow } from './cdp/consumers/cdp-cyclotron-worker-hogflow.consumer'
Expand Down Expand Up @@ -199,6 +200,15 @@ export class PluginServer {
})
}

if (capabilities.linksApi) {
serviceLoaders.push(async () => {
const api = new LinksApi(hub)
this.expressApp.use('/', api.router())
await api.start()
return api.service
})
}

if (capabilities.cdpCyclotronWorker) {
await initPlugins()
serviceLoaders.push(async () => {
Expand Down
5 changes: 5 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export enum PluginServerMode {
async_webhooks = 'async-webhooks',
recordings_blob_ingestion_v2 = 'recordings-blob-ingestion-v2',
recordings_blob_ingestion_v2_overflow = 'recordings-blob-ingestion-v2-overflow',
links_api = 'links-api',
cdp_processed_events = 'cdp-processed-events',
cdp_person_updates = 'cdp-person-updates',
cdp_internal_events = 'cdp-internal-events',
Expand Down Expand Up @@ -466,6 +467,9 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
SES_ACCESS_KEY_ID: string
SES_SECRET_ACCESS_KEY: string
SES_REGION: string

// Links
SHORT_LINKS_DOMAIN: string
}

export interface Hub extends PluginsServerConfig {
Expand Down Expand Up @@ -525,6 +529,7 @@ export interface PluginServerCapabilities {
processAsyncWebhooksHandlers?: boolean
sessionRecordingBlobIngestionV2?: boolean
sessionRecordingBlobIngestionV2Overflow?: boolean
linksApi?: boolean
cdpProcessedEvents?: boolean
cdpPersonUpdates?: boolean
cdpInternalEvents?: boolean
Expand Down
Loading