Store¶
src/store.rs is the composition layer. It owns the pre-allocated
PytreeRingBuf, a boxed Sampler, and a
boxed Remover, and exposes the actual insertion strategy used by both
the drainer pool and the direct submit() path. The path from SPSC pop
to consumer wake lives here.
Cursors¶
Three cursors govern the ring:
| Cursor | Owner | Role |
|---|---|---|
write_cursor (atomic) |
drainers (and submit) |
Next absolute slot to reserve. Advanced by CAS. |
commit counters (atomic, in FifoSampler) |
drainers | Per-batch-slot fill counter; wake when full. |
read_cursor (in FifoRemover) |
the single consumer | Next absolute slot to read. |
Cursors are absolute (usize that monotonically grows) and only modded
into [0, capacity) when actually addressing the ring. That keeps wrap-
around bookkeeping out of the comparisons.
The reserve-memcpy-commit dance¶
insert_batch is the hot path for drainers. One round of the loop:
- Reserve. Call
try_reserve_slots(want). This does a single CAS onwrite_cursorto claim up towantcontiguous slots, capped at the batch boundary so the reservation never spans two batch-slots. Returns(start, n)on success. - Memcpy. For each of the
nslots, copy each array of the sample straight into the ring slot.slot_mutreturns a raw pointer; nothing synchronises because the reservation guarantees disjoint access. - Commit. Call
sampler.commit_batch(start, n), onefetch_add(n)on the batch-slot counter. The drainer whose increment lands the counter atbatch_sizewakes the consumer (see Selector).
If try_reserve_slots returns None (ring is full), the caller awaits on
space_available (a tokio::sync::Notify) and tries again. The consumer
wakes producers by calling space_available.notify_waiters() from
release_previous_batch.
Why cap reservations at the batch boundary¶
try_reserve_slots reserves min(want, available, in_batch_remain) slots.
That last term is what keeps reservations from crossing a batch boundary.
The reason: commit is a fetch_add(n) on one batch-slot counter. A
reservation that straddled two batches would require two fetch_adds and
the wake detection (which drainer's commit brings the counter to
batch_size) would become ambiguous. Two drainers could each commit
half of a batch and neither would observe counter == batch_size for
the half already finished by the other drainer. Capping at the boundary
means one reservation, one batch-slot, one commit, unambiguous wake.
The cost is that a drainer with 2*batch_size samples queued might do
two reservations instead of one. Each is still a single CAS, and the
second one starts from write + n which is the next batch-slot's first
position, so the CAS retries are bounded by genuine contention, not by
this rule.
Two insert paths¶
insert_batch (async) is the drainer path. It awaits on space_available
when the ring is full.
insert_sync is the in-process submit() path. It spins on a CAS retry
loop without going through tokio. This is fine because in-process use is
single-threaded by construction, so the ring is never full long enough
for spinning to matter.
Both go through the same write_slot / try_reserve_slots CAS, so the
correctness story is identical.
Releasing slots¶
The consumer doesn't release slots when it reads; it releases the
previous batch's slots on entry to the next sample() call. That's why
Store carries has_previous_batch: Cell<bool>: it's how the consumer
remembers there's a batch outstanding from last time.
This matters because the numpy views from sample() are still pointing
into those slots until the next sample() is called; see
Zero-copy batches. Releasing eagerly would let a
drainer overwrite a slot that the learner is still reading.
release_previous_batch calls remover.remove(batch_size), which advances
read_cursor. If the ring was full at the time of release, it also calls
space_available.notify_waiters() to unpark any blocked inserters.
Why Cell<bool> is sound¶
has_previous_batch is a Cell<bool>, non-Sync. That's only OK because
only the consumer touches it, and there's only one consumer. The
unsafe impl Sync for Store is conditional on this invariant. If
multi-consumer sampling is ever added, this field needs to become an
atomic or move into the consumer's stack frame.