Bret Comnes
2020-02-12 16:58:30 -07:00
parent 383cb4ca53
commit 80459935e1
98 changed files with 9972 additions and 1 deletions

8
node_modules/streamx/.travis.yml generated vendored Normal file

@@ -0,0 +1,8 @@
language: node_js
node_js:
- '10'
- '12'
os:
- windows
- osx
- linux

21
node_modules/streamx/LICENSE generated vendored Normal file

@@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2019 Mathias Buus
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

366
node_modules/streamx/README.md generated vendored Normal file

@@ -0,0 +1,366 @@
# streamx
An iteration of the Node.js core streams with a series of improvements.
```
npm install streamx
```
[![Build Status](https://travis-ci.org/mafintosh/streamx.svg?branch=master)](https://travis-ci.org/mafintosh/streamx)
## Main improvements from Node.js core stream
#### Proper lifecycle support.
Streams have an `_open` function that is called before any read/write operation and a `_destroy`
function that is always run as the last part of the stream.
This makes it easy to maintain state.
#### Easy error handling
Fully integrates a `.destroy()` function. When called the stream will wait for any
pending operation to finish and call the stream destroy logic.
Close is *always* the last event emitted and `destroy` is always run.
#### `pipe()` error handling
`pipe` accepts a callback that is called when the pipeline is fully drained.
It also handles errors on the streams provided and destroys both streams if either
of them fails.
#### All streams are both binary and object mode streams
A `map` function can be provided to map your input data into buffers
or other formats. To indicate how much buffer space each data item takes,
a `byteLength` function can be provided as well.
This removes the need for two modes of streams.
#### Simplicity
This is a full rewrite, all contained in one file.
Lots of stream methods are simplified based on how I and devs I work with actually use streams in the wild.
#### Backwards compat
streamx aims to be compatible with Node.js streams whenever it is reasonable to do so.
This means that streamx streams behave a lot like Node.js streams from the outside, while still providing the
improvements above.
## Usage
``` js
const { Readable } = require('streamx')

const rs = new Readable({
  read (cb) {
    this.push('Cool data')
    cb(null)
  }
})

rs.on('data', data => console.log('data:', data))
```
## API
The streamx package contains 4 stream classes similar to those in Node.js core.
## Readable Stream
#### `rs = new stream.Readable([options])`
Create a new readable stream.
Options include:
```
{
  highWaterMark: 16384, // max buffer size in bytes
  map: (data) => data, // optional function to map input data
  byteLength: (data) => size // optional function that calculates the byte size of input data
}
```
In addition you can pass the `open`, `read`, and `destroy` functions as shorthands in
the constructor instead of overwriting the methods below.
The default `byteLength` function returns the byte length of buffers and `1024`
for any other object. This means that, with the defaults, a full buffer holds around
16 non-buffer items, or 16kb worth of buffers.
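For instance, a stream of strings whose real byte size should count toward the high water mark might be set up like this (a minimal sketch):
``` js
const { Readable } = require('streamx')

const rs = new Readable({
  map: (data) => String(data), // normalize every pushed item to a string
  byteLength: (data) => Buffer.byteLength(data) // count actual bytes instead of the 1024 default
})
```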
#### `rs._read(cb)`
This function is called when the stream wants you to push new data.
Overwrite this and add your own read logic.
You should call the callback when you are fully done with the read.
Can also be set using `options.read` in the constructor.
Note that this function differs from Node.js streams in that it takes
the "read finished" callback.
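As a sketch, a read implementation that produces one chunk per call and only signals completion once the chunk has been pushed (the timer stands in for a real asynchronous source):
``` js
const { Readable } = require('streamx')

const rs = new Readable({
  read (cb) {
    setTimeout(() => { // pretend the chunk arrived from an asynchronous source
      this.push('chunk')
      cb(null) // this read is now fully done; the stream may issue another
    }, 10)
  }
})
```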
#### `drained = rs.push(data)`
Push new data to the stream. Returns true if the buffer is not full
and you should push more data if you can.
If you call `rs.push(null)` you signal to the stream that no more
data will be pushed and that you want to end the stream.
#### `data = rs.read()`
Read a piece of data from the stream buffer. If the buffer is currently empty
`null` will be returned and you should wait for `readable` to be emitted before
trying again. If the stream has been ended it will also return `null`.
Note that this method differs from Node.js streams in that it does not accept
an optional number of bytes to consume.
#### `rs.unshift(data)`
Add a piece of data to the front of the buffer. Use this if you read too much
data using the `rs.read()` function.
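For example (a sketch where `wantsMore` is a hypothetical predicate, not part of streamx):
``` js
const data = rs.read()
if (data !== null && !wantsMore(data)) { // wantsMore is hypothetical
  rs.unshift(data) // put it back; the next read() will return it again
}
```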
#### `rs._open(cb)`
This function is called once before the first read is issued. Use this function
to implement your own open logic.
Can also be set using `options.open` in the constructor.
#### `rs._destroy(cb)`
This function is called just before the stream is fully destroyed. You should
use this to implement whatever teardown logic you need. The final part of the
stream life cycle is always to call destroy itself so this function will always
be called whether the stream ends gracefully or forcefully.
Can also be set using `options.destroy` in the constructor.
Note that `_destroy` might be called without `_open` having been called,
in case no read was ever performed on the stream.
#### `rs._predestroy()`
A simple hook that is called as soon as the first `stream.destroy()` call is invoked.
Use this in case you need to cancel pending reads (if possible) instead of waiting for them to finish.
Can also be set using `options.predestroy` in the constructor.
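A minimal sketch of cancelling an in-flight read from `predestroy`; the pending read callback is still resolved so the destroy sequence can proceed:
``` js
const { Readable } = require('streamx')

let cancel = null // set by read, used by predestroy

const rs = new Readable({
  read (cb) {
    const timer = setTimeout(() => {
      cancel = null
      this.push('tick')
      cb(null)
    }, 1000)
    cancel = () => {
      clearTimeout(timer)
      cb(null) // resolve the pending read right away instead of waiting it out
    }
  },
  predestroy () {
    if (cancel) cancel()
  }
})
```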
#### `rs.destroy([error])`
Forcefully destroy the stream. Will call `_destroy` as soon as all pending reads have finished.
Once the stream is fully destroyed `close` will be emitted.
If you pass an error this error will be emitted just before `close` is, signifying a reason
as to why this stream was destroyed.
#### `rs.pause()`
Pauses the stream. You will only need to call this if you want to pause a resumed stream.
#### `rs.resume()`
Will start reading data from the stream as fast as possible.
If you do not call this, you need to consume data with the `read()` method, pipe the stream
somewhere else with `pipe()`, or attach a `data` handler.
If none of these options are used the stream will stay paused.
#### `writableStream = rs.pipe(writableStream, [callback])`
Efficiently pipe the readable stream to a writable stream (either a Node.js core stream or a stream from this package).
If you provide a callback, it is called when the pipeline has fully finished, with an optional error in case
it failed.
To cancel the pipeline destroy either of the streams.
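For example (a sketch using streams from this package):
``` js
const { Readable, Writable } = require('streamx')

const rs = Readable.from(['a', 'b', 'c'])
const ws = new Writable({
  write (data, cb) {
    console.log('wrote', data)
    cb(null)
  }
})

rs.pipe(ws, function (err) {
  // called exactly once when the pipeline has fully finished
  console.log('pipeline done', err ? 'with error' : 'cleanly')
})
```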
#### `rs.on('readable')`
Emitted when data is pushed to the stream if the buffer was previously empty.
#### `rs.on('data', data)`
Emitted when data is being read from the stream. If you attach a data handler you are implicitly resuming the stream.
#### `rs.on('end')`
Emitted when the readable stream has ended and no data is left in its buffer.
#### `rs.on('close')`
Emitted when the readable stream has fully closed (i.e. its destroy logic has completed).
#### `rs.on('error', err)`
Emitted if any of the stream operations fail with an error. `close` is always emitted right after this.
#### `rs.destroyed`
Boolean property indicating whether or not this stream has been destroyed.
#### `bool = Readable.isBackpressured(rs)`
Static method to check if a readable stream is currently under backpressure.
#### `stream = Readable.from(arrayOrBufferOrStringOrAsyncIterator)`
Static method to turn an array or buffer or string or AsyncIterator into a readable stream.
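For example, a minimal sketch that streams the values of an async iterator:
``` js
const { Readable } = require('streamx')

async function * numbers () {
  yield 1
  yield 2
  yield 3
}

Readable.from(numbers()).on('data', n => console.log(n)) // 1, 2, 3
```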
## Writable Stream
#### `ws = new stream.Writable([options])`
Create a new writable stream.
Options include:
```
{
  highWaterMark: 16384, // max buffer size in bytes
  map: (data) => data, // optional function to map input data
  byteLength: (data) => size // optional function that calculates the byte size of input data
}
```
In addition you can pass the `open`, `write`, `final`, and `destroy` functions as shorthands in
the constructor instead of overwriting the methods below.
The default `byteLength` function returns the byte length of buffers and `1024`
for any other object. This means that, with the defaults, a full buffer holds around
16 non-buffer items, or 16kb worth of buffers.
#### `ws._open(cb)`
This function is called once before the first write is issued. Use this function
to implement your own open logic.
Can also be set using `options.open` in the constructor.
#### `ws._destroy(cb)`
This function is called just before the stream is fully destroyed. You should
use this to implement whatever teardown logic you need. The final part of the
stream life cycle is always to call destroy itself so this function will always
be called whether the stream ends gracefully or forcefully.
Can also be set using `options.destroy` in the constructor.
Note that `_destroy` might be called without `_open` having been called,
in case no write was ever performed on the stream.
#### `ws._predestroy()`
A simple hook that is called as soon as the first `stream.destroy()` call is invoked.
Use this in case you need to cancel pending writes (if possible) instead of waiting for them to finish.
Can also be set using `options.predestroy` in the constructor.
#### `ws.destroy([error])`
Forcefully destroy the stream. Will call `_destroy` as soon as all pending writes have finished.
Once the stream is fully destroyed `close` will be emitted.
If you pass an error this error will be emitted just before `close` is, signifying a reason
as to why this stream was destroyed.
#### `drained = ws.write(data)`
Write a piece of data to the stream. Returns `true` if the stream buffer is not full and you
should keep writing to it if you can. If `false` is returned the stream will emit `drain`
once its buffer is fully drained.
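A typical backpressure-aware write loop looks something like this (a minimal sketch):
``` js
const { Writable } = require('streamx')

const ws = new Writable({
  write (data, cb) {
    setImmediate(cb) // pretend the data was flushed asynchronously
  }
})

function writeAll (items) {
  while (items.length > 0) {
    if (!ws.write(items.shift())) { // buffer is full, stop and wait
      ws.once('drain', () => writeAll(items))
      return
    }
  }
  ws.end()
}

writeAll(['a', 'b', 'c'])
```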
#### `ws._write(data, callback)`
This function is called when the stream wants to write some data. Use this to implement your own
write logic. When done call the callback and the stream will call it again if more data exists in the buffer.
Can also be set using `options.write` in the constructor.
#### `ws._writev(batch, callback)`
Similar to `_write` but passes an array of all data in the current write buffer instead of the oldest one.
Useful if the destination you are writing the data to supports batching.
Can also be set using `options.writev` in the constructor.
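For example, flushing whole batches to a destination that supports bulk writes (`db.insertMany` here is a hypothetical API, not part of streamx):
``` js
const { Writable } = require('streamx')

const ws = new Writable({
  writev (batch, cb) {
    // batch is an array of everything currently buffered
    db.insertMany(batch, cb) // hypothetical bulk-insert destination
  }
})
```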
#### `ws.end()`
Gracefully end the writable stream. Call this when you no longer want to write to the stream.
Once all writes have been fully drained `finish` will be emitted.
#### `ws._final(callback)`
This function is called just before `finish` is emitted, i.e. when all writes have been flushed and `ws.end()`
has been called. Use this to implement any logic that should happen after all writes but before finish.
Can also be set using `options.final` in the constructor.
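A sketch of a final hook that does one last piece of work before `finish` fires:
``` js
const { Writable } = require('streamx')

const ws = new Writable({
  write (data, cb) {
    cb(null)
  },
  final (cb) {
    // all writes have flushed at this point; 'finish' is emitted after cb(null)
    console.log('writing trailer')
    cb(null)
  }
})

ws.write('body')
ws.end()
```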
#### `ws.on('finish')`
Emitted when the stream has been ended and all writes have been drained.
#### `ws.on('close')`
Emitted when the writable stream has fully closed (i.e. its destroy logic has completed).
#### `ws.on('error', err)`
Emitted if any of the stream operations fail with an error. `close` is always emitted right after this.
#### `ws.destroyed`
Boolean property indicating whether or not this stream has been destroyed.
#### `bool = Writable.isBackpressured(ws)`
Static method to check if a writable stream is currently under backpressure.
## Duplex Stream
#### `s = new stream.Duplex([options])`
A duplex stream is a stream that is both readable and writable.
Since JS does not support multiple inheritance it inherits directly from Readable
but implements the Writable API as well.
If you want to provide only a map function for the readable side use `mapReadable` instead.
If you want to provide only a byteLength function for the readable side use `byteLengthReadable` instead.
Same goes for the writable side but using `mapWritable` and `byteLengthWritable` instead.
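For instance, mapping only the writable side while measuring bytes only on the readable side (a sketch):
``` js
const { Duplex } = require('streamx')

const s = new Duplex({
  mapWritable: (data) => Buffer.from(data), // only incoming writes are mapped
  byteLengthReadable: (data) => data.length // only the readable buffer is measured this way
})
```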
## Transform Stream
#### `ts = new stream.Transform([options])`
A transform stream is a duplex stream that maps the data written to it and emits that as readable data.
Has the same options as a duplex stream, except that you can also provide a `transform` function.
#### `ts._transform(data, callback)`
Transform the incoming data. Call `callback(null, mappedData)` or use `ts.push(mappedData)` to
return data to the readable side of the stream.
By default the transform function just re-emits the incoming data, making the stream act as a pass-through.
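For example, an upper-casing transform (a minimal sketch):
``` js
const { Transform } = require('streamx')

const upper = new Transform({
  transform (data, cb) {
    cb(null, data.toString().toUpperCase()) // hand the mapped data to the readable side
  }
})

upper.on('data', d => console.log(d)) // HELLO
upper.write('hello')
upper.end()
```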
## Contributing
If you want to help contribute to streamx, a good way to start is to help write more test
cases, compatibility tests, documentation, or performance benchmarks.
If in doubt open an issue :)
## License
MIT

73
node_modules/streamx/examples/fs.js generated vendored Normal file

@@ -0,0 +1,73 @@
const fs = require('fs')
const { Writable, Readable } = require('../')
class FileWriteStream extends Writable {
constructor (filename, mode) {
super()
this.filename = filename
this.mode = mode
this.fd = 0
}
_open (cb) {
fs.open(this.filename, this.mode, (err, fd) => {
if (err) return cb(err)
this.fd = fd
cb(null)
})
}
_write (data, cb) {
fs.write(this.fd, data, 0, data.length, null, (err, written) => {
if (err) return cb(err)
if (written !== data.length) return this._write(data.slice(written), cb)
cb(null)
})
}
_destroy (cb) {
if (!this.fd) return cb()
fs.close(this.fd, cb)
}
}
class FileReadStream extends Readable {
constructor (filename) {
super()
this.filename = filename
this.fd = 0
}
_open (cb) {
fs.open(this.filename, 'r', (err, fd) => {
if (err) return cb(err)
this.fd = fd
cb(null)
})
}
_read (cb) {
let data = Buffer.alloc(16 * 1024)
fs.read(this.fd, data, 0, data.length, null, (err, read) => {
if (err) return cb(err)
if (read !== data.length) data = data.slice(0, read)
this.push(data.length ? data : null)
cb(null)
})
}
_destroy (cb) {
if (!this.fd) return cb()
fs.close(this.fd, cb)
}
}
// copy this file as an example
const rs = new FileReadStream(__filename)
const ws = new FileWriteStream(`${__filename}.cpy`, 'w')
rs.pipe(ws, function (err) {
console.log('file copied', err)
})

855
node_modules/streamx/index.js generated vendored Normal file

@@ -0,0 +1,855 @@
const { EventEmitter } = require('events')
const STREAM_DESTROYED = new Error('Stream was destroyed')
const FIFO = require('fast-fifo')
/* eslint-disable no-multi-spaces */
const MAX = ((1 << 25) - 1)
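// _duplexState packs all stream state into a single integer: bits 0-2 hold the
// shared open/destroy flags, bits 3-15 the read state and bits 16-24 the write state.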
// Shared state
const OPENING = 0b001
const DESTROYING = 0b010
const DESTROYED = 0b100
const NOT_OPENING = MAX ^ OPENING
// Read state
const READ_ACTIVE = 0b0000000000001 << 3
const READ_PRIMARY = 0b0000000000010 << 3
const READ_SYNC = 0b0000000000100 << 3
const READ_QUEUED = 0b0000000001000 << 3
const READ_RESUMED = 0b0000000010000 << 3
const READ_PIPE_DRAINED = 0b0000000100000 << 3
const READ_ENDING = 0b0000001000000 << 3
const READ_EMIT_DATA = 0b0000010000000 << 3
const READ_EMIT_READABLE = 0b0000100000000 << 3
const READ_EMITTED_READABLE = 0b0001000000000 << 3
const READ_DONE = 0b0010000000000 << 3
const READ_NEXT_TICK = 0b0100000000001 << 3 // also active
const READ_NEEDS_PUSH = 0b1000000000000 << 3
const READ_NOT_ACTIVE = MAX ^ READ_ACTIVE
const READ_NON_PRIMARY = MAX ^ READ_PRIMARY
const READ_NON_PRIMARY_AND_PUSHED = MAX ^ (READ_PRIMARY | READ_NEEDS_PUSH)
const READ_NOT_SYNC = MAX ^ READ_SYNC
const READ_PUSHED = MAX ^ READ_NEEDS_PUSH
const READ_PAUSED = MAX ^ READ_RESUMED
const READ_NOT_QUEUED = MAX ^ (READ_QUEUED | READ_EMITTED_READABLE)
const READ_NOT_ENDING = MAX ^ READ_ENDING
const READ_PIPE_NOT_DRAINED = MAX ^ (READ_RESUMED | READ_PIPE_DRAINED)
const READ_NOT_NEXT_TICK = MAX ^ READ_NEXT_TICK
// Write state
const WRITE_ACTIVE = 0b000000001 << 16
const WRITE_PRIMARY = 0b000000010 << 16
const WRITE_SYNC = 0b000000100 << 16
const WRITE_QUEUED = 0b000001000 << 16
const WRITE_UNDRAINED = 0b000010000 << 16
const WRITE_DONE = 0b000100000 << 16
const WRITE_EMIT_DRAIN = 0b001000000 << 16
const WRITE_NEXT_TICK = 0b010000001 << 16 // also active
const WRITE_FINISHING = 0b100000000 << 16
const WRITE_NOT_ACTIVE = MAX ^ WRITE_ACTIVE
const WRITE_NOT_SYNC = MAX ^ WRITE_SYNC
const WRITE_NON_PRIMARY = MAX ^ WRITE_PRIMARY
const WRITE_NOT_FINISHING = MAX ^ WRITE_FINISHING
const WRITE_DRAINED = MAX ^ WRITE_UNDRAINED
const WRITE_NOT_QUEUED = MAX ^ WRITE_QUEUED
const WRITE_NOT_NEXT_TICK = MAX ^ WRITE_NEXT_TICK
// Combined shared state
const ACTIVE = READ_ACTIVE | WRITE_ACTIVE
const NOT_ACTIVE = MAX ^ ACTIVE
const DONE = READ_DONE | WRITE_DONE
const DESTROY_STATUS = DESTROYING | DESTROYED
const OPEN_STATUS = DESTROY_STATUS | OPENING
const AUTO_DESTROY = DESTROY_STATUS | DONE
const NON_PRIMARY = WRITE_NON_PRIMARY & READ_NON_PRIMARY
// Combined read state
const READ_PRIMARY_STATUS = OPEN_STATUS | READ_ENDING | READ_DONE
const READ_STATUS = OPEN_STATUS | READ_DONE | READ_QUEUED
const READ_FLOWING = READ_RESUMED | READ_PIPE_DRAINED
const READ_ACTIVE_AND_SYNC = READ_ACTIVE | READ_SYNC
const READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH = READ_ACTIVE | READ_SYNC | READ_NEEDS_PUSH
const READ_PRIMARY_AND_ACTIVE = READ_PRIMARY | READ_ACTIVE
const READ_ENDING_STATUS = OPEN_STATUS | READ_ENDING | READ_QUEUED
const READ_EMIT_READABLE_AND_QUEUED = READ_EMIT_READABLE | READ_QUEUED
const READ_READABLE_STATUS = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | READ_EMITTED_READABLE
const SHOULD_NOT_READ = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSH
const READ_BACKPRESSURE_STATUS = DESTROY_STATUS | READ_ENDING | READ_DONE
// Combined write state
const WRITE_PRIMARY_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_DONE
const WRITE_QUEUED_AND_UNDRAINED = WRITE_QUEUED | WRITE_UNDRAINED
const WRITE_QUEUED_AND_ACTIVE = WRITE_QUEUED | WRITE_ACTIVE
const WRITE_DRAIN_STATUS = WRITE_QUEUED | WRITE_UNDRAINED | OPEN_STATUS | WRITE_ACTIVE
const WRITE_STATUS = OPEN_STATUS | WRITE_ACTIVE | WRITE_QUEUED
const WRITE_PRIMARY_AND_ACTIVE = WRITE_PRIMARY | WRITE_ACTIVE
const WRITE_ACTIVE_AND_SYNC = WRITE_ACTIVE | WRITE_SYNC
const WRITE_FINISHING_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_QUEUED
const WRITE_BACKPRESSURE_STATUS = WRITE_UNDRAINED | DESTROY_STATUS | WRITE_FINISHING | WRITE_DONE
const asyncIterator = Symbol.asyncIterator || Symbol('asyncIterator')
class WritableState {
constructor (stream, { highWaterMark = 16384, map = null, mapWritable, byteLength, byteLengthWritable } = {}) {
this.stream = stream
this.queue = new FIFO()
this.highWaterMark = highWaterMark
this.buffered = 0
this.error = null
this.pipeline = null
this.byteLength = byteLengthWritable || byteLength || defaultByteLength
this.map = mapWritable || map
this.afterWrite = afterWrite.bind(this)
}
push (data) {
if (this.map !== null) data = this.map(data)
this.buffered += this.byteLength(data)
this.queue.push(data)
if (this.buffered < this.highWaterMark) {
this.stream._duplexState |= WRITE_QUEUED
return true
}
this.stream._duplexState |= WRITE_QUEUED_AND_UNDRAINED
return false
}
shift () {
const data = this.queue.shift()
const stream = this.stream
this.buffered -= this.byteLength(data)
if (this.buffered === 0) stream._duplexState &= WRITE_NOT_QUEUED
return data
}
end (data) {
if (data !== undefined && data !== null) this.push(data)
this.stream._duplexState = (this.stream._duplexState | WRITE_FINISHING) & WRITE_NON_PRIMARY
}
autoBatch (data, cb) {
const buffer = []
const stream = this.stream
buffer.push(data)
while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED_AND_ACTIVE) {
buffer.push(stream._writableState.shift())
}
if ((stream._duplexState & OPEN_STATUS) !== 0) return cb(null)
stream._writev(buffer, cb)
}
update () {
const stream = this.stream
while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {
const data = this.shift()
stream._duplexState |= WRITE_ACTIVE_AND_SYNC
stream._write(data, this.afterWrite)
stream._duplexState &= WRITE_NOT_SYNC
}
if ((stream._duplexState & WRITE_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
}
updateNonPrimary () {
const stream = this.stream
if ((stream._duplexState & WRITE_FINISHING_STATUS) === WRITE_FINISHING) {
stream._duplexState = (stream._duplexState | WRITE_ACTIVE) & WRITE_NOT_FINISHING
stream._final(afterFinal.bind(this))
return
}
if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
if ((stream._duplexState & ACTIVE) === 0) {
stream._duplexState |= ACTIVE
stream._destroy(afterDestroy.bind(this))
}
return
}
if ((stream._duplexState & OPEN_STATUS) === OPENING) {
stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
stream._open(afterOpen.bind(this))
}
}
updateNextTick () {
if ((this.stream._duplexState & WRITE_NEXT_TICK) !== 0) return
this.stream._duplexState |= WRITE_NEXT_TICK
process.nextTick(updateWriteNT, this)
}
}
class ReadableState {
constructor (stream, { highWaterMark = 16384, map = null, mapReadable, byteLength, byteLengthReadable } = {}) {
this.stream = stream
this.queue = new FIFO()
this.highWaterMark = highWaterMark
this.buffered = 0
this.error = null
this.pipeline = null
this.byteLength = byteLengthReadable || byteLength || defaultByteLength
this.map = mapReadable || map
this.pipeTo = null
this.afterRead = afterRead.bind(this)
}
pipe (pipeTo, cb) {
if (this.pipeTo !== null) throw new Error('Can only pipe to one destination')
this.stream._duplexState |= READ_PIPE_DRAINED
this.pipeTo = pipeTo
this.pipeline = new Pipeline(this.stream, pipeTo, cb || null)
if (cb) this.stream.on('error', noop) // We already error handle this so suppress crashes
if (isStreamx(pipeTo)) {
pipeTo._writableState.pipeline = this.pipeline
if (cb) pipeTo.on('error', noop) // We already error handle this so suppress crashes
pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline)) // TODO: just call finished from pipeTo itself
} else {
const onerror = this.pipeline.done.bind(this.pipeline, pipeTo)
const onclose = this.pipeline.done.bind(this.pipeline, pipeTo, null) // onclose has a weird bool arg
pipeTo.on('error', onerror)
pipeTo.on('close', onclose)
pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline))
}
pipeTo.on('drain', afterDrain.bind(this))
this.stream.emit('pipe', pipeTo)
}
push (data) {
const stream = this.stream
if (data === null) {
this.highWaterMark = 0
stream._duplexState = (stream._duplexState | READ_ENDING) & READ_NON_PRIMARY_AND_PUSHED
return false
}
if (this.map !== null) data = this.map(data)
this.buffered += this.byteLength(data)
this.queue.push(data)
stream._duplexState = (stream._duplexState | READ_QUEUED) & READ_PUSHED
return this.buffered < this.highWaterMark
}
shift () {
const data = this.queue.shift()
this.buffered -= this.byteLength(data)
if (this.buffered === 0) this.stream._duplexState &= READ_NOT_QUEUED
return data
}
unshift (data) {
let tail
const pending = []
while ((tail = this.queue.shift()) !== undefined) { // drain the queue so the unshifted data can go first
pending.push(tail)
}
this.push(data)
for (let i = 0; i < pending.length; i++) {
this.queue.push(pending[i])
}
}
read () {
const stream = this.stream
if ((stream._duplexState & READ_STATUS) === READ_QUEUED) {
const data = this.shift()
if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
return data
}
return null
}
drain () {
const stream = this.stream
while ((stream._duplexState & READ_STATUS) === READ_QUEUED && (stream._duplexState & READ_FLOWING) !== 0) {
const data = this.shift()
if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
}
}
update () {
const stream = this.stream
this.drain()
while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === 0) {
stream._duplexState |= READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH
stream._read(this.afterRead)
stream._duplexState &= READ_NOT_SYNC
if ((stream._duplexState & READ_ACTIVE) === 0) this.drain()
}
if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {
stream._duplexState |= READ_EMITTED_READABLE
stream.emit('readable')
}
if ((stream._duplexState & READ_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
}
updateNonPrimary () {
const stream = this.stream
if ((stream._duplexState & READ_ENDING_STATUS) === READ_ENDING) {
stream._duplexState = (stream._duplexState | READ_DONE) & READ_NOT_ENDING
stream.emit('end')
if ((stream._duplexState & AUTO_DESTROY) === DONE) stream._duplexState |= DESTROYING
if (this.pipeTo !== null) this.pipeTo.end()
}
if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
if ((stream._duplexState & ACTIVE) === 0) {
stream._duplexState |= ACTIVE
stream._destroy(afterDestroy.bind(this))
}
return
}
if ((stream._duplexState & OPEN_STATUS) === OPENING) {
stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
stream._open(afterOpen.bind(this))
}
}
updateNextTick () {
if ((this.stream._duplexState & READ_NEXT_TICK) !== 0) return
this.stream._duplexState |= READ_NEXT_TICK
process.nextTick(updateReadNT, this)
}
}
class TransformState {
constructor (stream) {
this.data = null
this.afterTransform = afterTransform.bind(stream)
this.afterFinal = null
}
}
class Pipeline {
constructor (src, dst, cb) {
this.from = src
this.to = dst
this.afterPipe = cb
this.error = null
this.pipeToFinished = false
}
finished () {
this.pipeToFinished = true
}
done (stream, err) {
if (err) this.error = err
if (stream === this.to) {
this.to = null
if (this.from !== null) {
if ((this.from._duplexState & READ_DONE) === 0 || !this.pipeToFinished) {
this.from.destroy(new Error('Writable stream closed prematurely'))
}
return
}
}
if (stream === this.from) {
this.from = null
if (this.to !== null) {
if ((stream._duplexState & READ_DONE) === 0) {
this.to.destroy(new Error('Readable stream closed before ending'))
}
return
}
}
if (this.afterPipe !== null) this.afterPipe(this.error)
this.to = this.from = this.afterPipe = null
}
}
function afterDrain () {
this.stream._duplexState |= READ_PIPE_DRAINED
if ((this.stream._duplexState & READ_ACTIVE_AND_SYNC) === 0) this.updateNextTick()
}
function afterFinal (err) {
const stream = this.stream
if (err) stream.destroy(err)
if ((stream._duplexState & DESTROY_STATUS) === 0) {
stream._duplexState |= WRITE_DONE
stream.emit('finish')
}
if ((stream._duplexState & AUTO_DESTROY) === DONE) {
stream._duplexState |= DESTROYING
}
stream._duplexState &= WRITE_NOT_ACTIVE
this.update()
}
function afterDestroy (err) {
const stream = this.stream
if (!err && this.error !== STREAM_DESTROYED) err = this.error
if (err) stream.emit('error', err)
stream._duplexState |= DESTROYED
stream.emit('close')
const rs = stream._readableState
const ws = stream._writableState
if (rs !== null && rs.pipeline !== null) rs.pipeline.done(stream, err)
if (ws !== null && ws.pipeline !== null) ws.pipeline.done(stream, err)
}
function afterWrite (err) {
const stream = this.stream
if (err) stream.destroy(err)
stream._duplexState &= WRITE_NOT_ACTIVE
if ((stream._duplexState & WRITE_DRAIN_STATUS) === WRITE_UNDRAINED) {
stream._duplexState &= WRITE_DRAINED
if ((stream._duplexState & WRITE_EMIT_DRAIN) === WRITE_EMIT_DRAIN) {
stream.emit('drain')
}
}
if ((stream._duplexState & WRITE_SYNC) === 0) this.update()
}
function afterRead (err) {
if (err) this.stream.destroy(err)
this.stream._duplexState &= READ_NOT_ACTIVE
if ((this.stream._duplexState & READ_SYNC) === 0) this.update()
}
function updateReadNT (rs) {
rs.stream._duplexState &= READ_NOT_NEXT_TICK
rs.update()
}
function updateWriteNT (ws) {
ws.stream._duplexState &= WRITE_NOT_NEXT_TICK
ws.update()
}
function afterOpen (err) {
const stream = this.stream
if (err) stream.destroy(err)
if ((stream._duplexState & DESTROYING) === 0) {
if ((stream._duplexState & READ_PRIMARY_STATUS) === 0) stream._duplexState |= READ_PRIMARY
if ((stream._duplexState & WRITE_PRIMARY_STATUS) === 0) stream._duplexState |= WRITE_PRIMARY
stream.emit('open')
}
stream._duplexState &= NOT_ACTIVE
if (stream._writableState !== null) {
stream._writableState.update()
}
if (stream._readableState !== null) {
stream._readableState.update()
}
}
function afterTransform (err, data) {
if (data !== undefined && data !== null) this.push(data)
this._writableState.afterWrite(err)
}
class Stream extends EventEmitter {
constructor (opts) {
super()
this._duplexState = 0
this._readableState = null
this._writableState = null
if (opts) {
if (opts.open) this._open = opts.open
if (opts.destroy) this._destroy = opts.destroy
if (opts.predestroy) this._predestroy = opts.predestroy
}
}
_open (cb) {
cb(null)
}
_destroy (cb) {
cb(null)
}
_predestroy () {
// does nothing
}
get readable () {
return this._readableState !== null
}
get writable () {
return this._writableState !== null
}
get destroyed () {
return (this._duplexState & DESTROYED) !== 0
}
get destroying () {
return (this._duplexState & DESTROY_STATUS) !== 0
}
destroy (err) {
if ((this._duplexState & DESTROY_STATUS) === 0) {
if (!err) err = STREAM_DESTROYED
this._duplexState = (this._duplexState | DESTROYING) & NON_PRIMARY
this._predestroy()
if (this._readableState !== null) {
this._readableState.error = err
this._readableState.updateNextTick()
}
if (this._writableState !== null) {
this._writableState.error = err
this._writableState.updateNextTick()
}
}
}
on (name, fn) {
if (this._readableState !== null) {
if (name === 'data') {
this._duplexState |= (READ_EMIT_DATA | READ_RESUMED)
this._readableState.updateNextTick()
}
if (name === 'readable') {
this._duplexState |= READ_EMIT_READABLE
this._readableState.updateNextTick()
}
}
if (this._writableState !== null) {
if (name === 'drain') {
this._duplexState |= WRITE_EMIT_DRAIN
this._writableState.updateNextTick()
}
}
return super.on(name, fn)
}
}
class Readable extends Stream {
constructor (opts) {
super(opts)
this._duplexState |= OPENING | WRITE_DONE
this._readableState = new ReadableState(this, opts)
if (opts) {
if (opts.read) this._read = opts.read
}
}
_read (cb) {
cb(null)
}
pipe (dest, cb) {
this._readableState.pipe(dest, cb)
this._readableState.updateNextTick()
return dest
}
read () {
this._readableState.updateNextTick()
return this._readableState.read()
}
push (data) {
this._readableState.updateNextTick()
return this._readableState.push(data)
}
unshift (data) {
this._readableState.updateNextTick()
return this._readableState.unshift(data)
}
resume () {
this._duplexState |= READ_RESUMED
this._readableState.updateNextTick()
}
pause () {
this._duplexState &= READ_PAUSED
}
static _fromAsyncIterator (ite) {
let destroy
const rs = new Readable({
read (cb) {
ite.next().then(push).then(cb.bind(null, null)).catch(cb)
},
predestroy () {
destroy = ite.return()
},
destroy (cb) {
destroy.then(cb.bind(null, null)).catch(cb)
}
})
return rs
function push (data) {
if (data.done) rs.push(null)
else rs.push(data.value)
}
}
static from (data) {
if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator]())
if (!Array.isArray(data)) data = data === undefined ? [] : [data]
let i = 0
return new Readable({
read (cb) {
this.push(i === data.length ? null : data[i++])
cb(null)
}
})
}
static isBackpressured (rs) {
return (rs._duplexState & READ_BACKPRESSURE_STATUS) !== 0 || rs._readableState.buffered >= rs._readableState.highWaterMark
}
[asyncIterator] () {
const stream = this
let error = null
let ended = false
let promiseResolve
let promiseReject
this.on('error', (err) => { error = err })
this.on('end', () => { ended = true })
this.on('close', () => call(error, null))
this.on('readable', () => call(null, stream.read()))
return {
[asyncIterator] () {
return this
},
next () {
return new Promise(function (resolve, reject) {
promiseResolve = resolve
promiseReject = reject
const data = stream.read()
if (data !== null) call(null, data)
})
},
return () {
stream.destroy()
}
}
function call (err, data) {
if (promiseReject === null) return
if (err) promiseReject(err)
else if (data === null && !ended) promiseReject(STREAM_DESTROYED)
else promiseResolve({ value: data, done: data === null })
promiseReject = promiseResolve = null
}
}
}
class Writable extends Stream {
constructor (opts) {
super(opts)
this._duplexState |= OPENING | READ_DONE
this._writableState = new WritableState(this, opts)
if (opts) {
if (opts.writev) this._writev = opts.writev
if (opts.write) this._write = opts.write
if (opts.final) this._final = opts.final
}
}
_writev (batch, cb) {
cb(null)
}
_write (data, cb) {
this._writableState.autoBatch(data, cb)
}
_final (cb) {
cb(null)
}
static isBackpressured (ws) {
return (ws._duplexState & WRITE_BACKPRESSURE_STATUS) !== 0
}
write (data) {
this._writableState.updateNextTick()
return this._writableState.push(data)
}
end (data) {
this._writableState.updateNextTick()
this._writableState.end(data)
}
}
class Duplex extends Readable { // and Writable
constructor (opts) {
super(opts)
this._duplexState = OPENING
this._writableState = new WritableState(this, opts)
if (opts) {
if (opts.writev) this._writev = opts.writev
if (opts.write) this._write = opts.write
if (opts.final) this._final = opts.final
}
}
_writev (batch, cb) {
cb(null)
}
_write (data, cb) {
this._writableState.autoBatch(data, cb)
}
_final (cb) {
cb(null)
}
write (data) {
this._writableState.updateNextTick()
return this._writableState.push(data)
}
end (data) {
this._writableState.updateNextTick()
this._writableState.end(data)
}
}
class Transform extends Duplex {
constructor (opts) {
super(opts)
this._transformState = new TransformState(this)
if (opts) {
if (opts.transform) this._transform = opts.transform
if (opts.flush) this._flush = opts.flush
}
}
_write (data, cb) {
if (this._readableState.buffered >= this._readableState.highWaterMark) {
this._transformState.data = data
} else {
this._transform(data, this._transformState.afterTransform)
}
}
_read (cb) {
if (this._transformState.data !== null) {
const data = this._transformState.data
this._transformState.data = null
cb(null)
this._transform(data, this._transformState.afterTransform)
} else {
cb(null)
}
}
_transform (data, cb) {
cb(null, data)
}
_flush (cb) {
cb(null)
}
_final (cb) {
this._transformState.afterFinal = cb
this._flush(transformAfterFlush.bind(this))
}
}
function transformAfterFlush (err, data) {
const cb = this._transformState.afterFinal
if (err) return cb(err)
if (data !== null && data !== undefined) this.push(data)
this.push(null)
cb(null)
}
function isStream (stream) {
return !!stream._readableState || !!stream._writableState
}
function isStreamx (stream) {
return typeof stream._duplexState === 'number' && isStream(stream)
}
function defaultByteLength (data) {
return Buffer.isBuffer(data) ? data.length : 1024
}
function noop () {}
module.exports = {
isStream,
isStreamx,
Stream,
Writable,
Readable,
Duplex,
Transform
}

56
node_modules/streamx/package.json generated vendored Normal file

@@ -0,0 +1,56 @@
{
"_from": "streamx@^2.6.0",
"_id": "streamx@2.6.0",
"_inBundle": false,
"_integrity": "sha512-fDIdCkmCjhNt9/IIFL6UdJeqyOpDoX9rHXdRt5hpYQQ1owCSyVieQtTkbXcZAsASS9F5JrlxwKLHkv5w2GHVvA==",
"_location": "/streamx",
"_phantomChildren": {},
"_requested": {
"type": "range",
"registry": true,
"raw": "streamx@^2.6.0",
"name": "streamx",
"escapedName": "streamx",
"rawSpec": "^2.6.0",
"saveSpec": null,
"fetchSpec": "^2.6.0"
},
"_requiredBy": [
"/async-neocities"
],
"_resolved": "https://registry.npmjs.org/streamx/-/streamx-2.6.0.tgz",
"_shasum": "f7aa741edc8142666c12d09479e708fdc2a09c33",
"_spec": "streamx@^2.6.0",
"_where": "/Users/bret/repos/deploy-to-neocities/node_modules/async-neocities",
"author": {
"name": "Mathias Buus",
"url": "@mafintosh"
},
"bugs": {
"url": "https://github.com/mafintosh/streamx/issues"
},
"bundleDependencies": false,
"dependencies": {
"fast-fifo": "^1.0.0",
"nanoassert": "^2.0.0"
},
"deprecated": false,
"description": "An iteration of the Node.js core streams with a series of improvements",
"devDependencies": {
"end-of-stream": "^1.4.1",
"standard": "^14.3.1",
"tape": "^4.11.0"
},
"homepage": "https://github.com/mafintosh/streamx",
"license": "MIT",
"main": "index.js",
"name": "streamx",
"repository": {
"type": "git",
"url": "git+https://github.com/mafintosh/streamx.git"
},
"scripts": {
"test": "standard && tape test/*.js"
},
"version": "2.6.0"
}

21
node_modules/streamx/test/async-iterator.js generated vendored Normal file

@@ -0,0 +1,21 @@
const tape = require('tape')
const { Readable } = require('../')
tape('streams are async iterators', async function (t) {
const data = ['a', 'b', 'c', null]
const expected = data.slice(0)
const r = new Readable({
read (cb) {
this.push(data.shift())
cb(null)
}
})
for await (const chunk of r) {
t.same(chunk, expected.shift())
}
t.same(expected, [null])
t.end()
})

105
node_modules/streamx/test/backpressure.js generated vendored Normal file

@@ -0,0 +1,105 @@
const tape = require('tape')
const { Writable, Readable } = require('../')
tape('write backpressure', function (t) {
const ws = new Writable()
for (let i = 0; i < 15; i++) {
t.ok(ws.write('a'), 'not backpressured')
t.notOk(Writable.isBackpressured(ws), 'static check')
}
t.notOk(ws.write('a'), 'backpressured')
t.ok(Writable.isBackpressured(ws), 'static check')
t.end()
})
tape('write backpressure with drain', function (t) {
const ws = new Writable()
for (let i = 0; i < 15; i++) {
t.ok(ws.write('a'), 'not backpressured')
t.notOk(Writable.isBackpressured(ws), 'static check')
}
t.notOk(ws.write('a'), 'backpressured')
t.ok(Writable.isBackpressured(ws), 'static check')
ws.on('drain', function () {
t.notOk(Writable.isBackpressured(ws))
t.end()
})
})
tape('write backpressure with destroy', function (t) {
const ws = new Writable()
ws.write('a')
ws.destroy()
t.ok(Writable.isBackpressured(ws))
t.end()
})
tape('write backpressure with end', function (t) {
const ws = new Writable()
ws.write('a')
ws.end()
t.ok(Writable.isBackpressured(ws))
t.end()
})
tape('read backpressure', function (t) {
const rs = new Readable()
for (let i = 0; i < 15; i++) {
t.ok(rs.push('a'), 'not backpressured')
t.notOk(Readable.isBackpressured(rs), 'static check')
}
t.notOk(rs.push('a'), 'backpressured')
t.ok(Readable.isBackpressured(rs), 'static check')
t.end()
})
tape('read backpressure with later read', function (t) {
const rs = new Readable()
for (let i = 0; i < 15; i++) {
t.ok(rs.push('a'), 'not backpressured')
t.notOk(Readable.isBackpressured(rs), 'static check')
}
t.notOk(rs.push('a'), 'backpressured')
t.ok(Readable.isBackpressured(rs), 'static check')
rs.once('readable', function () {
rs.read()
t.notOk(Readable.isBackpressured(rs))
t.end()
})
})
tape('read backpressure with destroy', function (t) {
const rs = new Readable()
rs.push('a')
rs.destroy()
t.ok(Readable.isBackpressured(rs))
t.end()
})
tape('read backpressure with push(null)', function (t) {
const rs = new Readable()
rs.push('a')
rs.push(null)
t.ok(Readable.isBackpressured(rs))
t.end()
})

178
node_modules/streamx/test/compat.js generated vendored Normal file

@@ -0,0 +1,178 @@
const eos = require('end-of-stream')
const tape = require('tape')
const stream = require('../')
const finished = require('stream').finished
run(eos)
run(finished)
function run (eos) {
if (!eos) return
const name = eos === finished ? 'nodeStream.finished' : 'eos'
tape(name + ' readable', function (t) {
const r = new stream.Readable()
let ended = false
r.on('end', function () {
ended = true
})
eos(r, function (err) {
t.error(err, 'no error')
t.ok(ended)
t.end()
})
r.push('hello')
r.push(null)
r.resume()
})
tape(name + ' readable destroy', function (t) {
const r = new stream.Readable()
let ended = false
r.on('end', function () {
ended = true
})
eos(r, function (err) {
t.ok(err, 'had error')
t.notOk(ended)
t.end()
})
r.push('hello')
r.push(null)
r.resume()
r.destroy()
})
tape(name + ' writable', function (t) {
const w = new stream.Writable()
let finished = false
w.on('finish', function () {
finished = true
})
eos(w, function (err) {
t.error(err, 'no error')
t.ok(finished)
t.end()
})
w.write('hello')
w.end()
})
tape(name + ' writable destroy', function (t) {
const w = new stream.Writable()
let finished = false
w.on('finish', function () {
finished = true
})
eos(w, function (err) {
t.ok(err, 'had error')
t.notOk(finished)
t.end()
})
w.write('hello')
w.end()
w.destroy()
})
tape(name + ' duplex', function (t) {
const s = new stream.Duplex()
let ended = false
let finished = false
s.on('end', () => { ended = true })
s.on('finish', () => { finished = true })
eos(s, function (err) {
t.error(err, 'no error')
t.ok(ended)
t.ok(finished)
t.end()
})
s.push('hello')
s.push(null)
s.resume()
s.end()
})
tape(name + ' duplex + deferred s.end()', function (t) {
const s = new stream.Duplex()
let ended = false
let finished = false
s.on('end', function () {
ended = true
setImmediate(() => s.end())
})
s.on('finish', () => { finished = true })
eos(s, function (err) {
t.error(err, 'no error')
t.ok(ended)
t.ok(finished)
t.end()
})
s.push('hello')
s.push(null)
s.resume()
})
tape(name + ' duplex + deferred s.push(null)', function (t) {
const s = new stream.Duplex()
let ended = false
let finished = false
s.on('finish', function () {
finished = true
setImmediate(() => s.push(null))
})
s.on('end', () => { ended = true })
eos(s, function (err) {
t.error(err, 'no error')
t.ok(ended)
t.ok(finished)
t.end()
})
s.push('hello')
s.end()
s.resume()
})
tape(name + ' duplex destroy', function (t) {
const s = new stream.Duplex()
let ended = false
let finished = false
s.on('end', () => { ended = true })
s.on('finish', () => { finished = true })
eos(s, function (err) {
t.ok(err, 'had error')
t.notOk(ended)
t.notOk(finished)
t.end()
})
s.push('hello')
s.push(null)
s.resume()
s.end()
s.destroy()
})
}

134
node_modules/streamx/test/pipe.js generated vendored Normal file

@@ -0,0 +1,134 @@
const tape = require('tape')
const compat = require('stream')
const { Readable, Writable } = require('../')
tape('pipe to node stream', function (t) {
const expected = [
'hi',
'ho'
]
const r = new Readable()
const w = new compat.Writable({
objectMode: true,
write (data, enc, cb) {
t.same(data, expected.shift())
cb(null)
}
})
r.push('hi')
r.push('ho')
r.push(null)
r.pipe(w)
w.on('finish', function () {
t.same(expected.length, 0)
t.end()
})
})
tape('pipe with callback - error case', function (t) {
const r = new Readable()
const w = new Writable({
write (data, cb) {
cb(new Error('blerg'))
}
})
r.pipe(w, function (err) {
t.pass('callback called')
t.same(err, new Error('blerg'))
t.end()
})
r.push('hello')
r.push('world')
r.push(null)
})
tape('pipe with callback - error case with destroy', function (t) {
const r = new Readable()
const w = new Writable({
write (data, cb) {
w.destroy(new Error('blerg'))
cb(null)
}
})
r.pipe(w, function (err) {
t.pass('callback called')
t.same(err, new Error('blerg'))
t.end()
})
r.push('hello')
r.push('world')
})
tape('pipe with callback - error case node stream', function (t) {
const r = new Readable()
const w = new compat.Writable({
write (data, enc, cb) {
cb(new Error('blerg'))
}
})
r.pipe(w, function (err) {
t.pass('callback called')
t.same(err, new Error('blerg'))
t.end()
})
r.push('hello')
r.push('world')
r.push(null)
})
tape('simple pipe', function (t) {
const buffered = []
const r = new Readable()
const w = new Writable({
write (data, cb) {
buffered.push(data)
cb(null)
},
final () {
t.pass('final called')
t.same(buffered, ['hello', 'world'])
t.end()
}
})
r.pipe(w)
r.push('hello')
r.push('world')
r.push(null)
})
tape('pipe with callback', function (t) {
const buffered = []
const r = new Readable()
const w = new Writable({
write (data, cb) {
buffered.push(data)
cb(null)
}
})
r.pipe(w, function (err) {
t.pass('callback called')
t.same(err, null)
t.same(buffered, ['hello', 'world'])
t.end()
})
r.push('hello')
r.push('world')
r.push(null)
})

119
node_modules/streamx/test/readable.js generated vendored Normal file

@@ -0,0 +1,119 @@
const tape = require('tape')
const { Readable } = require('../')
tape('ondata', function (t) {
const r = new Readable()
const buffered = []
let ended = 0
r.push('hello')
r.push('world')
r.push(null)
r.on('data', data => buffered.push(data))
r.on('end', () => ended++)
r.on('close', function () {
t.pass('closed')
t.same(buffered, ['hello', 'world'])
t.same(ended, 1)
t.ok(r.destroyed)
t.end()
})
})
tape('resume', function (t) {
const r = new Readable()
let ended = 0
r.push('hello')
r.push('world')
r.push(null)
r.resume()
r.on('end', () => ended++)
r.on('close', function () {
t.pass('closed')
t.same(ended, 1)
t.ok(r.destroyed)
t.end()
})
})
tape('shorthands', function (t) {
t.plan(3 + 1)
const r = new Readable({
read (cb) {
this.push('hello')
cb(null)
},
destroy (cb) {
t.pass('destroyed')
cb(null)
}
})
r.once('readable', function () {
t.same(r.read(), 'hello')
t.same(r.read(), 'hello')
r.destroy()
t.same(r.read(), null)
})
})
tape('both push and the cb needs to be called for re-reads', function (t) {
t.plan(2)
let once = true
const r = new Readable({
read (cb) {
t.ok(once, 'read called once')
once = false
cb(null)
}
})
r.resume()
setTimeout(function () {
once = true
r.push('hi')
}, 100)
})
tape('from array', function (t) {
const inc = []
const r = Readable.from([1, 2, 3])
r.on('data', data => inc.push(data))
r.on('end', function () {
t.same(inc, [1, 2, 3])
t.end()
})
})
tape('from buffer', function (t) {
const inc = []
const r = Readable.from(Buffer.from('hello'))
r.on('data', data => inc.push(data))
r.on('end', function () {
t.same(inc, [Buffer.from('hello')])
t.end()
})
})
tape('from async iterator', function (t) {
async function * test () {
yield 1
yield 2
yield 3
}
const inc = []
const r = Readable.from(test())
r.on('data', data => inc.push(data))
r.on('end', function () {
t.same(inc, [1, 2, 3])
t.end()
})
})

103
node_modules/streamx/test/writable.js generated vendored Normal file

@@ -0,0 +1,103 @@
const tape = require('tape')
const { Writable } = require('../')
tape('opens before writes', function (t) {
t.plan(2)
const trace = []
const stream = new Writable({
open (cb) {
trace.push('open')
return cb(null)
},
write (data, cb) {
trace.push('write')
return cb(null)
}
})
stream.on('close', () => {
t.equals(trace.length, 2)
t.equals(trace[0], 'open')
})
stream.write('data')
stream.end()
})
tape('drain', function (t) {
const stream = new Writable({
highWaterMark: 1,
write (data, cb) {
cb(null)
}
})
t.notOk(stream.write('a'))
stream.on('drain', function () {
t.pass('drained')
t.end()
})
})
tape('drain multi write', function (t) {
t.plan(4)
const stream = new Writable({
highWaterMark: 1,
write (data, cb) {
cb(null)
}
})
t.notOk(stream.write('a'))
t.notOk(stream.write('a'))
t.notOk(stream.write('a'))
stream.on('drain', function () {
t.pass('drained')
t.end()
})
})
tape('drain async write', function (t) {
let flushed = false
const stream = new Writable({
highWaterMark: 1,
write (data, cb) {
setImmediate(function () {
flushed = true
cb(null)
})
}
})
t.notOk(stream.write('a'))
t.notOk(flushed)
stream.on('drain', function () {
t.ok(flushed)
t.end()
})
})
tape('writev', function (t) {
const expected = [[], ['ho']]
const s = new Writable({
writev (batch, cb) {
t.same(batch, expected.shift())
cb(null)
}
})
for (let i = 0; i < 100; i++) {
expected[0].push('hi-' + i)
s.write('hi-' + i)
}
s.on('drain', function () {
s.write('ho')
s.end()
})
s.on('finish', function () {
t.end()
})
})