Skip to content

Server

The high-level server. Handles pytree flattening on submit(), unflattening on sample(), and owns the underlying Rust _Server.

The example pytree passed to Server defines the shape and dtype of every leaf, and the server will only accept samples that match it exactly. The matching client must be constructed with a pytree of the same structure and per-leaf shape/dtype, and every client.send(...) call must pass a pytree with those exact shapes too. Mismatches are rejected at handshake (by the client) or produce a hard size error inside send().

Server

Server that handles pytree flattening/unflattening on top of the Rust core.

Parameters:

Name Type Description Default
example Any

Example pytree with numpy arrays (used to infer structure)

required
batch_size int

Number of samples per emitted batch

required
transport TcpTransport | None

Optional TcpTransport. Omit for in-process use; call submit() to feed samples directly. Custom transports (gRPC, RDMA, …) can be wired by implementing the Rust Transport trait.

None
num_buffers int

Number of ring buffer batches (min 2, default 3)

3
num_drainers int

Number of threads draining from producer queues

8
producer_queue_size int

Per-connection queue size

8

Lifetime: the numpy arrays returned by sample() are views into Rust-owned ring-buffer memory. They are invalidated as soon as the next batch reuses those slots, and the memory is freed when this Server is garbage-collected. Don't keep references to a batch beyond the next sample() call without np.copy, and don't drop the Server while you still hold sample arrays.

Source code in python/echo/server.py
class Server:
    """
    Server that handles pytree flattening/unflattening on top of the Rust core.

    Args:
        example: Example pytree with numpy arrays (used to infer structure)
        batch_size: Number of samples per emitted batch
        transport: Optional TcpTransport. Omit for in-process use; call
            ``submit()`` to feed samples directly. Custom transports
            (gRPC, RDMA, …) can be wired by implementing the Rust
            ``Transport`` trait.
        num_buffers: Number of ring buffer batches (min 2, default 3)
        num_drainers: Number of threads draining from producer queues
        producer_queue_size: Per-connection queue size

    Raises:
        TypeError: If any leaf of ``example`` is not a numpy array.

    Lifetime: the numpy arrays returned by ``sample()`` are views into
    Rust-owned ring-buffer memory. They are invalidated as soon as the next
    batch reuses those slots, and the memory is freed when this Server is
    garbage-collected. Don't keep references to a batch beyond the next
    sample() call without ``np.copy``, and don't drop the Server while you
    still hold sample arrays.
    """

    def __init__(
        self,
        example: Any,
        batch_size: int,
        transport: TcpTransport | None = None,
        num_buffers: int = 3,
        num_drainers: int = 8,
        producer_queue_size: int = 8,
    ):
        leaves, self._treedef = optree.tree_flatten(example)

        if not all(isinstance(leaf, np.ndarray) for leaf in leaves):
            raise TypeError("All leaves must be numpy arrays")

        # Per-leaf layout, in flattening order. ``sample()`` uses it to
        # rebuild arrays and ``submit()`` uses it to validate input.
        self._shapes = [leaf.shape for leaf in leaves]
        self._dtypes = [leaf.dtype for leaf in leaves]
        self._batch_size = batch_size

        self._server = _Server(
            shapes=list(self._shapes),
            dtype_sizes=[dtype.itemsize for dtype in self._dtypes],
            batch_size=batch_size,
            transport=transport,
            num_buffers=num_buffers,
            num_drainers=num_drainers,
            producer_queue_size=producer_queue_size,
        )

    def start(self) -> None:
        """Start the transport (bind port, accept connections)."""
        self._server.start()

    def sample(self) -> Sample | None:
        """
        Block until a batch is ready. Returns None on shutdown.

        Each leaf of ``Sample.batch`` is a zero-copy view into Rust-owned
        memory and is invalidated on the next ``sample()`` call. Use
        ``np.copy`` (or ``dataset_iter(copy=True)``) if you need to retain a
        batch beyond the next call.

        Returns:
            A ``Sample`` whose ``batch`` has the example's pytree structure
            with every leaf reshaped to ``(batch_size, *leaf_shape)``, or
            ``None`` when the underlying server returns ``None``.
        """
        result = self._server.sample()
        if result is None:
            return None
        flat_arrays, info = result

        # Reinterpret each flat buffer with the dtype recorded at
        # construction, then restore the leading batch dimension.
        reshaped = [
            np.frombuffer(b, dtype=d).reshape((self._batch_size,) + tuple(s))
            for b, d, s in zip(flat_arrays, self._dtypes, self._shapes)
        ]

        batch = optree.tree_unflatten(self._treedef, reshaped)
        return Sample(batch=batch, info=info)

    def dataset_iter(self, *, copy: bool = False) -> Generator[Sample, None, None]:
        """Yield Samples as batches become ready; returns on shutdown.

        Pass ``copy=True`` to deep-copy each batch and free the underlying
        ring slots for reuse on the next iteration.
        """
        while True:
            sample = self.sample()
            if sample is None:
                return
            if copy:
                # Rebinding drops the view-backed sample before yielding, so
                # the consumer only ever holds the copied arrays.
                sample = Sample(
                    batch=optree.tree_map(np.copy, sample.batch),
                    info=sample.info,
                )
            yield sample

    def submit(self, data: Any) -> None:
        """Submit a single sample directly to the store (in-process / tests).

        Every leaf of ``data`` is checked against the shape and dtype recorded
        from the example pytree before any bytes are handed to the store, so a
        sample whose leaves merely have the right byte count (for example
        ``int32`` in place of ``float32``) is rejected instead of being
        silently reinterpreted by ``sample()``. Only the flattened leaves are
        compared; container types are not.

        Raises:
            ValueError: If ``data`` flattens to a different number of leaves
                than the example, or a leaf's shape or dtype differs.
        """
        leaves, _ = optree.tree_flatten(data)
        if len(leaves) != len(self._shapes):
            raise ValueError(
                f"Expected {len(self._shapes)} leaves, got {len(leaves)}"
            )

        payload = []
        layout = zip(leaves, self._shapes, self._dtypes)
        for index, (leaf, shape, dtype) in enumerate(layout):
            # asarray is a no-op for ndarrays and lets numpy scalars through.
            array = np.asarray(leaf)
            if array.shape != shape or array.dtype != dtype:
                raise ValueError(
                    f"Leaf {index} has shape {array.shape} and dtype "
                    f"{array.dtype}; expected shape {shape} and dtype {dtype}"
                )
            payload.append(array.tobytes())
        self._server.submit(payload)

    def close(self) -> None:
        """Signal shutdown, causing sample() to return None."""
        self._server.shutdown()

    def reset_histograms(self) -> None:
        """Reset the histogram fields on subsequent SampleInfo snapshots.
        Counters and gauges are unchanged.
        """
        self._server.reset_histograms()

start

start() -> None

Start the transport (bind port, accept connections).

Source code in python/echo/server.py
def start(self) -> None:
    """Bring the transport up (bind the port and accept connections).

    Delegates to the underlying server object's ``start()``.
    """
    core = self._server
    core.start()

sample

sample() -> Sample | None

Block until a batch is ready. Returns None on shutdown.

Each leaf of Sample.batch is a zero-copy view into Rust-owned memory and is invalidated on the next sample() call. Use np.copy (or dataset_iter(copy=True)) if you need to retain a batch beyond the next call.

Source code in python/echo/server.py
def sample(self) -> Sample | None:
    """
    Wait for the next batch; ``None`` signals shutdown.

    The leaves of ``Sample.batch`` are zero-copy views into Rust-owned
    memory and stop being valid once ``sample()`` is called again. Copy
    them with ``np.copy`` (or iterate with ``dataset_iter(copy=True)``) to
    keep a batch past the next call.
    """
    outcome = self._server.sample()
    if outcome is None:
        return None

    buffers, info = outcome
    leading = self._batch_size

    # Reinterpret each flat buffer with its recorded dtype, then restore the
    # (batch_size, *leaf_shape) layout.
    leaves = []
    for raw, dtype, shape in zip(buffers, self._dtypes, self._shapes):
        flat = np.frombuffer(raw, dtype=dtype)
        leaves.append(flat.reshape((leading, *shape)))

    tree = optree.tree_unflatten(self._treedef, leaves)
    return Sample(batch=tree, info=info)

dataset_iter

dataset_iter(*, copy: bool = False) -> Generator[Sample, None, None]

Yield Samples as batches become ready; returns on shutdown.

Pass copy=True to deep-copy each batch and free the underlying ring slots for reuse on the next iteration.

Source code in python/echo/server.py
def dataset_iter(self, *, copy: bool = False) -> Generator[Sample, None, None]:
    """Generate Samples for as long as ``sample()`` keeps producing them.

    The generator finishes when ``sample()`` returns ``None`` (shutdown).
    With ``copy=True`` every batch is deep-copied so the underlying ring
    slots can be reused on the next iteration.
    """
    while (item := self.sample()) is not None:
        if copy:
            # Rebind before yielding so the view-backed sample is released
            # and only the copied arrays reach the consumer.
            item = Sample(
                batch=optree.tree_map(np.copy, item.batch),
                info=item.info,
            )
        yield item

submit

submit(data: Any) -> None

Submit a single sample directly to the store (in-process / tests).

Source code in python/echo/server.py
def submit(self, data: Any) -> None:
    """Submit a single sample directly to the store (in-process / tests).

    Every leaf of ``data`` is checked against the shape and dtype recorded
    from the example pytree before any bytes are handed to the store, so a
    sample whose leaves merely have the right byte count (for example
    ``int32`` in place of ``float32``) is rejected instead of being silently
    reinterpreted by ``sample()``. Only the flattened leaves are compared;
    container types are not.

    Raises:
        ValueError: If ``data`` flattens to a different number of leaves
            than the example, or a leaf's shape or dtype differs.
    """
    leaves, _ = optree.tree_flatten(data)
    if len(leaves) != len(self._shapes):
        raise ValueError(
            f"Expected {len(self._shapes)} leaves, got {len(leaves)}"
        )

    payload = []
    layout = zip(leaves, self._shapes, self._dtypes)
    for index, (leaf, shape, dtype) in enumerate(layout):
        # asarray is a no-op for ndarrays and lets numpy scalars through.
        array = np.asarray(leaf)
        if array.shape != shape or array.dtype != dtype:
            raise ValueError(
                f"Leaf {index} has shape {array.shape} and dtype "
                f"{array.dtype}; expected shape {shape} and dtype {dtype}"
            )
        payload.append(array.tobytes())
    self._server.submit(payload)

close

close() -> None

Signal shutdown, causing sample() to return None.

Source code in python/echo/server.py
def close(self) -> None:
    """Request shutdown so that ``sample()`` returns ``None``.

    Delegates to the underlying server object's ``shutdown()``.
    """
    core = self._server
    core.shutdown()

reset_histograms

reset_histograms() -> None

Reset the histogram fields on subsequent SampleInfo snapshots. Counters and gauges are unchanged.

Source code in python/echo/server.py
def reset_histograms(self) -> None:
    """Clear the histogram fields reported in later SampleInfo snapshots.

    Counters and gauges are left as they are. Delegates to the underlying
    server object's ``reset_histograms()``.
    """
    core = self._server
    core.reset_histograms()

Sample

Bases: NamedTuple

A batch paired with a snapshot of backpressure metrics.

Source code in python/echo/server.py
class Sample(NamedTuple):
    """A batch paired with a snapshot of backpressure metrics.

    Built by ``Server.sample()`` and, when ``copy=True``, rebuilt by
    ``Server.dataset_iter()`` around copied arrays.
    """

    # Pytree rebuilt with the server's treedef; ``Server.sample()`` reshapes
    # every leaf to ``(batch_size, *leaf_shape)``.
    batch: Any
    # Metrics object returned by the underlying server alongside the batch.
    info: SampleInfo