@@ -29,6 +29,7 @@ const LambdaContext = require('serverless-offline/src/LambdaContext');
2929
3030const NO_KINESIS_FOUND = 'Could not find kinesis stream' ;
3131const KINESIS_RETRY_DELAY = 200 ;
32+ const KINESIS_RETRY_TIMEOUT = 30000 ;
3233
3334const printBlankLine = ( ) => console . log ( ) ;
3435
@@ -166,65 +167,45 @@ class ServerlessOfflineKinesis {
166167 if ( isString ( physicalResourceName ) ) return physicalResourceName ;
167168 }
168169
169- this . serverless . cli . log ( `Could not resolve stream name for spec: ${ JSON . stringify ( streamEvent , null , 2 ) } ` ) ;
170+ this . serverless . cli . log (
171+ `Could not resolve stream name for spec: ${ JSON . stringify ( streamEvent , null , 2 ) } `
172+ ) ;
170173
171174 throw new Error (
172175 `StreamName not found. See https://github.com/CoorpAcademy/serverless-plugins/tree/master/packages/serverless-offline-kinesis#functions`
173176 ) ;
174177 }
175178
176- // FIXME: to really incorporate [to be done after conflict resolving]
177- pollStreamUntilActive ( streamName , timeout ) {
178- const client = this . getClient ( ) ;
179- const lastTime = Date . now ( ) + timeout ;
180- return new Promise ( ( resolve , reject ) => {
181- const poll = async ( ) => {
182- const {
183- StreamDescription : { StreamStatus}
184- } = await client . describeStream ( { StreamName : streamName } ) . promise ( ) ;
185- if ( StreamStatus === 'ACTIVE' ) {
186- resolve ( ) ;
187- } else if ( Date . now ( ) > lastTime ) {
188- reject (
189- new Error (
190- `Stream ${ streamName } did not become active within timeout of ${ Math . floor (
191- timeout / 1000
192- ) } s`
193- )
194- ) ;
195- } else {
196- setTimeout ( poll , 1000 ) ;
197- }
198- } ;
199- poll ( ) ;
200- } ) ;
201- }
202-
203- async createKinesisReadable ( functionName , streamEvent , retry = false ) {
179+ async createKinesisReadable ( functionName , streamEvent , delay = null ) {
204180 const client = this . getClient ( ) ;
205181 const streamName = this . getStreamName ( streamEvent ) ;
206182
207183 this . serverless . cli . log ( `Waiting for ${ streamName } to become active` ) ;
208184
209- await this . pollStreamUntilActive ( streamName , this . getConfig ( ) . waitForActiveTimeout || 30000 ) ; // FIXME
210-
211185 const kinesisStream = await client
212186 . describeStream ( {
213187 StreamName : streamName
214188 } )
215189 . promise ( )
190+ . then ( ( { StreamDescription} ) => {
191+ if ( StreamDescription . StreamStatus !== 'ACTIVE' )
192+ throw new Error ( 'Stream found but not yet active' ) ;
193+ return { StreamDescription} ;
194+ } )
216195 . catch ( err => err ) ;
217196
218197 if ( kinesisStream instanceof Error ) {
219- if ( ! retry ) throw new Error ( NO_KINESIS_FOUND ) ;
198+ if ( delay === null ) throw new Error ( NO_KINESIS_FOUND ) ;
199+ if ( delay < KINESIS_RETRY_DELAY )
200+ throw new Error ( `Stream ${ streamName } did not become active within specified timeout` ) ;
220201
221202 this . serverless . cli . log (
222203 `${ streamName } - not found because of ${
223204 kinesisStream . code
224205 } , retrying in ${ KINESIS_RETRY_DELAY } ms`
225206 ) ;
226207 return setTimeout ( ( ) => {
227- this . createKinesisReadable ( functionName , streamEvent , retry ) ;
208+ this . createKinesisReadable ( functionName , streamEvent , delay - KINESIS_RETRY_DELAY ) ;
228209 } , KINESIS_RETRY_DELAY ) ;
229210 }
230211
@@ -286,8 +267,10 @@ class ServerlessOfflineKinesis {
286267 printBlankLine ( ) ;
287268 this . serverless . cli . log ( `Kinesis for ${ functionName } :` ) ;
288269
270+ const waitForStreamDelay = this . getConfig ( ) . waitForActiveTimeout || KINESIS_RETRY_TIMEOUT ;
271+ // ! FIXME: probably rename (and document the variable name)
289272 forEach ( streamEvent => {
290- this . createKinesisReadable ( functionName , streamEvent , true ) ; // TMP: retry is not configurable so far
273+ this . createKinesisReadable ( functionName , streamEvent , waitForStreamDelay ) ;
291274 } , streams ) ;
292275
293276 printBlankLine ( ) ;
0 commit comments