Operators

Methods

(static) backlogLimit(source, count) → {Iterable}

Source:
import {map} from 'async_iter/pipeline/backlog_limit' # pipeline version
import {map} from 'async_iter/backlog_limit' # conventional version

Buffers items upto a limit count to pass onto consumer.

Supports only async iterations

Useful when your consumer is slower than your producer, and causes source items to be dropped.

Parameters:
Name Type Description
source Iterable

The source iteration to process

count Number

The max number of items to buffer

Returns:

The limited items

Type
Iterable

(static) broadcast(source) → {function}

Source:
import {broadcast} from 'async_iter/pipeline/broadcast' # pipeline version
import {broadcast} from 'async_iter/broadcast' # conventional version

Returns a generator function that will subscribe to the source iteration
Each generator function, will iterate over the same source values

  • No queing of values, so each consumer will be made to wait for all other consumers
  • The source iteration is not started, until at least one subscription has started consuming
  • The source iteration is paused, if all consumers are stopped. Any new subscriptions will continue from where the source iteraion was iterated to
Parameters:
Name Type Description
source Iterable

The source iteration to broadcast to all subscribers

Returns:

a generator function to create an iterable of the source items
.return - A function to close all consumer iterators and close the source iteration.

Type
function

(static) bufferBy(source, trigger, maxWaitTime) → {Iterable}

Source:
See:
  • also bufferGroupBy
import {bufferBy} from 'async_iter/pipeline/buffer_by' # pipeline version
import {bufferBy} from 'async_iter/buffer_by' # conventional version

Collect a set of items from source. Emit as an array of those items.
The batch is produced, when the trigger returns true, or the maxWaitTime has elasped since the last emitted value

Parameters:
Name Type Description
source Iterable

The source iteration to buffer

trigger triggerCallback

Called for each item in the source iteration. Return true to trigger a batch

maxWaitTime Bumber

period is milliseconds to trigger a batch, if no batch has been emitted and there are pending values

Returns:

The buffered items

Type
Iterable

(static) bufferGroupBy(source, selector, trigger, maxWaitTime) → {Iterable}

Source:
See:
  • also bufferBy
import {bufferGroupBy} from 'async_iter/pipeline/buffer_group_by' # pipeline version
import {bufferGroupBy} from 'async_iter/buffer_group_by' # conventional version

Collect and group items as per a trigger function or a time period. Emits an array of the batched items.
Similar to the bufferBy only the batches emitted are as per the selector grouping function.

Parameters:
Name Type Description
source Iterable

The source iteration to buffer

selector function

A function that returns the key to be used to identify grouping

trigger triggerCallback

A function to indicate when a grouped items should be emitted

maxWaitTime number

The minimum period of time (ms), before any pending grouped items should be emitted

Returns:

The buffered items

Type
Iterable

(static) byLines(source) → {Iterable}

Source:
import {map} from 'async_iter/pipeline/by_lines' # pipeline version
import {map} from 'async_iter/by_lines' # conventional version

Take a source iterations of strings, and split and emit based on the newline character

Supports both sync and async iterations

This operator can be used to re-contructs a set of lines that have been piped thru streams.

Parameters:
Name Type Description
source Iterable

The source iteration to process

Returns:

The transformed items

Type
Iterable

(static) chunk(source, count) → {Iterable}

Source:
import {map} from 'async_iter/pipeline/chunk' # pipeline version
import {map} from 'async_iter/chunk' # conventional version

Emit a batch of count items from the source iterable

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration to process

count Number

The number of items to be batched together into a single value

Returns:

The chunked items

Type
Iterable

(static) combineWhen(source, fn, combine) → {Iterable}

Source:
import {map} from 'async_iter/pipeline/combineWhen' # pipeline version
import {map} from 'async_iter/combineWhen' # conventional version

Collect items, combining them, until the evaluation function return true.

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration to process

fn evaluateItem

A evaluator function, that upon returing true, causes all items collected to this point to be emitted

combine function

An optional function to combine two items (defaults to (a, b) => a + b)

Returns:

Emits as per the combindation functions

Type
Iterable

(static) concat(sources) → {Iterable}

Source:
import {concat} from 'async_iter/pipeline/concat' # pipeline version
import {concat} from 'async_iter/concat' # conventional version

Combines multiple source iterations into a single iteration

Supports both sync and async iterations

Parameters:
Name Type Description
sources Iterable

The source iterations

Returns:

An iterable that combines all values from the sources

Type
Iterable

(static) concatMap(source, fn) → {F}

Source:
import {concatMap} from 'async_iter/pipeline/concat_map' # pipeline version
import {concatMap} from 'async_iter/concat_map' # conventional version
Alternative name: flatMap

Joins an inner iterable with the use of a mapping function.

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration

fn function

A function called for each item in the source iteration and returns an iterable

Returns:

An iterable that has flatten the inner iterable values

Type
F

(static) count(source) → {Number}

Source:
import {count} from 'async_iter/pipeline/count' # pipeline version
import {count} from 'async_iter/count' # conventional version

Iterates thru the supplied iterable's set and returns the count

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration to count

Returns:

The count of items

Type
Number

(static) delay(source, period) → {Iterable}

Source:
import {delay} from 'async_iter/pipeline/delay' # pipeline version
import {delay} from 'async_iter/delay' # conventional version

Re-emits all values from source, with a time delay between each item

Parameters:
Name Type Description
source Iterable

The source iteration

period Number

The time in miliiseconds to pause between each item

Returns:

The delayed iterable items

Type
Iterable

(static) delayUntil(source, Date) → {Iterable}

Source:
import {delayUntil} from 'async_iter/pipeline/delay_until' # pipeline version
import {delayUntil} from 'async_iter/delay_until' # conventional version

Waits for the specific time point, and then re-emits the source items

Parameters:
Name Type Description
source Iterable

The source iteration

Date date

The timepoint to wait until the source iterations is returned.

Returns:

The delayed iterable

Type
Iterable

(static) entries(source) → {Iterable}

Source:
import {entries} from 'async_iter/pipeline/entries' # pipeline version
import {entries} from 'async_iter/entries' # conventional version

Equivalent to Array.prototype.entries

Supports both sync and async iterations

Emits the items with a key/value pair [index, value].

Parameters:
Name Type Description
source Iterable

The source iteration to process

Returns:

The transformed items

Type
Iterable

(static) every(source, fn) → {Boolean}

Source:
import {every} from 'async_iter/pipeline/every' # pipeline version
import {every} from 'async_iter/every' # conventional version

Return true if every item satisfies the supplied test function

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration to be tested

fn itemTest

A function called for each item in the source

Returns:

True if all items are satisfied

Type
Boolean

(static) filter(source, fn, missingValueFnopt) → {Iterable}

Source:

The optional missingValueFn allows for processing the 'filtered' items into a single item.

eg: Given an input source of [1, 2, 3, 4, 5, 6] and a filter of x >= 5, then the missingValueFn will be called with values of (1, 5) and the result of that function will be emitted before item 6.

The examples include use of the missingValueFn function

Parameters:
Name Type Attributes Description
source Iterable

The source iteration to filter

fn function

A funntion invokved for each item, returning false for items to be removed

missingValueFn missingValueFn <optional>

When supplied, will be invokved for each group of items that are filter - returns a item to be emitted, representing the filtered items

Returns:

The filtered items

Type
Iterable

(static) find(source, fn) → {*}

Source:
import {find} from 'async_iter/pipeline/find' # pipeline version
import {find} from 'async_iter/find' # conventional version

Return the first item in the source iterations, when the supplied function returns true

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration

fn itemTest

A function invokved for each item until true is returned

Returns:

The found item or undefined if nothing found

Type
*

(static) first(source) → {*}

Source:
import {first} from 'async_iter/pipeline/first' # pipeline version
import {first} from 'async_iter/first' # conventional version

Returns the first item of the source iterable

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration

Returns:

The first item

Type
*

(static) flatMap()

Source:
See:

Alias for concatMap

(static) forEach(source, fn) → {Iterable}

Source:
import {forEach} from 'async_iter/pipeline/for_each' # pipeline version
import {forEach} from 'async_iter/for_each' # conventional version

Calls the supplied function for each item within the source iterable

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration

fn itemCallback

Function to be called for each item

Returns:

The source iteration

Type
Iterable

(static) last(source) → {*}

Source:
import {last} from 'async_iter/pipeline/last' # pipeline version
import {last} from 'async_iter/last' # conventional version

Returns the last item of the source iterable

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration

Returns:

The last item

Type
*

(static) map(source, fn) → {Iterable}

Source:
import {map} from 'async_iter/pipeline/map' # pipeline version
import {map} from 'async_iter/map' # conventional version

Transforms each item in the supplied iteration using the supplied function

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration to map

fn function

The mapping function for each item

Returns:

The transformed items

Type
Iterable

(static) max(source, fnopt) → {*}

Source:
import {max} from 'async_iter/pipeline/max' # pipeline version
import {max} from 'async_iter/max' # conventional version

Returns the max value of the source iterable items

Supports both sync and async iterations

Parameters:
Name Type Attributes Description
source Iterable

The source iteration

fn minMaxCallback <optional>

An function to determine value order

Returns:

The max item

Type
*

(static) merge(sources) → {Iterable}

Source:
import {merge} from 'async_iter/pipeline/merge' # pipeline version
import {merge} from 'async_iter/merge' # conventional version

Merges the source iterables items into a single iterable. Order is as they come.

If all source iterables are stopped, then the returned iteration is stopped

If the consumer stops the iteration, all source iterations are stopped.

Parameters:
Name Type Description
sources Iterable

The source iterations

Returns:

An iterable that combines all values from the sources

Type
Iterable

(static) min(source, fnopt) → {*}

Source:
import {min} from 'async_iter/pipeline/min' # pipeline version
import {min} from 'async_iter/min' # conventional version

Returns the min value of the source iterable items

Supports both sync and async iterations

Parameters:
Name Type Attributes Description
source Iterable

The source iteration

fn minMaxCallback <optional>

An function to determine value order

Returns:

The min item

Type
*

(static) persisted(source, path, opts) → {Iterable.<PersistedItem>}

Source:
import {persisted} from 'async_iter/pipeline/persisted' # pipeline version
import {persisted} from 'async_iter/persisted' # conventional version

Persist items of an async iterator to files for later resumption

NB: Persisted will consume items as fast as the source will emit. The consumer of the iteration will be 'decoupled' from the source

Example
import {persisted} from 'async_iter'

const items = await persisted(source, './tmp/buffering_example')

for await (const item of items) {
  console.log(item.value.toString())
  item.completed() // If this function is not called, item will be processed if iterator restarted
}
Parameters:
Name Type Description
source Iterable

The source iteration

path String

is a directory for storage of items

opts Object

a set of optional flags

Properties
Name Type Attributes Default Description
allowRestart boolean <optional>
false

allows a restart of a previously completed iteration

maxBytes boolean <optional>
0

limits the number of bytes that can be stored. Zero indicates no limit

overFlowEvent= overFlowEventCallback <optional>

callback function invoked when maxBytes exceeded

Returns:

An async iteration, where each item resolves to an object containing

Type
Iterable.<PersistedItem>

(static) rateLimit(source, maxAmount, perPeriod, counter=opt) → {Iterable}

Source:
import {rateLimit} from 'async_iter/pipeline/rateLimit' # pipeline version
import {rateLimit} from 'async_iter/rateLimit' # conventional version

Emits the values from the source iteration at upto a limited rate

Supports both sync and async iterations. Always returns an async iterator
The source iteration will be consumed only as required - there is no queing within rateLimit function
The definition of unit is defined by the optional function counter. The default will be 1 iteration equals 1 unit

Example
import {rateLimit} from 'async_iter/pipeline'

// Emit at no more than 5 characters per 2s
const items = ['first', 'second', 'third', 'fourth', 'fifth']
  |> rateLimit(5, 2000, v => v.toString().length)
Parameters:
Name Type Attributes Description
source Iterable

The source iteration to rate limit

maxAmount Number

the maxmimum number of 'units' to be emitted within the time of perPeriod

perPeriod Number

the period in milliseconds to be applied

counter= function <optional>

an optional callback function, called for each item

Returns:

The iterable that will not emit at a rate greater than specified

Type
Iterable

(static) reduce(source, fn, initialValue) → {*}

Source:
import {reduce} from 'async_iter/pipeline/reduce' # pipeline version
import {reduce} from 'async_iter/reduce' # conventional version

Apply reducer function to each item of the source iterable, returning the single reduce value

Supports both sync and async iterations. Always returns an async iterator

Parameters:
Name Type Description
source Iterable

The source iteration to rate limit

fn reducerCallback

The function to call for each item

initialValue *

The initial value to supply to the first call of fn

Returns:

The final reduced value

Type
*

(static) take(source, count) → {Iterable}

Source:
import {take} from 'async_iter/pipeline/take' # pipeline version
import {take} from 'async_iter/take' # conventional version

Takes a number of items from the source iteration

Supports both sync and async iterations

Example
import {take} from 'async_iter'

const items = take([1, 2, 3, 4, 5], 3)

for await (const item of items)
  console.log(item)
Parameters:
Name Type Description
source Iterable

The source iteration

count Number

The number to take from the source

Returns:

A set containing upto the count of items

Type
Iterable

(static) takeUntil(source, fn) → {Iterable}

Source:
import {takeUntil} from 'async_iter/pipeline/take_until' # pipeline version
import {takeUntil} from 'async_iter/take_until' # conventional version

Takes a number of items from the source iteration

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration

fn evaluateItem

Called for each item. When function returns true, iteration is stopped.

Returns:

A set containing all items of source, upto when fn returns true (non inclusive)

Type
Iterable

(static) takeWhile(source, fn) → {Iterable}

Source:
import {takeWhile} from 'async_iter/pipeline/take_while' # pipeline version
import {takeWhile} from 'async_iter/take_while' # conventional version

Takes a number of items from the source iteration

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration

fn evaluateItem

Called for each item. When function returns false, iteration is stopped

Returns:

A set containing all items of source, upto when fn returns false (non inclusive)

Type
Iterable

(static) tap(source, fn) → {void}

Source:
import {tap} from 'async_iter/pipeline/tap' # pipeline version
import {tap} from 'async_iter/tap' # conventional version

Call a side effect function for each item in the iteration. The returned value is ignored

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration

fn itemCallback

Function to be called for each item

Returns:
Type
void

(static) toArray(source) → {Array}

Source:
import {toArray} from 'async_iter/pipeline/to_array' # pipeline version
import {toArray} from 'async_iter/to_array' # conventional version

Converts an iterable source to an array

Supports both sync and async iterations

Parameters:
Name Type Description
source Iterable

The source iteration

Returns:

An array containing all the items of the source iteration

Type
Array