75 lines
No EOL
2.5 KiB
JavaScript
75 lines
No EOL
2.5 KiB
JavaScript
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.SubscribableStream = void 0;
|
|
/**
|
|
* @module botframework-streaming
|
|
*/
|
|
/**
|
|
* Copyright (c) Microsoft Corporation. All rights reserved.
|
|
* Licensed under the MIT License.
|
|
*/
|
|
const stream_1 = require("stream");
|
|
/**
|
|
* An extension of `Duplex` that operates in conjunction with a `PayloadAssembler` to convert raw bytes into a consumable form.
|
|
*/
|
|
class SubscribableStream extends stream_1.Duplex {
|
|
/**
|
|
* Initializes a new instance of the [SubscribableStream](xref:botframework-streaming.SubscribableStream) class.
|
|
*
|
|
* @param options The `DuplexOptions` to use when constructing this stream.
|
|
*/
|
|
constructor(options) {
|
|
super(options);
|
|
this.length = 0;
|
|
this.bufferList = [];
|
|
}
|
|
/**
|
|
* Writes data to the buffered list.
|
|
* All calls to writable.write() that occur between the time writable._write() is called and the callback is called will cause the written data to be buffered.
|
|
*
|
|
* @param chunk The Buffer to be written.
|
|
* @param _encoding The encoding. Unused in the implementation of Duplex.
|
|
* @param callback Callback for when this chunk of data is flushed.
|
|
*/
|
|
_write(chunk, _encoding, callback) {
|
|
const buffer = Buffer.from(chunk);
|
|
this.bufferList.push(buffer);
|
|
this.length += chunk.length;
|
|
if (this._onData) {
|
|
this._onData(buffer);
|
|
}
|
|
callback();
|
|
}
|
|
/**
|
|
* Reads the buffered list.
|
|
* Once the readable._read() method has been called, it will not be called again until more data is pushed through the readable.push() method.
|
|
* Empty data such as empty buffers and strings will not cause readable._read() to be called.
|
|
*
|
|
* @param size Number of bytes to read.
|
|
*/
|
|
_read(size) {
|
|
if (this.bufferList.length === 0) {
|
|
// null signals end of stream
|
|
this.push(null);
|
|
}
|
|
else {
|
|
let total = 0;
|
|
while (total < size && this.bufferList.length > 0) {
|
|
const buffer = this.bufferList[0];
|
|
this.push(buffer);
|
|
this.bufferList.splice(0, 1);
|
|
total += buffer.length;
|
|
}
|
|
}
|
|
}
|
|
/**
|
|
* Subscribes to the stream when receives data.
|
|
*
|
|
* @param onData Callback to be called when onData is executed.
|
|
*/
|
|
subscribe(onData) {
|
|
this._onData = onData;
|
|
}
|
|
}
|
|
exports.SubscribableStream = SubscribableStream;
|
|
//# sourceMappingURL=subscribableStream.js.map
|