pump.js

import {promiseSignal} from './lib/promise_helpers'

class ArgumentError extends Error {
  constructor() {
    super('pump callback function has not returned a promise')
    this.name = this.constructor.name

    if (Error.captureStackTrace)
      Error.captureStackTrace(this, this.constructor.name)
  }
}

/**
```
import {pump} from 'async_iter/pump'
```
pump allows for the 'pushing' of values into an async iterator consumer

The `push` operation returns a promise, that resolves when the consuming iteration has consumed the item

This function follows the convention of a pushed iterator interface (next, throw, return).

If the code pushing values, does not await the return promise, the values are then queued
for processing by the consumer as it pulls in the values

The callback is not invoked, until the first item is pulled from the iteration

@param  {pumpCallback} fn        this is a function that will async pump values into the interator
@param  {String}   [marker=]
@return {Iteratable}         A standard async iterator that can consume the generated values

@example
import {pump} from 'async_iter'

// Create a push based iteration set
const items = await pump(target => {
  //Values can be push to the iteration
  await target.next(1) // if you dont 'await' the values will be queued.
  await target.next(2)
  await target.next(3)
  // If you want to push an 'error' to the consumer
  // await target.throw(new Error('This is an error'))
  await target.return()
})

for await (const item of items)
  console.log(item)

@memberof module:Generators
@name pump
@function
*/
export function pump(fn, marker) {
  const myObject = new ArgumentError()
  return _pump(fn, marker, myObject)
}

async function* _pump(fn, marker, myObject) {
  let values = undefined
  let keepAlive
  let latch = promiseSignal()
  const unlatch = []
  const keepAliveTimer = () => keepAlive = setTimeout(keepAliveTimer, 250)
  let hasStopped = false
  const hasStoppedSignal = createStopSignal(fn.length >= 2)

  function createStopSignal(callbackRequestsStopSignal) {
    if (!callbackRequestsStopSignal)
      return
    const hasStoppedSignal = promiseSignal()
    hasStoppedSignal.promise.now = () => {
      return hasStopped
    }

    return hasStoppedSignal
  }

  async function _next(item, options = {}) {
    if (hasStopped)
      return {value: undefined, done: true}

    options = {done: false, ...options}
    const p = unlatch.length === 0 ? {marker: 'none'} : unlatch[unlatch.length - 1]
    const newP = promiseSignal()
    unlatch.push(newP)
    await p.promise

    values = {...options, item}
    latch.res()
  }

  function _return() {
    return _next(undefined, {done: true})
  }

  function _throw(error) {
    return _next(undefined, {error})
  }

  async function untilNextValueAvailable() {
    await latch.promise
    latch = promiseSignal({})
  }

  function unLatchNextValue() {
    const p = unlatch.shift()
    p.res()
  }

  function unlatchAll() {
    latch.res()
    for (const p of unlatch)
      p.res()
    unlatch.length = 0
  }

  function extractNextValue() {
    const v = values
    values = undefined
    if (v.error)
      throw v.error
    return v
  }

  process.nextTick(() => {
    async function* _target() {
      let count = 0
      try {
        while (!hasStopped) {
          const x = await (yield count++)
          await _next(x)
        }
      } catch (err) {
        _throw(err)
      } finally {
        await _return()
      }
    }

    const target = _target()

    let operation
    if (hasStoppedSignal)
      operation = fn(target, hasStoppedSignal.promise)
    else
      operation = fn(target)

    if (operation && operation.catch)
      operation.catch(err => _throw(err))
    else
      return _throw(myObject)
  })

  keepAliveTimer()

  function terminateIteration(err) {
    hasStopped = true
    if (hasStoppedSignal)
      if (err) {
        hasStoppedSignal.res(err)
      }
      else
        hasStoppedSignal.res()

    unlatchAll()
    clearTimeout(keepAlive)
    if (err)
      throw err
  }

  try {
    while (true) {
      await untilNextValueAvailable()
      const {item, done} = extractNextValue()

      if (done)
        break

      yield item

      unLatchNextValue()
    }
  } catch (err) {
    terminateIteration(err)
  } finally {
    terminateIteration()
  }
}