Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions scripts/scaffold/kafka-connect/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ USER root
RUN yum install -y jq findutils unzip

RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 --no-prompt

COPY tmp/kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/
COPY tmp/custom-plugins/tinybird-smt-1.0.1.jar /usr/share/java/tinybird-smt-1.0.1.jar


VOLUME /storage
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
DESCRIPTION >
CDP dashboard metrics per segment with hierarchy aggregation.
Clean approach assuming segments table is properly synced with complete hierarchy data.

NODE subprojectMetrics
SQL >
SELECT
segments.id AS segmentId,
'subproject' AS segmentType,
segments.parentId,
segments.grandparentId,
segments.slug AS segmentSlug,
segments.parentSlug,
segments.grandparentSlug,
segments.name AS segmentName,
segments.parentName,
segments.grandparentName,
count() AS activitiesTotal,
countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days,
countDistinct(ar.memberId) AS membersTotal,
countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days,
countDistinct(ar.organizationId) AS organizationsTotal,
countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days
FROM segments FINAL
LEFT JOIN activityRelations_deduplicated_ds AS ar ON segments.id = ar.segmentId
LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id
LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id
WHERE segments.parentSlug IS NOT NULL AND segments.grandparentSlug IS NOT NULL
AND segments.parentId IS NOT NULL AND segments.grandparentId IS NOT NULL
AND segments.parentId != '' AND segments.grandparentId != ''
GROUP BY segments.id, segments.parentId, segments.grandparentId, segments.slug, segments.parentSlug, segments.grandparentSlug,
segments.name, segments.parentName, segments.grandparentName

NODE projectMetrics
SQL >
SELECT
s.parentId AS segmentId,
'project' AS segmentType,
s.parentId,
s.grandparentId,
s.parentSlug AS segmentSlug,
s.parentSlug,
s.grandparentSlug,
s.parentName AS segmentName,
s.parentName,
s.grandparentName,
count() AS activitiesTotal,
countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days,
countDistinct(ar.memberId) AS membersTotal,
countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days,
countDistinct(ar.organizationId) AS organizationsTotal,
countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days
FROM (SELECT * FROM segments FINAL) AS s
LEFT JOIN activityRelations_deduplicated_ds AS ar ON s.id = ar.segmentId
LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id
LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id
WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL
AND s.parentId IS NOT NULL AND s.grandparentId IS NOT NULL
AND s.parentId != '' AND s.grandparentId != ''
GROUP BY s.parentId, s.grandparentId, s.parentSlug, s.grandparentSlug, s.parentName, s.grandparentName

NODE projectGroupMetrics
SQL >
SELECT
s.grandparentId AS segmentId,
'projectGroup' AS segmentType,
s.grandparentId AS parentId,
s.grandparentId,
s.grandparentSlug AS segmentSlug,
s.grandparentSlug AS parentSlug,
s.grandparentSlug AS grandparentSlug,
s.grandparentName AS segmentName,
s.grandparentName AS parentName,
s.grandparentName AS grandparentName,
count() AS activitiesTotal,
countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days,
countDistinct(ar.memberId) AS membersTotal,
countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days,
countDistinct(ar.organizationId) AS organizationsTotal,
countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days
FROM (SELECT * FROM segments FINAL) AS s
LEFT JOIN activityRelations_deduplicated_ds AS ar ON s.id = ar.segmentId
LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id
LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id
WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL
AND s.parentId IS NOT NULL AND s.grandparentId IS NOT NULL
AND s.parentId != '' AND s.grandparentId != ''
GROUP BY s.grandparentId, s.grandparentSlug, s.grandparentName

NODE cdpDashboardMetricsPerSegment
SQL >
SELECT * FROM subprojectMetrics
UNION ALL
SELECT * FROM projectMetrics
UNION ALL
SELECT * FROM projectGroupMetrics

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging
EXPORT_SCHEDULE 0 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
46 changes: 46 additions & 0 deletions services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
DESCRIPTION >
Global metrics for activities, organizations, and members used for CDP dashboard. We. are referring to the total witouth filtering by any segment

NODE activityRelationsMetricsTotal
SQL >
SELECT
count() AS activitiesTotal,
countIf(createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days
FROM activityRelations_deduplicated_ds

NODE organizationsMetricsTotal
SQL >
SELECT
count() AS organizationsTotal,
countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days
FROM organizations FINAL

NODE membersMetricsTotal
SQL >
SELECT count() AS membersTotal, countIf(joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days
FROM members FINAL

NODE mergeResults
SQL >
SELECT
-- activity
(SELECT activitiesTotal FROM activityRelationsMetricsTotal) AS activitiesTotal,
(SELECT activitiesLast30Days FROM activityRelationsMetricsTotal) AS activitiesLast30Days,
-- organizations
(SELECT organizationsTotal FROM organizationsMetricsTotal) AS organizationsTotal,
(SELECT organizationsLast30Days FROM organizationsMetricsTotal) AS organizationsLast30Days,
-- members
(SELECT membersTotal FROM membersMetricsTotal) AS membersTotal,
(SELECT membersLast30Days FROM membersMetricsTotal) AS membersLast30Days

NODE cdpDashboardFullMetricsTotal
SQL >
SELECT * FROM mergeResults

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging
EXPORT_SCHEDULE 0 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_total_sink
Loading