broadcast.js

import {pump} from './pump'
import {promiseSignal} from './lib/promise_helpers'

/**
```
import {broadcast} from 'async_iter/pipeline/broadcast' # pipeline version
import {broadcast} from 'async_iter/broadcast' # conventional version
```
Returns a generator function that will subscribe to the source iteration
<br/>
Each generator function, will iterate over the same source values
<br/>
> * No queing of values, so each consumer will be made to wait for all other consumers
<br/>
> * The source iteration is not started, until at least one subscription has started consuming
<br/>
> * The source iteration is paused, if all consumers are stopped.  Any new subscriptions will continue from where the source iteraion was iterated to

 * @param  {Iterable}         source        The source iteration to broadcast to all subscribers
 * @return {Function} a generator function to create an iterable of the source items
 <br/>
.return - A function to close all consumer iterators and close the source iteration.
 * @function
 * @memberof module:Operators
 * @name broadcast
 */
export function broadcast(source) {
  let count = 1
  const subscribers = {}
  const pumps = {}
  let firstSubscription = promiseSignal()
  let sourceHasStopped = false
  let lastEmittedValue = undefined
  let broadcastHasStopped = false

  function removeAllSubscribers() {
    for (const key of Object.keys(subscribers)) {
      subscribers[key].return()
      pumps[key].return()
      delete subscribers[key]
      delete pumps[key]
    }
  }

  function pushToAllSubscribers(x) {
    const pending = Object
      .entries(subscribers)
      .map(([key, value]) => value.next(x)
        .then(r => ({...r, key})))

    return Promise.all(pending)
  }

  function throwErrorToAllSubscribers(err) {
    const pending = Object
      .values(subscribers)
      .map(value => value.throw(err))

    return Promise.all(pending)
  }

  /* eslint complexity: ['error', 5] */
  process.nextTick(async () => {
    await firstSubscription.promise
    try {
      for await (const x of source) {
        lastEmittedValue = x
        await pushToAllSubscribers(x)

        if (broadcastHasStopped)
          break

        if (Object.entries(subscribers).length === 0) {
          firstSubscription = promiseSignal()
          await firstSubscription.promise
        }
      }
    } catch (err) {
      await throwErrorToAllSubscribers(err)

    } finally {
      sourceHasStopped = true
      removeAllSubscribers()
    }
  })

  const result = (inclusiveOfPreviousValue = false) => {
    if (sourceHasStopped)
      throw new Error('Source iteration has already completed')

    const key = count++
    pumps[key] = pump(async (target, hasStopped) => {
      hasStopped.then(() => {
        delete subscribers[key]
        delete pumps[key]
      }).catch(() => {
        delete subscribers[key]
        delete pumps[key]
      })
      if (sourceHasStopped)
        throw new Error('Source iteration has already completed')
      await target.next()
      subscribers[key] = target
      firstSubscription.res()
      if (inclusiveOfPreviousValue && lastEmittedValue)
        await target.next(lastEmittedValue)
    })

    return pumps[key]
  }

  result.return = async () => {
    if (broadcastHasStopped)
      return
    removeAllSubscribers()
    broadcastHasStopped = true
  }

  return result
}