Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion bin/clickhouse-logs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ CREATE OR REPLACE TABLE logs31
INDEX idx_attributes_str_values mapValues(attributes_map_str) TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_mat_body_ipv4_matches mat_body_ipv4_matches TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body_ngram3 body TYPE ngrambf_v1(3, 25000, 2, 0) GRANULARITY 1,
INDEX idx_observed_minmax observed_timestamp TYPE minmax GRANULARITY 1,
PROJECTION projection_aggregate_counts
(
SELECT
Expand Down Expand Up @@ -180,6 +181,33 @@ mapSort(mapFilter((k, v) -> isNotNull(v), mapApply((k,v) -> (concat(k, '__float'
mapSort(mapFilter((k, v) -> isNotNull(v), mapApply((k,v) -> (concat(k, '__datetime'), parseDateTimeBestEffortOrNull(JSONExtract(v, 'String'), 6)), attributes))) as attributes_map_datetime,
mapSort(resource_attributes) as resource_attributes,
toInt32OrZero(_headers.value[indexOf(_headers.name, 'team_id')]) as team_id
FROM kafka_logs_avro;
FROM kafka_logs_avro settings min_insert_block_size_rows=0, min_insert_block_size_bytes=0;

-- Per-partition Kafka consumption watermarks for the logs pipeline.
-- Fed by kafka_logs_avro_kafka_metrics_mv; intended to hold one logical row
-- per (_topic, _partition) so live-tail can compute a safe checkpoint as
-- min(max_observed_timestamp) across all partitions.
create or replace table logs_kafka_metrics
(
    `_partition` UInt32,
    `_topic` String,
    -- Highest Kafka offset consumed for this partition.
    `max_offset` SimpleAggregateFunction(max, UInt64),
    -- Latest ingest-side observation time; drives the live-logs checkpoint.
    `max_observed_timestamp` SimpleAggregateFunction(max, DateTime64(9)),
    -- Latest event timestamp as reported by the producer.
    `max_timestamp` SimpleAggregateFunction(max, DateTime64(9)),
    -- Wall-clock time of the most recent insert for this partition.
    `max_created_at` SimpleAggregateFunction(max, DateTime64(9)),
    -- Worst observed ingestion lag (seconds) for this partition.
    `max_lag` SimpleAggregateFunction(max, UInt64)
)
-- AggregatingMergeTree applies the SimpleAggregateFunction (max) to rows that
-- share the sorting key on merge, rolling them up to one row per partition.
-- Plain MergeTree would never combine them: the table would grow by one row
-- per consumed block indefinitely, and aggregate reads (e.g. the live-tail
-- min(max_observed_timestamp)) would be dominated by stale historical rows.
ENGINE = AggregatingMergeTree
ORDER BY (_topic, _partition);

-- Recreate the metrics MV idempotently so this script can be re-run safely.
drop view if exists kafka_logs_avro_kafka_metrics_mv;
-- For every block consumed from the Kafka engine table, record per-partition
-- high-water marks into logs_kafka_metrics. SimpleState combinators emit
-- SimpleAggregateFunction values that the target table folds with max().
CREATE MATERIALIZED VIEW kafka_logs_avro_kafka_metrics_mv TO logs_kafka_metrics
AS
SELECT
_partition,
_topic,
-- Highest Kafka offset seen in this consumed block.
maxSimpleState(_offset) as max_offset,
maxSimpleState(observed_timestamp) as max_observed_timestamp,
maxSimpleState(timestamp) as max_timestamp,
-- now() evaluates at insert time inside an MV, i.e. consumption wall-clock.
maxSimpleState(now()) as max_created_at,
-- Consumption lag in seconds. NOTE(review): if observed_timestamp is ever
-- ahead of now() (clock skew), this difference is negative and the cast to
-- UInt64 in the target column would wrap — confirm upstream clocks/guards.
maxSimpleState(now() - observed_timestamp) as max_lag
FROM kafka_logs_avro
group by _partition, _topic;

-- Final statement doubles as a success marker in initialisation logs.
select 'clickhouse logs tables initialised successfully!';
Binary file modified frontend/__snapshots__/lemon-ui-icons2--shelf-p--dark.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified frontend/__snapshots__/lemon-ui-icons2--shelf-p--light.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 14 additions & 0 deletions frontend/src/lib/lemon-ui/LemonTable/LemonTable.scss
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
// One-shot flash for freshly streamed rows: begin at the secondary accent
// highlight and ease out to transparent so new entries briefly stand out.
@keyframes LemonTable__row__highlight__animation {
    from {
        background-color: var(--color-accent-highlight-secondary);
    }

    to {
        background-color: transparent;
    }
}

.LemonTable {
--row-base-height: auto;
--row-horizontal-padding: 0.5rem;
Expand Down Expand Up @@ -163,6 +173,10 @@
}
}

&.LemonTable__row--status-highlight-new {
animation: LemonTable__row__highlight__animation 1.5s ease-out;
}

&:not(.LemonTable__expansion) {
> td {
// Make spacing of buttons tighter in tables by adding negative vertical margin
Expand Down
5 changes: 4 additions & 1 deletion frontend/src/lib/lemon-ui/LemonTable/LemonTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ export interface LemonTableProps<T extends Record<string, any>> {
/** Color to mark each row with. */
rowRibbonColor?: string | ((record: T, rowIndex: number) => string | null | undefined)
/** Status of each row. Defaults no status. */
rowStatus?: 'highlighted' | ((record: T, rowIndex: number) => 'highlighted' | null)
rowStatus?:
| 'highlighted'
| 'highlight-new'
| ((record: T, rowIndex: number) => 'highlighted' | 'highlight-new' | null)
/** Function that for each row determines what props should its `tr` element have based on the row's record. */
onRow?: (record: T, index: number) => Omit<HTMLProps<HTMLTableRowElement>, 'key'>
/** How tall should rows be. The default value is `"middle"`. */
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/lib/lemon-ui/LemonTable/TableRow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface TableRowProps<T extends Record<string, any>> {
rowKeyDetermined: string | number
rowClassNameDetermined: string | null | undefined
rowRibbonColorDetermined: string | null | undefined
rowStatusDetermined: 'highlighted' | null | undefined
rowStatusDetermined: 'highlighted' | 'highlight-new' | null | undefined
columnGroups: LemonTableColumnGroup<T>[]
onRow: ((record: T, index: number) => Omit<HTMLProps<HTMLTableRowElement>, 'key'>) | undefined
expandable: ExpandableConfig<T> | undefined
Expand Down
13 changes: 13 additions & 0 deletions frontend/src/lib/lemon-ui/icons/icons.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,19 @@ export function IconPlayCircle(props: LemonIconProps): JSX.Element {
)
}

/**
 * Pause icon: a circular outline enclosing two rounded vertical bars.
 * Visual companion to IconPlayCircle; forwards all props to LemonIconBase.
 */
export function IconPauseCircle(props: LemonIconProps): JSX.Element {
    // The ring is a single even-odd style path (outer circle + inner cutout).
    const ringPath =
        'M12 2C6.48 2 2 6.48 2 12C2 17.52 6.48 22 12 22C17.52 22 22 17.52 22 12C22 6.48 17.52 2 12 2ZM12 20C7.59 20 4 16.41 4 12C4 7.59 7.59 4 12 4C16.41 4 20 7.59 20 12C20 16.41 16.41 20 12 20Z'

    return (
        <LemonIconBase {...props}>
            <path d={ringPath} fill="currentColor" />
            <rect x="9.5" y="8" width="2" height="8" rx="0.75" fill="currentColor" />
            <rect x="12.5" y="8" width="2" height="8" rx="0.75" fill="currentColor" />
        </LemonIconBase>
    )
}

export function IconSkipBackward(props: LemonIconProps): JSX.Element {
return (
<LemonIconBase {...props}>
Expand Down
10 changes: 10 additions & 0 deletions frontend/src/queries/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -17517,6 +17517,13 @@
"level": {
"$ref": "#/definitions/LogSeverityLevel"
},
"live_logs_checkpoint": {
"format": "date-time",
"type": "string"
},
"new": {
"type": "boolean"
},
"observed_timestamp": {
"format": "date-time",
"type": "string"
Expand Down Expand Up @@ -17602,6 +17609,9 @@
"limit": {
"$ref": "#/definitions/integer"
},
"liveLogsCheckpoint": {
"type": "string"
},
"modifiers": {
"$ref": "#/definitions/HogQLQueryModifiers",
"description": "Modifiers used when performing the query"
Expand Down
4 changes: 4 additions & 0 deletions frontend/src/queries/schema/schema-general.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2574,6 +2574,9 @@ export interface LogMessage {
resource_attributes: any
instrumentation_scope: string
event_name: string
/** @format date-time */
live_logs_checkpoint?: string
new?: boolean
}

export interface LogsQuery extends DataNode<LogsQueryResponse> {
Expand All @@ -2586,6 +2589,7 @@ export interface LogsQuery extends DataNode<LogsQueryResponse> {
severityLevels: LogSeverityLevel[]
filterGroup: PropertyGroupFilter
serviceNames: string[]
liveLogsCheckpoint?: string
}

export interface LogsQueryResponse extends AnalyticsQueryResponseBase {
Expand Down
3 changes: 2 additions & 1 deletion posthog/hogql/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
LogEntriesTable,
ReplayConsoleLogsLogEntriesTable,
)
from posthog.hogql.database.schema.logs import LogsTable
from posthog.hogql.database.schema.logs import LogsKafkaMetricsTable, LogsTable
from posthog.hogql.database.schema.numbers import NumbersTable
from posthog.hogql.database.schema.person_distinct_id_overrides import (
PersonDistinctIdOverridesTable,
Expand Down Expand Up @@ -191,6 +191,7 @@ class Database(BaseModel):
"document_embeddings": TableNode(name="document_embeddings", table=DocumentEmbeddingsTable()),
"pg_embeddings": TableNode(name="pg_embeddings", table=PgEmbeddingsTable()),
"logs": TableNode(name="logs", table=LogsTable()),
"logs_kafka_metrics": TableNode(name="logs_kafka_metrics", table=LogsKafkaMetricsTable()),
"numbers": TableNode(name="numbers", table=NumbersTable()),
"system": SystemTables(), # This is a `TableNode` already, refer to implementation
# Web analytics pre-aggregated tables (internal use only)
Expand Down
21 changes: 21 additions & 0 deletions posthog/hogql/database/schema/logs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from posthog.hogql.database.models import (
DANGEROUS_NoTeamIdCheckTable,
DateTimeDatabaseField,
FieldOrTable,
IntegerDatabaseField,
Expand Down Expand Up @@ -38,3 +39,23 @@ def to_printed_clickhouse(self, context):

def to_printed_hogql(self):
return "logs"


class LogsKafkaMetricsTable(DANGEROUS_NoTeamIdCheckTable):
    """
    HogQL view over the `logs_kafka_metrics` ClickHouse table, which stores
    per-partition Kafka consumption metadata that is _not_ scoped to teams
    (hence the no-team-id-check base class).

    Exposing it here lets queries determine the overall consumption lag per
    partition so live-log tailing can be filtered to a safe checkpoint.
    """

    # Only the columns needed for checkpoint computation are exposed; the
    # underlying table also carries offset/lag aggregates not surfaced here.
    fields: dict[str, FieldOrTable] = {
        "_partition": IntegerDatabaseField(name="_partition", nullable=False),
        "_topic": StringDatabaseField(name="_topic", nullable=False),
        "max_observed_timestamp": DateTimeDatabaseField(name="max_observed_timestamp", nullable=False),
    }

    def to_printed_clickhouse(self, context):
        # Physical table name used when printing ClickHouse SQL.
        return "logs_kafka_metrics"

    def to_printed_hogql(self):
        # Name used when printing back to HogQL.
        return "logs_kafka_metrics"
3 changes: 3 additions & 0 deletions posthog/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4608,6 +4608,8 @@ class LogMessage(BaseModel):
event_name: str
instrumentation_scope: str
level: LogSeverityLevel
live_logs_checkpoint: AwareDatetime | None = None
new: bool | None = None
observed_timestamp: AwareDatetime
resource_attributes: Any
severity_number: float
Expand Down Expand Up @@ -14142,6 +14144,7 @@ class LogsQuery(BaseModel):
filterGroup: PropertyGroupFilter
kind: Literal["LogsQuery"] = "LogsQuery"
limit: int | None = None
liveLogsCheckpoint: str | None = None
modifiers: HogQLQueryModifiers | None = Field(default=None, description="Modifiers used when performing the query")
offset: int | None = None
orderBy: OrderBy3 | None = None
Expand Down
10 changes: 10 additions & 0 deletions products/logs/backend/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def query(self, request: Request, *args, **kwargs) -> Response:
if query_data is None:
return Response({"error": "No query provided"}, status=status.HTTP_400_BAD_REQUEST)

live_logs_checkpoint = query_data.get("liveLogsCheckpoint", None)
date_range = self.get_model(query_data.get("dateRange"), DateRange)
requested_limit = min(query_data.get("limit", 1000), 2000)
logs_query_params = {
Expand All @@ -37,6 +38,8 @@ def query(self, request: Request, *args, **kwargs) -> Response:
"filterGroup": query_data.get("filterGroup", None),
"limit": requested_limit + 1, # Fetch limit plus 1 to see if theres another page
}
if live_logs_checkpoint:
logs_query_params["liveLogsCheckpoint"] = live_logs_checkpoint
query = LogsQuery(**logs_query_params)

def results_generator(query: LogsQuery, logs_query_params: dict):
Expand Down Expand Up @@ -106,6 +109,13 @@ def runner_slice(

return LogsQueryRunner(slice_query, self.team), LogsQueryRunner(remainder_query, self.team)

# if we're live tailing don't do the runner slicing optimisations
# we're always only looking at the most recent 1 or 2 minutes of observed data
# which should cut things down more than the slicing anyway
if live_logs_checkpoint:
response = runner.run(ExecutionMode.CALCULATE_BLOCKING_ALWAYS)
yield from response.results
return
# if we're searching more than 20 minutes, first fetch the first 3 minutes of logs and see if that hits the limit
if date_range_length > dt.timedelta(minutes=20):
recent_runner, runner = runner_slice(runner, dt.timedelta(minutes=3), query.orderBy)
Expand Down
13 changes: 11 additions & 2 deletions products/logs/backend/logs_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def _calculate(self) -> LogsQueryResponse:
filters=HogQLFilters(dateRange=self.query.dateRange),
settings=self.settings,
)

results = []
for result in response.results:
results.append(
Expand All @@ -118,6 +117,7 @@ def _calculate(self) -> LogsQueryResponse:
"resource_attributes": result[10],
"instrumentation_scope": result[11],
"event_name": result[12],
"live_logs_checkpoint": result[13],
}
)

Expand Down Expand Up @@ -161,7 +161,8 @@ def to_query(self) -> ast.SelectQuery:
severity_text as level,
resource_attributes,
instrumentation_scope,
event_name
event_name,
(select min(max_observed_timestamp) from logs_kafka_metrics) as live_logs_checkpoint
FROM logs where (_part_starting_offset+_part_offset) in ({query})
""",
placeholders={"query": query},
Expand Down Expand Up @@ -225,6 +226,14 @@ def where(self):
if self.query.filterGroup:
exprs.append(property_to_expr(self.query.filterGroup, team=self.team))

if self.query.liveLogsCheckpoint:
exprs.append(
parse_expr(
"observed_timestamp >= {liveLogsCheckpoint}",
placeholders={"liveLogsCheckpoint": ast.Constant(value=self.query.liveLogsCheckpoint)},
)
)

return ast.And(exprs=exprs)

@cached_property
Expand Down
23 changes: 18 additions & 5 deletions products/logs/frontend/LogsScene.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { Sparkline } from 'lib/components/Sparkline'
import { TZLabel, TZLabelProps } from 'lib/components/TZLabel'
import { ListHog } from 'lib/components/hedgehogs'
import { LemonField } from 'lib/lemon-ui/LemonField'
import { IconPauseCircle, IconPlayCircle } from 'lib/lemon-ui/icons'
import { humanFriendlyNumber } from 'lib/utils'
import { cn } from 'lib/utils/css-classes'
import { Scene, SceneExport } from 'scenes/sceneTypes'
Expand Down Expand Up @@ -233,7 +234,9 @@ function LogsTable({
size="small"
embedded
rowKey="uuid"
rowStatus={(record) => (record.uuid === highlightedLogId ? 'highlighted' : null)}
rowStatus={(record) =>
record.uuid === highlightedLogId ? 'highlighted' : record.new ? 'highlight-new' : null
}
rowClassName={(record) =>
isPinned(record.uuid) ? cn('bg-primary-highlight', showPinnedWithOpacity && 'opacity-50') : 'group'
}
Expand Down Expand Up @@ -412,8 +415,8 @@ const LogTag = ({ level }: { level: LogMessage['severity_text'] }): JSX.Element
}

const Filters = (): JSX.Element => {
const { logsLoading } = useValues(logsLogic)
const { runQuery, zoomDateRange } = useActions(logsLogic)
const { logsLoading, liveTailRunning, liveTailDisabledReason } = useValues(logsLogic)
const { runQuery, zoomDateRange, setLiveTailRunning } = useActions(logsLogic)

return (
<div className="flex flex-col gap-y-1.5">
Expand Down Expand Up @@ -441,9 +444,19 @@ const Filters = (): JSX.Element => {
icon={<IconRefresh />}
type="secondary"
onClick={() => runQuery()}
loading={logsLoading}
loading={logsLoading || liveTailRunning}
disabledReason={liveTailRunning ? 'Disable live tail to manually refresh' : undefined}
>
{liveTailRunning ? 'Tailing...' : logsLoading ? 'Loading...' : 'Search'}
</LemonButton>
<LemonButton
size="small"
type={liveTailRunning ? 'primary' : 'secondary'}
icon={liveTailRunning ? <IconPauseCircle /> : <IconPlayCircle />}
onClick={() => setLiveTailRunning(!liveTailRunning)}
disabledReason={liveTailRunning ? undefined : liveTailDisabledReason}
>
{logsLoading ? 'Loading...' : 'Search'}
Live tail
</LemonButton>
</div>
</div>
Expand Down
Loading
Loading