feat: refined zero-copy transport with trajectory scores and token alignment

This commit is contained in:
RUFFY-369 2026-04-04 02:46:46 +05:30
parent 4f0acead3f
commit 13ed139594

View file

@ -92,7 +92,8 @@ class ZeroCopySHMBuffer:
def write_trajectory(self, tokens: List[int], score: float, metadata: Dict[str, Any] = None):
"""
Writes a trajectory to the buffer without any Python-side copies.
Writes a trajectory, its score, and metadata to the buffer.
Schema: [Score (8 bytes) | TokenLen (4 bytes) | Tokens (EntrySize * 4 bytes)]
"""
read_idx, write_idx, max_size, entry_size = self._get_control()
@ -103,14 +104,22 @@ class ZeroCopySHMBuffer:
return False
# Calculate offset in data segment
offset = SHMBufferConfig.SIZE + (write_idx * entry_size * 4)
slot_size = 8 + 4 + (entry_size * 4)
offset = SHMBufferConfig.SIZE + (write_idx * slot_size)
# Zero-copy write using numpy view
# 1. Write Score (float64, 8 bytes)
struct.pack_into("d", self.buf, offset, float(score))
# 2. Write Token Length (int32, 4 bytes)
token_len = min(len(tokens), entry_size)
struct.pack_into("i", self.buf, offset + 8, token_len)
# 3. Write Tokens (int32 array, EntrySize * 4 bytes)
token_offset = offset + 8 + 4
token_arr = np.array(tokens, dtype=np.int32)
token_len = min(len(token_arr), entry_size)
# View the SHM as a numpy array for the specific slot
shm_slot = np.ndarray((entry_size,), dtype=np.int32, buffer=self.buf, offset=offset)
# View the SHM as a numpy array for the specific token slot
shm_slot = np.ndarray((entry_size,), dtype=np.int32, buffer=self.buf, offset=token_offset)
shm_slot[:token_len] = token_arr[:token_len]
if token_len < entry_size:
shm_slot[token_len:] = 0 # Padding
@ -119,23 +128,36 @@ class ZeroCopySHMBuffer:
self._set_indices(read_idx, next_write)
return True
def read_next(self) -> Optional[np.ndarray]:
def read_next(self) -> Optional[Dict[str, Any]]:
"""
Reads the next available trajectory as a numpy view (no copy).
Reads the next available trajectory with its score and metadata.
"""
read_idx, write_idx, max_size, entry_size = self._get_control()
if read_idx == write_idx:
return None # Buffer empty
offset = SHMBufferConfig.SIZE + (read_idx * entry_size * 4)
slot_size = 8 + 4 + (entry_size * 4)
offset = SHMBufferConfig.SIZE + (read_idx * slot_size)
# Return a view of the memory
data = np.ndarray((entry_size,), dtype=np.int32, buffer=self.buf, offset=offset)
# 1. Read Score (float64)
score = struct.unpack_from("d", self.buf, offset)[0]
# 2. Read Token Length (int32)
token_len = struct.unpack_from("i", self.buf, offset + 8)[0]
token_len = min(token_len, entry_size)
# 3. Read Tokens (Numpy View)
token_offset = offset + 8 + 4
tokens_view = np.ndarray((token_len,), dtype=np.int32, buffer=self.buf, offset=token_offset)
# Advance read index
self._set_indices((read_idx + 1) % max_size, write_idx)
return data
return {
"tokens": tokens_view.tolist(),
"score": score
}
def close(self, unlink: bool = False):
self.shm.close()