persisted/index.js

import {open} from './storage'

/**
```
import {persisted} from 'async_iter/pipeline/persisted' # pipeline version
import {persisted} from 'async_iter/persisted' # conventional version
```
Persist items of an async iterator to files for later resumption
> NB: `Persisted` will consume items as fast as the source will emit.  The consumer of the iteration will be 'decoupled' from the source

 * @param {Iterable} source   The source iteration
 * @param  {String} path    is a directory for storage of items
 * @param  {Object} opts   a set of optional flags
 * @param  {boolean} [opts.allowRestart=false]   allows a restart of a previously completed iteration
 * @param  {boolean} [opts.maxBytes=0]   limits the number of bytes that can be stored. Zero indicates no limit
 * @param  {overFlowEventCallback} [opts.overFlowEvent=]   callback function invoked when <code>maxBytes</code> exceeded
 * @return {Iterable<PersistedItem>}        An async iteration, where each item resolves to an object containing
 * @name persisted
 * @function
 * @example
import {persisted} from 'async_iter'

const items = await persisted(source, './tmp/buffering_example')

for await (const item of items) {
  console.log(item.value.toString())
  item.completed() // If this function is not called, item will be processed if iterator restarted
}
 * @memberof module:Operators
 * @name persisted
 */
export async function persisted(source, path, opts = {}) {
  opts = {
    allowRestart: false,
    maxBytes: 0,
    overFlowEvent: () => undefined,
    ...opts,

    overflow: false
  }

  const {push, stop, items, consumerHasStopped} = await open(path, opts)

  process.nextTick(async () => {
    let item = await source.next()
    try {
      while (!item.done) {
        if (consumerHasStopped())
          break

        await push(item.value)

        item = await source.next()
      }
    } catch (err) {
      source.throw(err)
    } finally {
      source.return()
      await stop()
    }
  })

  return items
}