-
Notifications
You must be signed in to change notification settings - Fork 38
feat: Add redirected actor logs #769
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 14 commits
ab71414
d328ada
a8ae55f
4f2146d
cc7aba9
50b2732
f25935c
4aa0125
c410190
d9a900a
1c293da
0456c20
dc88c2b
ec0d60e
fc7bb6c
bbeeee4
00bf7ea
5fd0fc7
5d4c1f4
51dd246
71d7fb5
0b2e6e6
0440e39
0b84a32
bca27b2
8eb6e71
dfa5dcd
3e48c60
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ import ow from 'ow'; | |
|
|
||
| import type { RUN_GENERAL_ACCESS } from '@apify/consts'; | ||
| import { ACT_JOB_STATUSES, ACTOR_PERMISSION_LEVEL, META_ORIGINS } from '@apify/consts'; | ||
| import { Log } from '@apify/log'; | ||
|
|
||
| import type { ApiClientSubResourceOptions } from '../base/api_client'; | ||
| import { ResourceClient } from '../base/resource_client'; | ||
|
|
@@ -13,6 +14,7 @@ import { ActorVersionCollectionClient } from './actor_version_collection'; | |
| import type { Build, BuildClientGetOptions } from './build'; | ||
| import { BuildClient } from './build'; | ||
| import { BuildCollectionClient } from './build_collection'; | ||
| import type { StreamedLog } from './log'; | ||
| import { RunClient } from './run'; | ||
| import { RunCollectionClient } from './run_collection'; | ||
| import type { WebhookUpdateData } from './webhook'; | ||
|
|
@@ -139,18 +141,37 @@ export class ActorClient extends ResourceClient { | |
| webhooks: ow.optional.array.ofType(ow.object), | ||
| maxItems: ow.optional.number.not.negative, | ||
| maxTotalChargeUsd: ow.optional.number.not.negative, | ||
| log: ow.optional.any(ow.null, ow.object.instanceOf(Log)), | ||
| restartOnError: ow.optional.boolean, | ||
| forcePermissionLevel: ow.optional.string.oneOf(Object.values(ACTOR_PERMISSION_LEVEL)), | ||
| }), | ||
| ); | ||
|
|
||
| const { waitSecs, ...startOptions } = options; | ||
| const { waitSecs, log = 'default', ...startOptions } = options; | ||
| const { id } = await this.start(input, startOptions); | ||
|
|
||
| // Calling root client because we need access to top level API. | ||
| // Creating a new instance of RunClient here would only allow | ||
| // setting it up as a nested route under actor API. | ||
| return this.apifyClient.run(id).waitForFinish({ waitSecs }); | ||
| const newRunClient = this.apifyClient.run(id); | ||
|
|
||
| if (!log) { | ||
| return newRunClient.waitForFinish({ waitSecs }); | ||
| } | ||
| let streamedLog: StreamedLog; | ||
|
|
||
| if (log === 'default') { | ||
| streamedLog = await newRunClient.getStreamedLog(); | ||
| } else { | ||
| streamedLog = await newRunClient.getStreamedLog({ toLog: log }); | ||
| } | ||
|
|
||
| try { | ||
| await streamedLog.start(); | ||
| return this.apifyClient.run(id).waitForFinish({ waitSecs }); | ||
| } finally { | ||
| await streamedLog.stop(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -426,6 +447,7 @@ export interface ActorStartOptions { | |
|
|
||
| export interface ActorCallOptions extends Omit<ActorStartOptions, 'waitForFinish'> { | ||
| waitSecs?: number; | ||
| log?: Log | null; | ||
|
||
| } | ||
|
|
||
| export interface ActorRunListItem { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,11 @@ | ||
| // eslint-disable-next-line max-classes-per-file | ||
| import type { Readable } from 'node:stream'; | ||
|
|
||
| import c from 'ansi-colors'; | ||
|
|
||
| import type { Log } from '@apify/log'; | ||
| import { Logger, LogLevel } from '@apify/log'; | ||
|
|
||
| import type { ApifyApiError } from '../apify_api_error'; | ||
| import type { ApiClientSubResourceOptions } from '../base/api_client'; | ||
| import { ResourceClient } from '../base/resource_client'; | ||
|
|
@@ -20,11 +26,11 @@ export class LogClient extends ResourceClient { | |
| /** | ||
| * https://docs.apify.com/api/v2#/reference/logs/log/get-log | ||
| */ | ||
| async get(): Promise<string | undefined> { | ||
| async get(options: LogOptions = {}): Promise<string | undefined> { | ||
| const requestOpts: ApifyRequestConfig = { | ||
| url: this._url(), | ||
| method: 'GET', | ||
| params: this._params(), | ||
| params: this._params(options), | ||
| }; | ||
|
|
||
| try { | ||
|
|
@@ -41,9 +47,10 @@ export class LogClient extends ResourceClient { | |
| * Gets the log in a Readable stream format. Only works in Node.js. | ||
| * https://docs.apify.com/api/v2#/reference/logs/log/get-log | ||
| */ | ||
| async stream(): Promise<Readable | undefined> { | ||
| async stream(options: LogOptions = {}): Promise<Readable | undefined> { | ||
| const params = { | ||
| stream: true, | ||
| raw: options.raw, | ||
| }; | ||
|
|
||
| const requestOpts: ApifyRequestConfig = { | ||
|
|
@@ -63,3 +70,182 @@ export class LogClient extends ResourceClient { | |
| return undefined; | ||
| } | ||
| } | ||
|
|
||
| export interface LogOptions { | ||
| /** @default false */ | ||
| raw?: boolean; | ||
| } | ||
|
|
||
| // Temp create it here and ask Martin where to put it | ||
|
||
|
|
||
| const DEFAULT_OPTIONS = { | ||
| /** Whether to exclude timestamp of log redirection in redirected logs. */ | ||
| skipTime: true, | ||
| /** Level of log redirection */ | ||
| level: LogLevel.DEBUG, | ||
| }; | ||
|
|
||
| /** | ||
| * Logger for redirected actor logs. | ||
| */ | ||
| export class LoggerActorRedirect extends Logger { | ||
| constructor(options = {}) { | ||
| super({ ...DEFAULT_OPTIONS, ...options }); | ||
| } | ||
|
|
||
| _console_log(line: string) { | ||
Pijukatel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| console.log(line); // eslint-disable-line no-console | ||
| } | ||
Pijukatel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| override _log(level: LogLevel, message: string, data?: any, exception?: unknown, opts: Record<string, any> = {}) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm wondering, what's the biggest difference between this implementation and the
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logger in our codebase is kind of a combination of a formater and a handler in Python terms. While
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we need to adjust the shared logger to support this use case?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not really needed for this change. This change works with the current version of the codebase. Changing |
||
| if (level > this.options.level) { | ||
| return; | ||
| } | ||
| if (data || exception) { | ||
| throw new Error('Redirect logger does not use other arguments than level and message'); | ||
| } | ||
| let { prefix } = opts; | ||
| prefix = prefix ? `${prefix}` : ''; | ||
|
|
||
| let maybeDate = ''; | ||
| if (!this.options.skipTime) { | ||
| maybeDate = `${new Date().toISOString().replace('Z', '').replace('T', ' ')} `; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to repeat the timestamps?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those timestamps will not be the same. One is the timestamp of origin, and the other is the timestamp of redirection. For example> It is important to
|
||
| } | ||
|
|
||
| const line = `${c.gray(maybeDate)}${c.cyan(prefix)}${message || ''}`; | ||
| this._console_log(line); | ||
| return line; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Helper class for redirecting streamed Actor logs to another log. | ||
| */ | ||
| export class StreamedLog { | ||
Pijukatel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private toLog: Log; | ||
Pijukatel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private streamBuffer: Buffer[] = []; | ||
| private splitMarker = /(?:\n|^)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)/g; | ||
| private relevancyTimeLimit: Date | null; | ||
|
|
||
| private logClient: LogClient; | ||
| private streamingTask: Promise<void> | null = null; | ||
| private stopLogging = false; | ||
|
|
||
| constructor(logClient: LogClient, toLog: Log, fromStart = true) { | ||
Pijukatel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| this.toLog = toLog; | ||
| this.logClient = logClient; | ||
| this.relevancyTimeLimit = fromStart ? null : new Date(); | ||
| } | ||
|
|
||
| /** | ||
| * Start log redirection. | ||
| */ | ||
| public async start(): Promise<void> { | ||
| if (this.streamingTask) { | ||
| throw new Error('Streaming task already active'); | ||
| } | ||
| this.stopLogging = false; | ||
| this.streamingTask = this._streamLog(); | ||
| return this.streamingTask; | ||
| } | ||
|
|
||
| /** | ||
| * Stop log redirection. | ||
| */ | ||
| public async stop(): Promise<void> { | ||
| if (!this.streamingTask) { | ||
| throw new Error('Streaming task is not active'); | ||
| } | ||
| this.stopLogging = true; | ||
| try { | ||
| await this.streamingTask; | ||
| } catch (err) { | ||
| if (!(err instanceof Error && err.name === 'AbortError')) { | ||
| throw err; | ||
| } | ||
| } finally { | ||
| this.streamingTask = null; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Get log stream from response and redirect it to another log. | ||
| */ | ||
| private async _streamLog(): Promise<void> { | ||
| const logStream = await this.logClient.stream({ raw: true }); | ||
| if (!logStream) { | ||
| return; | ||
| } | ||
|
|
||
| for await (const chunk of logStream) { | ||
Pijukatel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Keep processing the new data until stopped | ||
| this.streamBuffer.push(chunk); | ||
| if (this.splitMarker.test(chunk.toString())) { | ||
| this.logBufferContent(false); | ||
| } | ||
| if (this.stopLogging) { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| // Process the remaining buffer | ||
| this.logBufferContent(true); | ||
| } | ||
|
|
||
| /** | ||
| * Parse the buffer and log complete messages. | ||
| */ | ||
| private logBufferContent(includeLastPart = false): void { | ||
| const allParts = Buffer.concat(this.streamBuffer).toString().split(this.splitMarker).slice(1); | ||
| let messageMarkers; | ||
| let messageContents; | ||
| // Parse the buffer parts into complete messages | ||
| if (includeLastPart) { | ||
| // This is final call, so log everything. Do not keep anything in the buffer. | ||
| messageMarkers = allParts.filter((_, i) => i % 2 === 0); | ||
| messageContents = allParts.filter((_, i) => i % 2 !== 0); | ||
| this.streamBuffer = []; | ||
| } else { | ||
| messageMarkers = allParts.filter((_, i) => i % 2 === 0).slice(0, -1); | ||
| messageContents = allParts.filter((_, i) => i % 2 !== 0).slice(0, -1); | ||
|
|
||
| // The last two parts (marker and message) are possibly not complete and will be left in the buffer. | ||
| this.streamBuffer = [Buffer.from(allParts.slice(-2).join(''))]; | ||
| } | ||
|
|
||
| messageMarkers.forEach((marker, index) => { | ||
| const decodedMarker = marker; | ||
| const decodedContent = messageContents[index]; | ||
| if (this.relevancyTimeLimit) { | ||
| // Log only relevant messages. Ignore too old log messages. | ||
| const logTime = new Date(decodedMarker); | ||
| if (logTime < this.relevancyTimeLimit) { | ||
| return; | ||
| } | ||
| } | ||
| const message = decodedMarker + decodedContent; | ||
|
|
||
| // Log parsed message at guessed level. | ||
| this.logAtGuessedLevel(message); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Log messages at appropriate log level guessed from the message content. | ||
| * | ||
| * Original log level information does not have to be included in the message at all. | ||
| * This is methods just guesses, exotic formating or specific keywords can break the guessing logic. | ||
| */ | ||
| private logAtGuessedLevel(message: string) { | ||
| message = message.trim(); | ||
|
|
||
| if (message.includes('ERROR')) this.toLog.error(message); | ||
| else if (message.includes('SOFT_FAIL')) this.toLog.softFail(message); | ||
| else if (message.includes('WARNING')) this.toLog.warning(message); | ||
| else if (message.includes('INFO')) this.toLog.info(message); | ||
| else if (message.includes('DEBUG')) this.toLog.debug(message); | ||
| else if (message.includes('PERF')) this.toLog.perf(message); | ||
| // Fallback in case original log message does not indicate known log level. | ||
| else this.toLog.info(message); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just temp for testing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, what are you testing this way?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have an Actor that just points to this branch and I can test the tip of it on the Apify platform as it compiles on the fly. It takes time, but it does not require me to do any manual actions.
The actor has in pacakge.json
and I removed
--omit=devfrom npm install in the docker fileIf there is a more convenient way for TypeScript-based code, I would be very happy to learn about it.