Backpressure
FiberStream's default runtime is pull-based. Downstream demand asks for one element. Each flow pulls only what it needs. The source advances only when demand reaches it.
Early completion
Sink.first completes after one element:
FiberStream::Source.each([1, 2, 3])
.run_with(FiberStream::Sink.first)
# => 1Flow.take closes upstream when the limit is reached:
FiberStream::Source.each([1, 2, 3])
.take(2)
.run_with(FiberStream::Sink.to_a)
# => [1, 2]Bounded prefetch
Flow.buffer(count) allows bounded prefetch. It requires a scheduler when the stage is demanded.
Async do
FiberStream::Source.each(1..10)
.buffer(4)
.run_with(FiberStream::Sink.to_a)
end.waitThe buffer stores at most count messages. Closing downstream requests producer cancellation.
Async mapping
Use parallel_map when each mapping operation waits on scheduler-aware IO. Results are emitted in input order.
Async do
FiberStream::Source.each([1, 2, 3])
.parallel_map(concurrency: 3) { |id| fetch_record(id) }
.run_with(FiberStream::Sink.to_a)
end.waitparallel_map is not a CPU parallelism primitive. Use ractor_map for CPU-bound work that can run in Ractors.
Use parallel_unordered_map when each mapped result can be handled independently. It uses the same scheduler-backed concurrency model but emits values as mapping operations finish, so input order is not preserved.
Ractor boundaries
ractor_map, ractor_producer, ractor_merge_producers, ractor_port, and ractor_merge_ports do not require a Fiber.scheduler. They use Ruby Ractors for isolation and CPU parallelism.
Ractor APIs are experimental in Ruby. Treat Ractor-facing code as an explicit boundary and keep transferred objects shareable or move-only by design.