@@ -31,27 +31,30 @@ export abstract class SentenceStream {
31
31
> ( ) ;
32
32
protected output = new TransformStream < TokenData , TokenData > ( ) ;
33
33
#closed = false ;
34
+ #inputClosed = false ;
35
+ #reader = this . output . readable . getReader ( ) ;
36
+ #writer = this . input . writable . getWriter ( ) ;
34
37
35
38
get closed ( ) : boolean {
36
39
return this . #closed;
37
40
}
38
41
39
42
/** Push a string of text to the tokenizer */
40
43
pushText ( text : string ) {
41
- // if (this.input.closed ) {
42
- // throw new Error('Input is closed');
43
- // }
44
+ if ( this . #inputClosed ) {
45
+ throw new Error ( 'Input is closed' ) ;
46
+ }
44
47
if ( this . #closed) {
45
48
throw new Error ( 'Stream is closed' ) ;
46
49
}
47
- this . input . writable . getWriter ( ) . write ( text ) ;
50
+ this . #writer . write ( text ) ;
48
51
}
49
52
50
53
/** Flush the tokenizer, causing it to process all pending text */
51
54
flush ( ) {
52
- // if (this.input.closed ) {
53
- // throw new Error('Input is closed');
54
- // }
55
+ if ( this . #inputClosed ) {
56
+ throw new Error ( 'Input is closed' ) ;
57
+ }
55
58
if ( this . #closed) {
56
59
throw new Error ( 'Stream is closed' ) ;
57
60
}
@@ -60,32 +63,31 @@ export abstract class SentenceStream {
60
63
61
64
/** Mark the input as ended and forbid additional pushes */
62
65
endInput ( ) {
63
- // if (this.input.closed ) {
64
- // throw new Error('Input is closed');
65
- // }
66
+ if ( this . #inputClosed ) {
67
+ throw new Error ( 'Input is closed' ) ;
68
+ }
66
69
if ( this . #closed) {
67
70
throw new Error ( 'Stream is closed' ) ;
68
71
}
69
- this . input . writable . close ( ) ;
72
+ this . #writer. close ( ) ;
73
+ this . #inputClosed = true ;
70
74
}
71
75
72
76
async next ( ) : Promise < IteratorResult < TokenData > > {
73
- return this . output . readable
74
- . getReader ( )
75
- . read ( )
76
- . then ( ( { value } ) => {
77
- if ( value ) {
78
- return { value, done : false } ;
79
- } else {
80
- return { value : undefined , done : true } ;
81
- }
82
- } ) ;
77
+ return this . #reader. read ( ) . then ( ( { value } ) => {
78
+ if ( value ) {
79
+ return { value, done : false } ;
80
+ } else {
81
+ return { value : undefined , done : true } ;
82
+ }
83
+ } ) ;
83
84
}
84
85
85
86
/** Close both the input and output of the tokenizer stream */
86
87
close ( ) {
87
- this . input . writable . close ( ) ;
88
- this . output . writable . close ( ) ;
88
+ if ( ! this . #inputClosed) {
89
+ this . endInput ( ) ;
90
+ }
89
91
this . #closed = true ;
90
92
}
91
93
@@ -110,6 +112,9 @@ export abstract class WordStream {
110
112
string | typeof WordStream . FLUSH_SENTINEL
111
113
> ( ) ;
112
114
protected output = new TransformStream < TokenData , TokenData > ( ) ;
115
+ #writer = this . input . writable . getWriter ( ) ;
116
+ #reader = this . output . readable . getReader ( ) ;
117
+ #inputClosed = false ;
113
118
#closed = false ;
114
119
115
120
get closed ( ) : boolean {
@@ -118,54 +123,51 @@ export abstract class WordStream {
118
123
119
124
/** Push a string of text to the tokenizer */
120
125
pushText ( text : string ) {
121
- // if (this.input.closed ) {
122
- // throw new Error('Input is closed');
123
- // }
126
+ if ( this . #inputClosed ) {
127
+ throw new Error ( 'Input is closed' ) ;
128
+ }
124
129
if ( this . #closed) {
125
130
throw new Error ( 'Stream is closed' ) ;
126
131
}
127
- this . input . writable . getWriter ( ) . write ( text ) ;
132
+ this . #writer . write ( text ) ;
128
133
}
129
134
130
135
/** Flush the tokenizer, causing it to process all pending text */
131
136
flush ( ) {
132
- // if (this.input.closed ) {
133
- // throw new Error('Input is closed');
134
- // }
137
+ if ( this . #inputClosed ) {
138
+ throw new Error ( 'Input is closed' ) ;
139
+ }
135
140
if ( this . #closed) {
136
141
throw new Error ( 'Stream is closed' ) ;
137
142
}
138
- this . input . writable . getWriter ( ) . write ( WordStream . FLUSH_SENTINEL ) ;
143
+ this . #writer . write ( WordStream . FLUSH_SENTINEL ) ;
139
144
}
140
145
141
146
/** Mark the input as ended and forbid additional pushes */
142
147
endInput ( ) {
143
- // if (this.input.closed ) {
144
- // throw new Error('Input is closed');
145
- // }
148
+ if ( this . #inputClosed ) {
149
+ throw new Error ( 'Input is closed' ) ;
150
+ }
146
151
if ( this . #closed) {
147
152
throw new Error ( 'Stream is closed' ) ;
148
153
}
149
- this . input . writable . close ( ) ;
154
+ this . #inputClosed = true ;
150
155
}
151
156
152
157
async next ( ) : Promise < IteratorResult < TokenData > > {
153
- return this . output . readable
154
- . getReader ( )
155
- . read ( )
156
- . then ( ( { value } ) => {
157
- if ( value ) {
158
- return { value, done : false } ;
159
- } else {
160
- return { value : undefined , done : true } ;
161
- }
162
- } ) ;
158
+ return this . #reader. read ( ) . then ( ( { value } ) => {
159
+ if ( value ) {
160
+ return { value, done : false } ;
161
+ } else {
162
+ return { value : undefined , done : true } ;
163
+ }
164
+ } ) ;
163
165
}
164
166
165
167
/** Close both the input and output of the tokenizer stream */
166
168
close ( ) {
167
- this . input . writable . close ( ) ;
168
- this . output . writable . close ( ) ;
169
+ this . endInput ( ) ;
170
+ this . #writer . close ( ) ;
169
171
this . #closed = true ;
170
172
}
171
173
0 commit comments