// rate_limit.js

import {promiseSignal} from './lib/promise_helpers'
import {asAsyncIterator} from './lib/get_iterator'

/* eslint complexity: ['error', 4] */
/**
```
import {rateLimit} from 'async_iter/pipeline/rateLimit' # pipeline version
import {rateLimit} from 'async_iter/rateLimit' # conventional version
```
Emits the values from the source iteration at up to a limited rate

> Supports both **sync** and  **async** iterations.  Always returns an **async** iterator
<br/>
> The source iteration will be consumed only as required - there is no queuing within the rateLimit function
<br/>
> The definition of unit is defined by the optional function `counter`. The default will be 1 iteration equals 1 unit
<br/>
> The allowance starts full (`maxAmount` units) and is topped up in 100ms steps, never beyond `maxAmount`.
> An item is charged when it is read; if that takes the allowance below zero, the item is held back
> until the allowance has recovered to zero or more
<br/>
> When the consumer stops early (`break`, `return()` or an error), the source iteration is closed as well

 * @param  {Iterable}         source        The source iteration to rate limit
 * @param {Number}   maxAmount     the maximum number of 'units' to be emitted within the time of `perPeriod`
 * @param {Number}   perPeriod     the period in milliseconds to be applied
 * @param {Function} [counter=]    an optional callback function, called for each item with the item's value.
 *                                 It returns the number of units the item consumes (default: 1 unit per item)
 * @return {Iterable} The iterable that will not emit at a rate greater than specified
 * @function
 * @example
import {rateLimit} from 'async_iter/pipeline'

// Emit at no more than 5 characters per 2s
const items = ['first', 'second', 'third', 'fourth', 'fifth']
  |> rateLimit(5, 2000, v => v.toString().length)
 * @memberof module:Operators
 * @name rateLimit
 */
export async function* rateLimit(source, maxAmount, perPeriod, counter = () => 1) {
  // The allowance is refilled in fixed steps on this tick, not continuously
  const refillEveryMs = 100
  const iterator = await asAsyncIterator(source)

  // Starts out resolved so the refill timer always has a signal it can safely resolve
  let forBucketAboveZero = promiseSignal()
  forBucketAboveZero.res()

  let bucket = maxAmount
  // Share of `maxAmount` that is earned back on every refill tick
  const inc = (refillEveryMs / perPeriod) * maxAmount
  const interval = setInterval(() => {
    bucket = Math.min(maxAmount, bucket + inc)
    if (bucket >= 0)
      forBucketAboveZero.res()

  }, refillEveryMs)

  try {
    // `for await` (rather than manual `next()` calls) makes sure the source's `return()` is
    // invoked when the consumer abandons this iteration early, so upstream resources are released
    for await (const value of {[Symbol.asyncIterator]: () => iterator}) {
      bucket -= counter(value)
      if (bucket < 0) {
        // Overdrawn: hold the item until the refill timer brings the allowance back to >= 0
        forBucketAboveZero = promiseSignal()
        await forBucketAboveZero.promise
      }
      yield value
    }
  }
  finally {
    // Always stop the refill timer, otherwise it would keep the event loop alive
    clearInterval(interval)
  }
}