
Ractor Source

Use Source.ractor_producer when FiberStream should own a producer Ractor and turn its output into a source.

The producer block receives a RactorProducer context. Each emit call waits for downstream demand before sending one value, so producer output remains tied to stream demand.

Diagram: Source.ractor_port acknowledgement sequence. Downstream demand causes Source.ractor_port to acknowledge the producer Ractor. The producer then sends one element or a terminal message through the data port.

Participants: Downstream, Source.ractor_port, Ack port, Producer Ractor, Data port.

One downstream pull: (1) next, (2) RactorPort::Ack, (3) deliver ack, (4) RactorPort::Element(value), (5) deliver element, (6) emit value.

Next pull returns terminal state: (7) next, (8) RactorPort::Ack, (9) deliver ack, (10) Complete or Failure, (11) deliver terminal message, (12) complete or raise.
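The acknowledgement sequence can be approximated with plain Ruby Ractors. The following is a minimal sketch and not FiberStream's implementation: the symbol :ack and the arrays [:element, value] and [:complete] are hypothetical stand-ins for RactorPort::Ack, RactorPort::Element, and RactorPort::Complete, and the main Ractor's inbox plays the role of the data port.

```ruby
# Stand-in messages (assumptions, not FiberStream types):
#   :ack              ~ RactorPort::Ack
#   [:element, value] ~ RactorPort::Element
#   [:complete]       ~ RactorPort::Complete
producer = Ractor.new(Ractor.current, [10, 20, 30]) do |consumer, values|
  values.each do |value|
    Ractor.receive                    # block until the consumer acknowledges
    consumer.send([:element, value])  # answer each ack with exactly one element
  end
  Ractor.receive                      # the next ack is answered with the terminal message
  consumer.send([:complete])
end

received = []
loop do
  producer.send(:ack)                 # one downstream pull sends one ack
  kind, value = Ractor.receive        # wait for the single reply to that ack
  break if kind == :complete
  received << value
end

received # => [10, 20, 30]
```

Because the producer blocks on every ack, it never runs more than one element ahead of the consumer.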

Single producer

ruby
require "fiber_stream"

produce_values =
  Ractor.shareable_proc do |producer, values|
    values.each do |value|
      break unless producer.emit(value)
    end
  end

result =
  FiberStream::Source.ractor_producer(1..5, &produce_values)
    .map { |number| number * number }
    .run_with(FiberStream::Sink.to_a)

result # => [1, 4, 9, 16, 25]

Source.ractor_producer does not require a Fiber.scheduler. Producer work runs inside the Ractor, and FiberStream creates the ports, acknowledgment messages, and cooperative cleanup path.

Values emitted by the producer must obey Ruby Ractor transfer rules. Use transfer: :move only when the producer will not reuse the moved object.
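To illustrate why a moved object must not be reused, here is a small sketch of Ruby's own move semantics using Ractor#send with move: true. It assumes that transfer: :move relies on the same underlying behavior; the sketch does not call FiberStream itself.

```ruby
# A receiver Ractor that sums whatever array it is sent and reports back.
receiver = Ractor.new(Ractor.current) do |main|
  payload = Ractor.receive
  main.send(payload.sum)
end

buffer = [1, 2, 3]
receiver.send(buffer, move: true) # ownership passes to the receiver Ractor
total = Ractor.receive            # => 6

# The sender's reference is no longer usable after the move.
reuse_error =
  begin
    buffer.size
    nil
  rescue Ractor::MovedError => error
    error.class
  end

reuse_error # => Ractor::MovedError
```

Moving avoids a copy, which matters for large payloads, but any later access from the sending side raises.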

Producer failure

A producer can fail the stream explicitly with producer.fail.

ruby
producer.fail(cause_class_name: "ProducerError", cause_message: "failed")

Failure metadata is producer-provided. Redact paths, secrets, tenant data, or other sensitive values before sending failures across trust boundaries.
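One way to apply this advice is to scrub the message before building the failure metadata. The helper below is a hypothetical sketch: redacted_failure is not part of FiberStream, and its two patterns (absolute paths and key=value secrets) are illustrative rather than exhaustive.

```ruby
# Hypothetical helper: builds the keyword arguments shown for producer.fail
# from an exception, with paths and obvious secrets masked.
def redacted_failure(error)
  message = error.message
    .gsub(%r{(?:/[\w.\-]+){2,}}, "[path]")                   # absolute file paths
    .gsub(/\b(token|password|secret)=\S+/i, '\1=[redacted]') # key=value secrets
  { cause_class_name: error.class.name, cause_message: message }
end

error = RuntimeError.new("cannot read /srv/tenants/acme/key.pem with token=abc123")
metadata = redacted_failure(error)
metadata[:cause_class_name] # => "RuntimeError"
metadata[:cause_message]    # => "cannot read [path] with token=[redacted]"

# Inside a producer block this could be passed on as:
#   producer.fail(**metadata)
```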

Multiple producers

Use Source.ractor_merge_producers when several FiberStream-owned producer Ractors should feed one source.

ruby
produce_tagged_values =
  Ractor.shareable_proc do |producer, tag, values|
    values.each do |value|
      break unless producer.emit([tag, value])
    end
  end

records =
  FiberStream::Source
    .ractor_merge_producers do |group|
      group.producer(:a, [1, 2], &produce_tagged_values)
      group.producer(:b, [3, 4], &produce_tagged_values)
    end
    .run_with(FiberStream::Sink.to_a)

# Example result: [[:a, 1], [:b, 3], [:a, 2], [:b, 4]]

Each producer receives at most one outstanding ack. Values are emitted in coordinator-observed ready order. Each producer's own order is preserved.
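These ordering rules can be sketched with plain Ractors. The coordinator loop below is an assumption about the general shape of such a merge, not FiberStream's implementation; spawn_tagged_producer and the symbol/array messages are hypothetical stand-ins.

```ruby
# Hypothetical producer: answers each :ack with one tagged element,
# then answers the final ack with a completion message.
def spawn_tagged_producer(tag, values)
  Ractor.new(Ractor.current, tag, values) do |coordinator, producer_tag, items|
    items.each do |item|
      Ractor.receive
      coordinator.send([:element, producer_tag, item])
    end
    Ractor.receive
    coordinator.send([:complete, producer_tag])
  end
end

producers = {
  a: spawn_tagged_producer(:a, [1, 2]),
  b: spawn_tagged_producer(:b, [3, 4])
}
producers.each_value { |producer| producer.send(:ack) } # one outstanding ack each

merged = []
until producers.empty?
  kind, tag, value = Ractor.receive   # whichever producer is ready first
  if kind == :complete
    producers.delete(tag)
  else
    merged << [tag, value]
    producers.fetch(tag).send(:ack)   # re-acknowledge only the producer that replied
  end
end

# Interleaving varies between runs, but each producer's own order is stable.
per_producer = merged.group_by(&:first).transform_values { |pairs| pairs.map(&:last) }
per_producer # => { a: [1, 2], b: [3, 4] }
```

Grouping by tag, as in the last two lines, is also a convenient way to assert on merged output in tests without depending on the interleaving.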

Externally owned producers

Use Source.ractor_port and Source.ractor_merge_ports when producer Ractors are owned outside FiberStream or need custom lifecycle handling. The producer owns an acknowledgment port, waits for RactorPort::Ack, sends one RactorPort::Element, RactorPort::Complete, or RactorPort::Failure, then waits for the next ack.
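The failure path of this protocol can be sketched the same way. This is an illustration with plain Ractors under stated assumptions: the arrays [:element, value] and [:failure, class_name, message] are hypothetical stand-ins for RactorPort::Element and RactorPort::Failure, and the consumer loop stands in for the source.

```ruby
# The producer answers the first ack with an element and the second ack
# with a failure message describing the error it rescued.
producer = Ractor.new(Ractor.current) do |consumer|
  Ractor.receive
  consumer.send([:element, 1])
  Ractor.receive
  begin
    raise ArgumentError, "bad input"
  rescue => error
    consumer.send([:failure, error.class.name, error.message])
  end
end

outcome = []
begin
  loop do
    producer.send(:ack)
    kind, *payload = Ractor.receive
    case kind
    when :element then outcome << payload.first
    when :failure then raise "#{payload[0]}: #{payload[1]}" # surface the failure downstream
    when :complete then break
    end
  end
rescue RuntimeError => error
  outcome << error.message
end

outcome # => [1, "ArgumentError: bad input"]
```

Only the class name and message cross the Ractor boundary here, which keeps the terminal message cheap to copy and easy to redact.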

The runnable examples are examples/ractor_producer_sources.rb, examples/ractor_port_source.rb, and examples/ractor_merge_ports_and_map.rb.

v0.4.0. Released under the MIT License.