Files: 164dd8a3251906b4d06836dfca7230a72f68ed50 / node_modules / readable-stream / lib / _stream_transform.js
7947 bytesRaw
1 | // Copyright Joyent, Inc. and other Node contributors. |
2 | // |
3 | // Permission is hereby granted, free of charge, to any person obtaining a |
4 | // copy of this software and associated documentation files (the |
5 | // "Software"), to deal in the Software without restriction, including |
6 | // without limitation the rights to use, copy, modify, merge, publish, |
7 | // distribute, sublicense, and/or sell copies of the Software, and to permit |
8 | // persons to whom the Software is furnished to do so, subject to the |
9 | // following conditions: |
10 | // |
11 | // The above copyright notice and this permission notice shall be included |
12 | // in all copies or substantial portions of the Software. |
13 | // |
14 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
15 | // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
16 | // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN |
17 | // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, |
18 | // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR |
19 | // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE |
20 | // USE OR OTHER DEALINGS IN THE SOFTWARE. |
21 | // a transform stream is a readable/writable stream where you do |
22 | // something with the data. Sometimes it's called a "filter", |
23 | // but that's not a great name for it, since that implies a thing where |
24 | // some bits pass through, and others are simply ignored. (That would |
25 | // be a valid example of a transform, of course.) |
26 | // |
27 | // While the output is causally related to the input, it's not a |
28 | // necessarily symmetric or synchronous transformation. For example, |
29 | // a zlib stream might take multiple plain-text writes(), and then |
30 | // emit a single compressed chunk some time in the future. |
31 | // |
32 | // Here's how this works: |
33 | // |
34 | // The Transform stream has all the aspects of the readable and writable |
35 | // stream classes. When you write(chunk), that calls _write(chunk,cb) |
36 | // internally, and returns false if there's a lot of pending writes |
37 | // buffered up. When you call read(), that calls _read(n) until |
38 | // there's enough pending readable data buffered up. |
39 | // |
40 | // In a transform stream, the written data is placed in a buffer. When |
41 | // _read(n) is called, it transforms the queued up data, calling the |
42 | // buffered _write cb's as it consumes chunks. If consuming a single |
43 | // written chunk would result in multiple output chunks, then the first |
44 | // outputted bit calls the readcb, and subsequent chunks just go into |
45 | // the read buffer, and will cause it to emit 'readable' if necessary. |
46 | // |
47 | // This way, back-pressure is actually determined by the reading side, |
48 | // since _read has to be called to start processing a new chunk. However, |
49 | // a pathological inflate type of transform can cause excessive buffering |
50 | // here. For example, imagine a stream where every byte of input is |
51 | // interpreted as an integer from 0-255, and then results in that many |
52 | // bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in |
53 | // 1kb of data being output. In this case, you could write a very small |
54 | // amount of input, and end up with a very large amount of output. In |
55 | // such a pathological inflating mechanism, there'd be no way to tell |
56 | // the system to stop doing the transform. A single 4MB write could |
57 | // cause the system to run out of memory. |
58 | // |
59 | // However, even in such a pathological case, only a single written chunk |
60 | // would be consumed, and then the rest would wait (un-transformed) until |
61 | // the results of the previous transformed chunk were consumed. |
62 | ; |
63 | |
64 | module.exports = Transform; |
65 | |
66 | var _require$codes = require('../errors').codes, |
67 | ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, |
68 | ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK, |
69 | ERR_TRANSFORM_ALREADY_TRANSFORMING = _require$codes.ERR_TRANSFORM_ALREADY_TRANSFORMING, |
70 | ERR_TRANSFORM_WITH_LENGTH_0 = _require$codes.ERR_TRANSFORM_WITH_LENGTH_0; |
71 | |
72 | var Duplex = require('./_stream_duplex'); |
73 | |
74 | require('inherits')(Transform, Duplex); |
75 | |
76 | function afterTransform(er, data) { |
77 | var ts = this._transformState; |
78 | ts.transforming = false; |
79 | var cb = ts.writecb; |
80 | |
81 | if (cb === null) { |
82 | return this.emit('error', new ERR_MULTIPLE_CALLBACK()); |
83 | } |
84 | |
85 | ts.writechunk = null; |
86 | ts.writecb = null; |
87 | if (data != null) // single equals check for both `null` and `undefined` |
88 | this.push(data); |
89 | cb(er); |
90 | var rs = this._readableState; |
91 | rs.reading = false; |
92 | |
93 | if (rs.needReadable || rs.length < rs.highWaterMark) { |
94 | this._read(rs.highWaterMark); |
95 | } |
96 | } |
97 | |
98 | function Transform(options) { |
99 | if (!(this instanceof Transform)) return new Transform(options); |
100 | Duplex.call(this, options); |
101 | this._transformState = { |
102 | afterTransform: afterTransform.bind(this), |
103 | needTransform: false, |
104 | transforming: false, |
105 | writecb: null, |
106 | writechunk: null, |
107 | writeencoding: null |
108 | }; // start out asking for a readable event once data is transformed. |
109 | |
110 | this._readableState.needReadable = true; // we have implemented the _read method, and done the other things |
111 | // that Readable wants before the first _read call, so unset the |
112 | // sync guard flag. |
113 | |
114 | this._readableState.sync = false; |
115 | |
116 | if (options) { |
117 | if (typeof options.transform === 'function') this._transform = options.transform; |
118 | if (typeof options.flush === 'function') this._flush = options.flush; |
119 | } // When the writable side finishes, then flush out anything remaining. |
120 | |
121 | |
122 | this.on('prefinish', prefinish); |
123 | } |
124 | |
125 | function prefinish() { |
126 | var _this = this; |
127 | |
128 | if (typeof this._flush === 'function' && !this._readableState.destroyed) { |
129 | this._flush(function (er, data) { |
130 | done(_this, er, data); |
131 | }); |
132 | } else { |
133 | done(this, null, null); |
134 | } |
135 | } |
136 | |
137 | Transform.prototype.push = function (chunk, encoding) { |
138 | this._transformState.needTransform = false; |
139 | return Duplex.prototype.push.call(this, chunk, encoding); |
140 | }; // This is the part where you do stuff! |
141 | // override this function in implementation classes. |
142 | // 'chunk' is an input chunk. |
143 | // |
144 | // Call `push(newChunk)` to pass along transformed output |
145 | // to the readable side. You may call 'push' zero or more times. |
146 | // |
147 | // Call `cb(err)` when you are done with this chunk. If you pass |
148 | // an error, then that'll put the hurt on the whole operation. If you |
149 | // never call cb(), then you'll never get another chunk. |
150 | |
151 | |
152 | Transform.prototype._transform = function (chunk, encoding, cb) { |
153 | cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()')); |
154 | }; |
155 | |
156 | Transform.prototype._write = function (chunk, encoding, cb) { |
157 | var ts = this._transformState; |
158 | ts.writecb = cb; |
159 | ts.writechunk = chunk; |
160 | ts.writeencoding = encoding; |
161 | |
162 | if (!ts.transforming) { |
163 | var rs = this._readableState; |
164 | if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) this._read(rs.highWaterMark); |
165 | } |
166 | }; // Doesn't matter what the args are here. |
167 | // _transform does all the work. |
168 | // That we got here means that the readable side wants more data. |
169 | |
170 | |
171 | Transform.prototype._read = function (n) { |
172 | var ts = this._transformState; |
173 | |
174 | if (ts.writechunk !== null && !ts.transforming) { |
175 | ts.transforming = true; |
176 | |
177 | this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); |
178 | } else { |
179 | // mark that we need a transform, so that any data that comes in |
180 | // will get processed, now that we've asked for it. |
181 | ts.needTransform = true; |
182 | } |
183 | }; |
184 | |
185 | Transform.prototype._destroy = function (err, cb) { |
186 | Duplex.prototype._destroy.call(this, err, function (err2) { |
187 | cb(err2); |
188 | }); |
189 | }; |
190 | |
191 | function done(stream, er, data) { |
192 | if (er) return stream.emit('error', er); |
193 | if (data != null) // single equals check for both `null` and `undefined` |
194 | stream.push(data); // TODO(BridgeAR): Write a test for these two error cases |
195 | // if there's nothing in the write buffer, then that means |
196 | // that nothing more will ever be provided |
197 | |
198 | if (stream._writableState.length) throw new ERR_TRANSFORM_WITH_LENGTH_0(); |
199 | if (stream._transformState.transforming) throw new ERR_TRANSFORM_ALREADY_TRANSFORMING(); |
200 | return stream.push(null); |
201 | } |
Built with git-ssb-web