11import byline from "byline" ;
22import Dockerode from "dockerode" ;
3- import { setTimeout } from "timers/promises" ;
43import { log } from "../common" ;
54import { getContainerRuntimeClient } from "../container-runtime" ;
65import { BoundPorts } from "../utils/bound-ports" ;
@@ -17,37 +16,46 @@ export class LogWaitStrategy extends AbstractWaitStrategy {
1716 }
1817
1918 public async waitUntilReady ( container : Dockerode . Container , boundPorts : BoundPorts , startTime ?: Date ) : Promise < void > {
20- await Promise . race ( [ this . handleTimeout ( container . id ) , this . handleLogs ( container , startTime ) ] ) ;
21- }
22-
23- async handleTimeout ( containerId : string ) : Promise < void > {
24- await setTimeout ( this . startupTimeout ) ;
25- this . throwError ( containerId , `Log message "${ this . message } " not received after ${ this . startupTimeout } ms` ) ;
26- }
27-
28- async handleLogs ( container : Dockerode . Container , startTime ?: Date ) : Promise < void > {
2919 log . debug ( `Waiting for log message "${ this . message } "...` , { containerId : container . id } ) ;
3020 const client = await getContainerRuntimeClient ( ) ;
3121 const stream = await client . container . logs ( container , { since : startTime ? startTime . getTime ( ) / 1000 : 0 } ) ;
32-
33- let matches = 0 ;
34- for await ( const line of byline ( stream ) ) {
35- if ( this . matches ( line ) ) {
36- if ( ++ matches === this . times ) {
37- return log . debug ( `Log wait strategy complete` , { containerId : container . id } ) ;
22+ return new Promise ( ( resolve , reject ) => {
23+ const timeout = setTimeout ( ( ) => {
24+ const message = `Log message "${ this . message } " not received after ${ this . startupTimeout } ms` ;
25+ log . error ( message , { containerId : container . id } ) ;
26+ reject ( new Error ( message ) ) ;
27+ } , this . startupTimeout ) ;
28+
29+ const comparisonFn : ( line : string ) => boolean = ( line : string ) => {
30+ if ( this . message instanceof RegExp ) {
31+ return this . message . test ( line ) ;
32+ } else {
33+ return line . includes ( this . message ) ;
3834 }
39- }
40- }
41-
42- this . throwError ( container . id , `Log stream ended and message "${ this . message } " was not received` ) ;
43- }
44-
45- matches ( line : string ) : boolean {
46- return this . message instanceof RegExp ? this . message . test ( line ) : line . includes ( this . message ) ;
47- }
48-
49- throwError ( containerId : string , message : string ) : void {
50- log . error ( message , { containerId } ) ;
51- throw new Error ( message ) ;
35+ } ;
36+
37+ let count = 0 ;
38+ const lineProcessor = ( line : string ) => {
39+ if ( comparisonFn ( line ) ) {
40+ if ( ++ count === this . times ) {
41+ stream . destroy ( ) ;
42+ clearTimeout ( timeout ) ;
43+ log . debug ( `Log wait strategy complete` , { containerId : container . id } ) ;
44+ resolve ( ) ;
45+ }
46+ }
47+ } ;
48+
49+ byline ( stream )
50+ . on ( "data" , lineProcessor )
51+ . on ( "err" , lineProcessor )
52+ . on ( "end" , ( ) => {
53+ stream . destroy ( ) ;
54+ clearTimeout ( timeout ) ;
55+ const message = `Log stream ended and message "${ this . message } " was not received` ;
56+ log . error ( message , { containerId : container . id } ) ;
57+ reject ( new Error ( message ) ) ;
58+ } ) ;
59+ } ) ;
5260 }
5361}
0 commit comments