Methods
(static) backlogLimit(source, count) → {Iterable}
- Source:
import {map} from 'async_iter/pipeline/backlog_limit' # pipeline version
import {map} from 'async_iter/backlog_limit' # conventional version
Buffers items upto a limit count to pass onto consumer.
Supports only async iterations
Useful when your consumer is slower than your producer, and causes source items to be dropped.
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to process |
count |
Number | The max number of items to buffer |
Returns:
The limited items
- Type
- Iterable
(static) broadcast(source) → {function}
- Source:
import {broadcast} from 'async_iter/pipeline/broadcast' # pipeline version
import {broadcast} from 'async_iter/broadcast' # conventional version
Returns a generator function that will subscribe to the source iteration
Each generator function, will iterate over the same source values
- No queing of values, so each consumer will be made to wait for all other consumers
- The source iteration is not started, until at least one subscription has started consuming
- The source iteration is paused, if all consumers are stopped. Any new subscriptions will continue from where the source iteraion was iterated to
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to broadcast to all subscribers |
Returns:
a generator function to create an iterable of the source items
.return - A function to close all consumer iterators and close the source iteration.
- Type
- function
(static) bufferBy(source, trigger, maxWaitTime) → {Iterable}
- Source:
- See:
-
- also bufferGroupBy
import {bufferBy} from 'async_iter/pipeline/buffer_by' # pipeline version
import {bufferBy} from 'async_iter/buffer_by' # conventional version
Collect a set of items from source. Emit as an array of those items.
The batch is produced, when the trigger
returns true, or the maxWaitTime
has elasped since the last emitted value
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to buffer |
trigger |
triggerCallback | Called for each item in the source iteration. Return true to trigger a batch |
maxWaitTime |
Bumber | period is milliseconds to trigger a batch, if no batch has been emitted and there are pending values |
Returns:
The buffered items
- Type
- Iterable
(static) bufferGroupBy(source, selector, trigger, maxWaitTime) → {Iterable}
- Source:
- See:
-
- also bufferBy
import {bufferGroupBy} from 'async_iter/pipeline/buffer_group_by' # pipeline version
import {bufferGroupBy} from 'async_iter/buffer_group_by' # conventional version
Collect and group items as per a trigger function or a time period. Emits an array of the batched items.
Similar to the bufferBy
only the batches emitted are as per the selector
grouping function.
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to buffer |
selector |
function | A function that returns the key to be used to identify grouping |
trigger |
triggerCallback | A function to indicate when a grouped items should be emitted |
maxWaitTime |
number | The minimum period of time (ms), before any pending grouped items should be emitted |
Returns:
The buffered items
- Type
- Iterable
(static) byLines(source) → {Iterable}
- Source:
import {map} from 'async_iter/pipeline/by_lines' # pipeline version
import {map} from 'async_iter/by_lines' # conventional version
Take a source iterations of strings, and split and emit based on the newline character
Supports both sync and async iterations
This operator can be used to re-contructs a set of lines that have been piped thru streams.
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to process |
Returns:
The transformed items
- Type
- Iterable
(static) chunk(source, count) → {Iterable}
import {map} from 'async_iter/pipeline/chunk' # pipeline version
import {map} from 'async_iter/chunk' # conventional version
Emit a batch of count
items from the source iterable
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to process |
count |
Number | The number of items to be batched together into a single value |
Returns:
The chunked items
- Type
- Iterable
(static) combineWhen(source, fn, combine) → {Iterable}
- Source:
import {map} from 'async_iter/pipeline/combineWhen' # pipeline version
import {map} from 'async_iter/combineWhen' # conventional version
Collect items, combining
them, until the evaluation function return true.
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to process |
fn |
evaluateItem | A evaluator function, that upon returing true, causes all items collected to this point to be emitted |
combine |
function | An optional function to combine two items (defaults to (a, b) => a + b) |
Returns:
Emits as per the combindation functions
- Type
- Iterable
(static) concat(sources) → {Iterable}
import {concat} from 'async_iter/pipeline/concat' # pipeline version
import {concat} from 'async_iter/concat' # conventional version
Combines multiple source iterations into a single iteration
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
sources |
Iterable | The source iterations |
Returns:
An iterable that combines all values from the sources
- Type
- Iterable
(static) concatMap(source, fn) → {F}
- Source:
import {concatMap} from 'async_iter/pipeline/concat_map' # pipeline version
import {concatMap} from 'async_iter/concat_map' # conventional version
Alternative name: flatMap
Joins an inner iterable with the use of a mapping function.
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
fn |
function | A function called for each item in the source iteration and returns an iterable |
Returns:
An iterable that has flatten the inner iterable values
- Type
- F
(static) count(source) → {Number}
import {count} from 'async_iter/pipeline/count' # pipeline version
import {count} from 'async_iter/count' # conventional version
Iterates thru the supplied iterable's set and returns the count
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to count |
Returns:
The count of items
- Type
- Number
(static) delay(source, period) → {Iterable}
import {delay} from 'async_iter/pipeline/delay' # pipeline version
import {delay} from 'async_iter/delay' # conventional version
Re-emits all values from source, with a time delay between each item
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
period |
Number | The time in miliiseconds to pause between each item |
Returns:
The delayed iterable items
- Type
- Iterable
(static) delayUntil(source, Date) → {Iterable}
- Source:
import {delayUntil} from 'async_iter/pipeline/delay_until' # pipeline version
import {delayUntil} from 'async_iter/delay_until' # conventional version
Waits for the specific time point, and then re-emits the source items
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
Date |
date | The timepoint to wait until the source iterations is returned. |
Returns:
The delayed iterable
- Type
- Iterable
(static) entries(source) → {Iterable}
- Source:
import {entries} from 'async_iter/pipeline/entries' # pipeline version
import {entries} from 'async_iter/entries' # conventional version
Equivalent to Array.prototype.entries
Supports both sync and async iterations
Emits the items with a key/value pair [index, value].
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to process |
Returns:
The transformed items
- Type
- Iterable
(static) every(source, fn) → {Boolean}
import {every} from 'async_iter/pipeline/every' # pipeline version
import {every} from 'async_iter/every' # conventional version
Return true if every item satisfies the supplied test function
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to be tested |
fn |
itemTest | A function called for each item in the source |
Returns:
True if all items are satisfied
- Type
- Boolean
(static) filter(source, fn, missingValueFnopt) → {Iterable}
The optional missingValueFn
allows for processing the 'filtered' items into a single item.
eg: Given an input source of [1, 2, 3, 4, 5, 6] and a filter of x >= 5, then the missingValueFn will be called with values of (1, 5) and the result of that function will be emitted before item 6.
The examples include use of the missingValueFn
function
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
source |
Iterable | The source iteration to filter |
|
fn |
function | A funntion invokved for each item, returning false for items to be removed |
|
missingValueFn |
missingValueFn |
<optional> |
When supplied, will be invokved for each group of items that are filter - returns a item to be emitted, representing the filtered items |
Returns:
The filtered items
- Type
- Iterable
(static) find(source, fn) → {*}
import {find} from 'async_iter/pipeline/find' # pipeline version
import {find} from 'async_iter/find' # conventional version
Return the first item in the source iterations, when the supplied function returns true
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
fn |
itemTest | A function invokved for each item until true is returned |
Returns:
The found item or undefined if nothing found
- Type
- *
(static) first(source) → {*}
import {first} from 'async_iter/pipeline/first' # pipeline version
import {first} from 'async_iter/first' # conventional version
Returns the first item of the source iterable
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
Returns:
The first item
- Type
- *
(static) flatMap()
- Source:
- See:
-
also concatMap
Alias for concatMap
(static) forEach(source, fn) → {Iterable}
- Source:
import {forEach} from 'async_iter/pipeline/for_each' # pipeline version
import {forEach} from 'async_iter/for_each' # conventional version
Calls the supplied function for each item within the source iterable
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
fn |
itemCallback | Function to be called for each item |
Returns:
The source iteration
- Type
- Iterable
(static) last(source) → {*}
import {last} from 'async_iter/pipeline/last' # pipeline version
import {last} from 'async_iter/last' # conventional version
Returns the last item of the source iterable
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
Returns:
The last item
- Type
- *
(static) map(source, fn) → {Iterable}
import {map} from 'async_iter/pipeline/map' # pipeline version
import {map} from 'async_iter/map' # conventional version
Transforms each item in the supplied iteration using the supplied function
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to map |
fn |
function | The mapping function for each item |
Returns:
The transformed items
- Type
- Iterable
(static) max(source, fnopt) → {*}
import {max} from 'async_iter/pipeline/max' # pipeline version
import {max} from 'async_iter/max' # conventional version
Returns the max value of the source iterable items
Supports both sync and async iterations
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
source |
Iterable | The source iteration |
|
fn |
minMaxCallback |
<optional> |
An function to determine value order |
Returns:
The max item
- Type
- *
(static) merge(sources) → {Iterable}
import {merge} from 'async_iter/pipeline/merge' # pipeline version
import {merge} from 'async_iter/merge' # conventional version
Merges the source iterables items into a single iterable. Order is as they come.
If all source iterables are stopped, then the returned iteration is stopped
If the consumer stops the iteration, all source iterations are stopped.
Parameters:
Name | Type | Description |
---|---|---|
sources |
Iterable | The source iterations |
Returns:
An iterable that combines all values from the sources
- Type
- Iterable
(static) min(source, fnopt) → {*}
import {min} from 'async_iter/pipeline/min' # pipeline version
import {min} from 'async_iter/min' # conventional version
Returns the min value of the source iterable items
Supports both sync and async iterations
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
source |
Iterable | The source iteration |
|
fn |
minMaxCallback |
<optional> |
An function to determine value order |
Returns:
The min item
- Type
- *
(static) persisted(source, path, opts) → {Iterable.<PersistedItem>}
- Source:
import {persisted} from 'async_iter/pipeline/persisted' # pipeline version
import {persisted} from 'async_iter/persisted' # conventional version
Persist items of an async iterator to files for later resumption
NB:
Persisted
will consume items as fast as the source will emit. The consumer of the iteration will be 'decoupled' from the source
Example
import {persisted} from 'async_iter'
const items = await persisted(source, './tmp/buffering_example')
for await (const item of items) {
console.log(item.value.toString())
item.completed() // If this function is not called, item will be processed if iterator restarted
}
Parameters:
Name | Type | Description | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
source |
Iterable | The source iteration |
||||||||||||||||||||
path |
String | is a directory for storage of items |
||||||||||||||||||||
opts |
Object | a set of optional flags Properties
|
Returns:
An async iteration, where each item resolves to an object containing
- Type
- Iterable.<PersistedItem>
(static) rateLimit(source, maxAmount, perPeriod, counter=opt) → {Iterable}
- Source:
import {rateLimit} from 'async_iter/pipeline/rateLimit' # pipeline version
import {rateLimit} from 'async_iter/rateLimit' # conventional version
Emits the values from the source iteration at upto a limited rate
Supports both sync and async iterations. Always returns an async iterator
The source iteration will be consumed only as required - there is no queing within rateLimit function
The definition of unit is defined by the optional functioncounter
. The default will be 1 iteration equals 1 unit
Example
import {rateLimit} from 'async_iter/pipeline'
// Emit at no more than 5 characters per 2s
const items = ['first', 'second', 'third', 'fourth', 'fifth']
|> rateLimit(5, 2000, v => v.toString().length)
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
source |
Iterable | The source iteration to rate limit |
|
maxAmount |
Number | the maxmimum number of 'units' to be emitted within the time of |
|
perPeriod |
Number | the period in milliseconds to be applied |
|
counter= |
function |
<optional> |
an optional callback function, called for each item |
Returns:
The iterable that will not emit at a rate greater than specified
- Type
- Iterable
(static) reduce(source, fn, initialValue) → {*}
import {reduce} from 'async_iter/pipeline/reduce' # pipeline version
import {reduce} from 'async_iter/reduce' # conventional version
Apply reducer function to each item of the source iterable, returning the single reduce value
Supports both sync and async iterations. Always returns an async iterator
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration to rate limit |
fn |
reducerCallback | The function to call for each item |
initialValue |
* | The initial value to supply to the first call of fn |
Returns:
The final reduced value
- Type
- *
(static) take(source, count) → {Iterable}
import {take} from 'async_iter/pipeline/take' # pipeline version
import {take} from 'async_iter/take' # conventional version
Takes a number of items from the source iteration
Supports both sync and async iterations
Example
import {take} from 'async_iter'
const items = take([1, 2, 3, 4, 5], 3)
for await (const item of items)
console.log(item)
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
count |
Number | The number to take from the source |
Returns:
A set containing upto the count
of items
- Type
- Iterable
(static) takeUntil(source, fn) → {Iterable}
- Source:
import {takeUntil} from 'async_iter/pipeline/take_until' # pipeline version
import {takeUntil} from 'async_iter/take_until' # conventional version
Takes a number of items from the source iteration
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
fn |
evaluateItem | Called for each item. When function returns true, iteration is stopped. |
Returns:
A set containing all items of source, upto when fn returns true (non inclusive)
- Type
- Iterable
(static) takeWhile(source, fn) → {Iterable}
- Source:
import {takeWhile} from 'async_iter/pipeline/take_while' # pipeline version
import {takeWhile} from 'async_iter/take_while' # conventional version
Takes a number of items from the source iteration
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
fn |
evaluateItem | Called for each item. When function returns false, iteration is stopped |
Returns:
A set containing all items of source, upto when fn returns false (non inclusive)
- Type
- Iterable
(static) tap(source, fn) → {void}
import {tap} from 'async_iter/pipeline/tap' # pipeline version
import {tap} from 'async_iter/tap' # conventional version
Call a side effect function for each item in the iteration. The returned value is ignored
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
fn |
itemCallback | Function to be called for each item |
Returns:
- Type
- void
(static) toArray(source) → {Array}
- Source:
import {toArray} from 'async_iter/pipeline/to_array' # pipeline version
import {toArray} from 'async_iter/to_array' # conventional version
Converts an iterable source to an array
Supports both sync and async iterations
Parameters:
Name | Type | Description |
---|---|---|
source |
Iterable | The source iteration |
Returns:
An array containing all the items of the source iteration
- Type
- Array