1 /**
  2  * @fileOverview Cloud file steams.
  3  *
  4  * Used for file reading / writing.
  5  */
  6 // TODO: Could have common Stream base class and share _handleErr.
  7 
  8 /**
  9  * @name stream
 10  */
 11 (function () {
 12   var util = require('util'),
 13     Stream = require('stream').Stream,
 14     EventEmitter = require('events').EventEmitter,
 15     utils = require("./utils"),
 16     CloudError = require("./errors").CloudError,
 17     ReadStream,
 18     WriteStream;
 19 
 20   /**
 21    * Data event ('``data``').
 22    *
 23    * *Implements* **[``Event 'data'``][0]**
 24    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#event_data_
 25    *
 26    * @name stream.ReadStream#event:data
 27    * @event
 28    * @param  {Object} data            Data chunk.
 29    * @param  {Object} meta            Headers, meta object.
 30    * @config {Object} [headers]       HTTP headers.
 31    * @config {Object} [cloudHeaders]  Cloud provider headers.
 32    * @config {Object} [metadata]      Cloud metadata.
 33    */
 34   /**
 35    * Completion event ('``end``').
 36    *
 37    * *Implements* **[``Event 'end'``][0]**
 38    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#event_end_
 39    *
 40    * @name stream.ReadStream#event:end
 41    * @event
 42    * @param  {Object} results         Results object.
 43    * @param  {Object} meta            Headers, meta object.
 44    * @config {Object} [headers]       HTTP headers.
 45    * @config {Object} [cloudHeaders]  Cloud provider headers.
 46    * @config {Object} [metadata]      Cloud metadata.
 47    */
 48   /**
 49    * Error event ('``error``').
 50    *
 51    * *Implements* **[``Event 'error'``][0]**
 52    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#event_error_
 53    *
 54    * @name stream.ReadStream#event:error
 55    * @event
 56    * @param {Error|errors.CloudError} err Error object.
 57    */
 58   /**
 59    * Readable cloud stream.
 60    *
 61    * *Implements* **[``Readable Stream``][0]** interface.
 62    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#readable_Stream
 63    *
 64    * @param  {request.AuthenticatedRawRequest}
 65    *                      request     Request object.
 66    * @param   {Object}    [options]   Options object.
 67    * @config  {Function}  [errorFn]   Function to call with error.
 68    * @config  {Function}  [endFn]     Function to call with results. Called:
 69    *                                  endFn(response).
 70    * @exports ReadStream as stream.ReadStream
 71    * @constructor
 72    */
 73   ReadStream = function (request, options) {
 74     options = options || {};
 75     var self = this;
 76 
 77     // Members.
 78     self._readable = true;
 79     self._request = request;
 80     self._realRequest = request.realRequest;
 81     self._requestDone = false;
 82     self._errorHandled = false;
 83     self._response = null;
 84     self._buf = [];
 85     self._errorFn = options.errorFn || null;
 86     self._endFn = options.endFn || null;
 87 
 88     // Called state
 89     self._endCalled = false;
 90     self._destroyCalled = false;
 91     self._destroySoonCalled = false;
 92 
 93     // Error handling
 94     self._handleErr = function (err, response) {
 95       if (!self._errorHandled) {
 96         self._errorHandled = true;
 97         self._readable = false;
 98         self._requestDone = true;
 99         if (self._errorFn) {
100           self._errorFn(err, self, response);
101         } else {
102           self.emit('error', err);
103         }
104       }
105     };
106 
107     // Errors: straight pass-through (AuthReq has response).
108     self._request.on('error', function (err, response) {
109       self._handleErr(err, response);
110     });
111 
112     // Event binding.
113     self._realRequest.on('response', function (response) {
114       var encoding = self._request._encoding || null,
115         meta = self._request.getMeta(response);
116 
117       // Store response and set encoding.
118       self._response = response;
119       if (encoding) {
120         response.setEncoding(encoding);
121       }
122 
123       // Add bindings.
124       response.on('data', function (chunk) {
125         var doData = self.listeners('data').length > 0,
126           isError = response.statusCode && response.statusCode >= 400;
127 
128         if (doData && !isError) {
129           // If we have data listeners and not error, emit the data.
130           self.emit('data', chunk, meta);
131         } else {
132           // Otherwise, accumulate as string.
133           self._buf.push(chunk);
134         }
135       });
136       response.on('end', function () {
137         var results = null,
138           msg,
139           err;
140 
141         // Set state.
142         self._readable = false;
143         self._requestDone = true;
144 
145         if (self._endFn) {
146           results = self._endFn(response);
147         }
148 
149         switch (response.statusCode) {
150         case 200:
151           self.emit('end', results, meta);
152           break;
153         default:
154           // Everything unknown is an error.
155           msg = utils.bufToStr(self._buf, encoding, 'utf8');
156           err = new CloudError(msg, { response: response });
157           self._handleErr(err, response);
158         }
159       });
160     });
161   };
162 
163   util.inherits(ReadStream, Stream);
164 
165   Object.defineProperties(ReadStream.prototype, {
166     /**
167      * *Implements* **[``readable``][0]**
168      * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.readable
169      *
170      * "A boolean that is true by default, but turns false after an 'error'
171      * occurred, the stream came to an 'end', or destroy() was called."
172      *
173      * @name ReadStream#readable
174      * @type boolean
175      */
176     readable: {
177       get: function () {
178         return this._readable;
179       }
180     }
181   });
182 
183   /**
184    * *Implements* **[``setEncoding``][0]**
185    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.setEncoding
186    *
187    * "Makes the data event emit a string instead of a Buffer. encoding can be
188    * 'utf8', 'ascii', or 'base64'."
189    */
190   // TODO: Test ReadStream.setEncoding
191   ReadStream.prototype.setEncoding = function (encoding) {
192     this._request.setEncoding(encoding);
193   };
194 
195   /**
196    * *Implements* **[``pause``][0]**
197    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.pause
198    *
199    * "Pauses the incoming 'data' events."
200    */
201   // TODO: Test ReadStream.pause
202   ReadStream.prototype.pause = function () {
203     if (this._response) {
204       this._response.pause();
205     }
206   };
207 
208   /**
209    * *Implements* **[``resume``][0]**
210    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.resume
211    *
212    * "Resumes the incoming 'data' events after a pause()."
213    */
214   // TODO: Test ReadStream.resume
215   ReadStream.prototype.resume = function () {
216     if (this._response) {
217       this._response.resume();
218     }
219   };
220 
221   /**
222    * *Implements* **[``destroy``][0]**
223    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.destroy
224    *
225    * "Closes the underlying file descriptor. Stream will not emit any more
226    * events."
227    */
228   ReadStream.prototype.destroy = function () {
229     var self = this,
230       oldDestroyCalled = self._destroyCalled;
231 
232     self._destroyCalled = true;
233     if (!oldDestroyCalled) {
234       // Remove all listeners.
235       self.removeAllListeners('data');
236       self.removeAllListeners('end');
237       self.removeAllListeners('error');
238       self._realRequest.removeAllListeners('data');
239       self._realRequest.removeAllListeners('end');
240       self._realRequest.removeAllListeners('error');
241 
242       // Set sink for errors.
243       self.on('error', function () {});
244       self._realRequest.on('error', function () {});
245 
246       //  Close the connection.
247       self._requestDone = true;
248       if (self._realRequest && self._realRequest.connection) {
249         self._realRequest.connection.end();
250       }
251     }
252   };
253 
254   /**
255    * *Implements* **[``destroySoon``][0]**
256    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.destroySoon
257    *
258    * "After the write queue is drained, close the file descriptor."
259    */
260   // TODO: Test ReadStream.destroySoon
261   ReadStream.prototype.destroySoon = function () {
262     var self = this,
263       oldDestroySoonCalled = self._destroySoonCalled,
264       finishUp;
265 
266     self._destroySoonCalled = true;
267     if (!oldDestroySoonCalled) {
268       // The write queue here is the GET data from cloud.
269       /** @private */
270       finishUp = function () {
271         self._requestDone = true;
272         self.destroy();
273       };
274 
275       self._request.on('end', finishUp);
276       self._request.on('error', finishUp);
277       self._realRequest.on('end', finishUp);
278       self._realRequest.on('error', finishUp);
279     }
280   };
281 
282   /*
283    * *Implements* **[``pipe``][0]**
284    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.pipe
285    *
286    * "Connects this read stream to destination WriteStream. Incoming data on
287    * this stream gets written to destination. The destination and source
288    * streams are kept in sync by pausing and resuming as necessary."
289    *
290    * @param {WriteStream} destination Destination stream.
291    * @name stream.ReadStream.pipe
292    */
293   // Inherited from stream.Stream.pipe...
294   //ReadStream.prototype.pipe = function (destination, options) {
295   //};
296 
297   /**
298    * End the underlying cloud request.
299    *
300    * Typically starts the async code execution.
301    *
302    * *Note*: This function can be called multiple times without bad effect.
303    * Calling code has the option to call ``end()`` once the request is set
304    * up, or leave it to the end user.
305    */
306   ReadStream.prototype.end = function () {
307     var self = this,
308       oldEndCalled = self._endCalled;
309 
310     self._endCalled = true;
311     if (!oldEndCalled && !self._destroyCalled && !self._destroySoonCalled) {
312       self._request.end();
313     }
314   };
315 
316   module.exports.ReadStream = ReadStream;
317 
318   /**
319    * Completion event ('``end``').
320    *
321    * Extra event to allow caller to know that writing has been completed.
322    *
323    * @name stream.WriteStream#event:end
324    * @event
325    * @param  {Object} results         Results object.
326    * @param  {Object} meta            Headers, meta object.
327    * @config {Object} [headers]       HTTP headers.
328    * @config {Object} [cloudHeaders]  Cloud provider headers.
329    * @config {Object} [metadata]      Cloud metadata.
330    */
331   /**
332    * Drain event ('``drain``').
333    *
334    * *Implements* **[``Event 'drain'``][0]**
335    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#event_drain_
336    *
337    * @name stream.WriteStream#event:drain
338    * @event
339    */
340   /**
341    * Error event ('``error``').
342    *
343    * *Implements* **[``Event 'error'``][0]**
344    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#event_error_
345    *
346    * @name stream.WriteStream#event:error
347    * @event
348    * @param {Error|errors.CloudError} err Error object.
349    */
350   /**
351    * Close event ('``close``').
352    *
353    * *Implements* **[``Event 'close'``][0]**
354    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#event_close_
355    *
356    * @name stream.WriteStream#event:close
357    * @event
358    */
359   /**
360    * Pipe event ('``pipe``').
361    *
362    * *Implements* **[``Event 'pipe'``][0]**
363    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#event_pipe_
364    *
365    * @name stream.WriteStream#event:pipe
366    * @param   {Object} src A Readable Stream object.
367    * @event
368    */
369   /**
370    * Writable cloud stream.
371    *
372    * *Implements* **[``Writable Stream``][0]** interface.
373    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#writable_Stream
374    *
375    * @param  {request.AuthenticatedRawRequest}
376    *                      request     Request object.
377    * @param   {Object}    [options]   Options object.
378    * @config  {Function}  [errorFn]   Function to call with error.
379    * @config  {Function}  [endFn]     Function to call with results. Called:
380    *                                  endFn(response).
381    * @exports WriteStream as stream.WriteStream
382    * @constructor
383    */
384   WriteStream = function (request, options) {
385     options = options || {};
386     var self = this;
387 
388     // Members.
389     self._request = request;
390     self._writable = true;
391     self._realRequest = request.realRequest;
392     self._requestDone = false;
393     self._errorHandled = false;
394     self._response = null;
395     self._buf = [];
396     self._errorFn = options.errorFn || null;
397     self._endFn = options.endFn || null;
398 
399     // Called state
400     self._endCalled = false;
401     self._destroyCalled = false;
402     self._destroySoonCalled = false;
403 
404     // Accumulate writes.
405     self._writeBuf = new Buffer(0);
406 
407     // Error handling
408     // TODO: Could have common Stream base class and share _handleErr.
409     self._handleErr = function (err, response) {
410       if (!self._errorHandled) {
411         self._errorHandled = true;
412         self._writable = false;
413         self._requestDone = true;
414         if (self._errorFn) {
415           self._errorFn(err, self, response);
416         } else {
417           self.emit('error', err);
418         }
419       }
420     };
421 
422     // Errors: straight pass-through (AuthReq has response).
423     self._request.on('error', function (err, response) {
424       self._handleErr(err, response);
425     });
426 
427     // Event binding.
428     self._realRequest.on('response', function (response) {
429       var encoding = 'utf8';
430 
431       // Store response and set encoding.
432       self._response = response;
433       response.setEncoding(encoding);
434 
435       // Add bindings.
436       response.on('data', function (chunk) {
437         self._buf.push(chunk);
438       });
439       response.on('end', function () {
440         var results = null,
441           msg,
442           err;
443 
444         // Set state.
445         self._writable = false;
446         self._requestDone = true;
447 
448         if (self._endFn) {
449           results = self._endFn(response);
450         }
451 
452         switch (response.statusCode) {
453         case 200:
454           self.emit('end', results, self._request.getMeta(response));
455           break;
456         default:
457           // Everything unknown is an error.
458           msg = utils.bufToStr(self._buf, encoding, 'utf8');
459           err = new CloudError(msg, { response: response });
460           self._handleErr(err, response);
461         }
462       });
463     });
464   };
465 
466   util.inherits(WriteStream, EventEmitter);
467 
468   /**
469    * *Implements* **[``writable``][0]**
470    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.writable
471    *
472    * "A boolean that is true by default, but turns false after an 'error'
473    * occurred or end() / destroy() was called."
474    *
475    * @name WriteStream.writable
476    * @type boolean
477    */
478   Object.defineProperties(WriteStream.prototype, {
479     writable: {
480       get: function () {
481         return this._writable;
482       }
483     }
484   });
485 
486   /**
487    * Write to internal buffer.
488    *
489    * Used to accumulate writes for real one-shot cloud PUT on ``end()``.
490    *
491    * @param {Buffer|string} value     Buffer / string to write.
492    * @param {string='utf8'} encoding  Encoding to use if string value.
493    * @private
494    */
495   WriteStream.prototype._addToBuffer = function (value, encoding) {
496     var self = this,
497       oldBuf,
498       oldBufLength,
499       newBuf,
500       newBufLength,
501       concatBuf;
502 
503     if (value) {
504       // Convert to buffer if not already.
505       if (typeof value === 'string') {
506         encoding = encoding || 'utf8';
507         value = new Buffer(value, encoding);
508       }
509 
510       // Append the buffer.
511       if (value instanceof Buffer) {
512         // Helper variables.
513         oldBuf = self._writeBuf;
514         oldBufLength = oldBuf.length;
515         newBuf = value;
516         newBufLength = value.length;
517 
518         // Copy the buffers.
519         concatBuf = new Buffer(oldBufLength + newBufLength);
520         oldBuf.copy(concatBuf, 0, 0, oldBufLength);
521         newBuf.copy(concatBuf, oldBufLength, 0, newBufLength);
522 
523         // Update the internal buffer.
524         self._writeBuf = concatBuf;
525 
526       } else {
527         throw new Error("Unknown value type: " + (typeof value));
528       }
529     }
530   };
531 
532   /**
533    * Write string/buffer to stream.
534    *
535    * Argument based options:
536    *
537    *  - ``write(string, encoding='utf8', [fd])``
538    *  - ``write(buffer)``
539    *
540    * **Note**: ``fd`` parameter is not currently supported.
541    *
542    * **Note**: AWS/GSFD does not support ``Transfer-Encoding: chunked``, so
543    * we accumulate buffers and write it all in one shot at the ``end()`` call.
544    * In the future, parallel upload might be supported.
545    *
546    * This also means that nothing actually happens on the network until an
547    * ``end()`` is called.
548    *
549    * *Implements* **[``write``][0]**
550    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.write
551    *
552    * "Writes string with the given encoding to the stream. Returns true if the
553    * string has been flushed to the kernel buffer. Returns false to indicate
554    * that the kernel buffer is full, and the data will be sent out in the
555    * future. The 'drain' event will indicate when the kernel buffer is empty
556    * again. The encoding defaults to 'utf8'.
557    *
558    * If the optional fd parameter is specified, it is interpreted as an
559    * integral file descriptor to be sent over the stream. This is only
560    * supported for UNIX streams, and is silently ignored otherwise. When
561    * writing a file descriptor in this manner, closing the descriptor before
562    * the stream drains risks sending an invalid (closed) FD."
563    *
564    * ... and ...
565    *
566    * "Same as the above except with a raw buffer."
567    *
568    * @param {Buffer|string} value     Buffer / string to write.
569    * @param {string='utf8'} encoding  Encoding to use if string value.
570    * @return {boolean} If buffer is available (always true).
571    */
572   WriteStream.prototype.write = function (value, encoding) {
573     if (this._writable) {
574       this._addToBuffer(value, encoding);
575     }
576 
577     return this._writable;
578   };
579 
580   /**
581    * *Implements* **[``end``][0]**
582    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.end
583    *
584    * "Terminates the stream with EOF or FIN."
585    *
586    * Argument based options:
587    *
588    *  - ``end()``
589    *  - ``end(string, encoding)``
590    *  - ``end(buffer)``
591    *
592    * @param {Buffer|string} value     Buffer / string to write.
593    * @param {string='utf8'} encoding  Encoding to use if string value.
594    */
595   WriteStream.prototype.end = function (value, encoding) {
596     var self = this,
597       req = self._request,
598       byteLength;
599 
600     if (self._writable && !self._endCalled) {
601       // Set object state.
602       self._endCalled = true;
603       self._writable = false;
604 
605       // Write any values to internal buffer.
606       self._addToBuffer(value, encoding);
607 
608       // Add headers for one-shot request, and actually end() request with
609       // the write buffer.
610       req.setHeader('content-length', self._writeBuf.length);
611       req.end(self._writeBuf);
612     }
613   };
614 
615   /**
616    * *Implements* **[``destroy``][0]**
617    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.destroy
618    *
619    * "Closes the underlying file descriptor. Stream will not emit any more
620    * events."
621    */
622   WriteStream.prototype.destroy = function () {
623     var self = this,
624       oldDestroyCalled = self._destroyCalled;
625 
626     self._destroyCalled = true;
627     if (!oldDestroyCalled) {
628       // Remove all listeners.
629       self.removeAllListeners('data');
630       self.removeAllListeners('end');
631       self.removeAllListeners('error');
632       self._realRequest.removeAllListeners('data');
633       self._realRequest.removeAllListeners('end');
634       self._realRequest.removeAllListeners('error');
635 
636       // Set sink for errors.
637       self.on('error', function () {});
638       self._realRequest.on('error', function () {});
639 
640       //  Close the connection.
641       self._requestDone = true;
642       if (self._realRequest && self._realRequest.connection) {
643         self._realRequest.connection.end();
644       }
645     }
646   };
647 
648   /**
649    * *Implements* **[``destroySoon``][0]**
650    * [0]: http://nodejs.org/docs/v0.4.9/api/streams.html#stream.destroySoon
651    *
652    * "After the write queue is drained, close the file descriptor.
653    * destroySoon() can still destroy straight away, as long as there is no
654    * data left in the queue for writes."
655    */
656   // TODO: Test WriteStream.destroySoon.
657   WriteStream.prototype.destroySoon = function () {
658     var self = this,
659       oldDestroySoonCalled = self._destroySoonCalled,
660       finishUp;
661 
662     self._destroySoonCalled = true;
663     if (!oldDestroySoonCalled && self._endCalled) {
664       if (self._endCalled) {
665         // Destroy immediately - we have all the write data.
666         self.destroy();
667 
668       } else {
669         // If end() is not called, we should wait a bit.
670         /** @private */
671         finishUp = function () {
672           self._requestDone = true;
673           self.destroy();
674         };
675 
676         self.on('end', finishUp);
677         self.on('error', finishUp);
678         self._request.on('end', finishUp);
679         self._request.on('error', finishUp);
680         self._realRequest.on('end', finishUp);
681         self._realRequest.on('error', finishUp);
682       }
683     }
684   };
685 
686   module.exports.WriteStream = WriteStream;
687 }());
688