buffer_group_by.js

import {asAsyncIterator} from './lib/get_iterator'

const True = true
const delay = period => new Promise(res => setTimeout(res, period))

/**
```
import {bufferGroupBy} from 'async_iter/pipeline/buffer_group_by' # pipeline version
import {bufferGroupBy} from 'async_iter/buffer_group_by' # conventional version
```
 * Collect and group items as per a trigger function or a time period.  Emits an array of the batched items.
 * <br/>
 * Similar to the `bufferBy` only the batches emitted are as per the `selector` grouping function.
 * @param  {Iterable}         source        The source iteration to buffer
 * @param  {Function}         selector      A function that returns the key to be used to identify grouping
 * @param  {triggerCallback}  trigger       A function to indicate when a grouped items should be emitted
 * @param  {number}           maxWaitTime   The minimum period of time (ms), before any pending grouped items should be emitted
 * @return {Iterable} The buffered items
 * @name bufferGroupBy
 * @function
 * @see also {@link bufferBy}
 * @memberof module:Operators
 */
export function bufferGroupBy(source, selector, trigger, maxWaitTime) {
  const state = {
    buffers: {},
    nextValue: undefined,
  }

  function emit(key) {
    const value = state.buffers[key].data
    delete state.buffers[key]
    return {value}
  }

  /* eslint complexity: ['error', 29] */
  return asAsyncIterator({
    [Symbol.asyncIterator]() {
      return {
        async next() {
          while (True) {
            state.nextValue = state.nextValue || source.next()
            const timers = Object.values(state.buffers).map(t => t.timer)
            const {value, done, timed} = await Promise.race([state.nextValue, ...timers])

            if (done) {
              const keys = Object.keys(state.buffers)
              if (keys.length === 0)
                return {done: true}

              return emit(keys[0])
            }

            if (timed)
              return emit(timed)

            const groupKey = selector(value)
            if (!state.buffers[groupKey])
              state.buffers[groupKey] = {
                timer: delay(maxWaitTime).then(() => ({timed: groupKey})),
                data: []
              }
            state.buffers[groupKey].data.push(value)
            state.nextValue = undefined

            if (trigger(value, state.buffers[groupKey].data))
              return emit(groupKey)
          }
        },
        async return() {
          state.buffers = {}
          state.nextValue = undefined
          source.return()
          return {done: true}
        }
      }
    }
  })
}