Skip to content

Commit 4e6e481

Browse files
committed
shared memory based object store
Signed-off-by: donglu <[email protected]>
1 parent 1dba2c4 commit 4e6e481

File tree

11 files changed

+1059
-23
lines changed

11 files changed

+1059
-23
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3+
4+
import unittest
5+
6+
from vllm.distributed.device_communicators.shm_object_storage import (
7+
SingleWriterShmRingBuffer)
8+
9+
10+
class TestSingleWriterShmRingBuffer(unittest.TestCase):
11+
"""Test suite for the ring buffer implementation"""
12+
13+
def setUp(self):
14+
"""Set up test fixtures"""
15+
self.buffer_size = 4096
16+
self.ring_buffer = None
17+
18+
def tearDown(self):
19+
"""Clean up after tests"""
20+
if self.ring_buffer:
21+
del self.ring_buffer
22+
23+
def test_buffer_opening(self):
24+
"""Test opening an existing buffer"""
25+
# First create a buffer
26+
self.ring_buffer = SingleWriterShmRingBuffer(
27+
data_buffer_size=self.buffer_size, create=True)
28+
29+
try:
30+
# Then open it with another instance
31+
reader_buffer = SingleWriterShmRingBuffer(
32+
*self.ring_buffer.handle())
33+
self.assertFalse(reader_buffer.is_writer)
34+
self.assertEqual(reader_buffer.shared_memory.name,
35+
self.ring_buffer.shared_memory.name)
36+
finally:
37+
del reader_buffer
38+
39+
def test_buffer_access(self):
40+
"""Test accessing allocated buffers"""
41+
self.ring_buffer = SingleWriterShmRingBuffer(
42+
data_buffer_size=self.buffer_size, create=True)
43+
44+
size = 100
45+
address, monotonic_id = self.ring_buffer.allocate_buf(size)
46+
47+
# Write some test data
48+
test_data = b"Hello, World!" * 7 # 91 bytes
49+
with self.ring_buffer.access_buf(address) as (data_buf, metadata):
50+
data_buf[0:len(test_data)] = test_data
51+
52+
# Read it back
53+
with self.ring_buffer.access_buf(address) as (data_buf2, metadata2):
54+
read_data = bytes(data_buf2[0:len(test_data)])
55+
read_id = metadata2[0]
56+
57+
self.assertEqual(read_data, test_data)
58+
self.assertEqual(read_id, monotonic_id)
59+
60+
def test_memory_error_on_full_buffer(self):
61+
"""Test that MemoryError is raised when buffer is full"""
62+
small_buffer_size = 200
63+
self.ring_buffer = SingleWriterShmRingBuffer(
64+
data_buffer_size=small_buffer_size, create=True)
65+
66+
# Fill up the buffer
67+
self.ring_buffer.allocate_buf(100)
68+
self.ring_buffer.allocate_buf(80) # Total: 196 bytes used
69+
70+
# This should fail
71+
with self.assertRaises(MemoryError):
72+
self.ring_buffer.allocate_buf(1) # Would exceed buffer capacity
73+
74+
def test_allocation_and_free(self):
75+
"""Test allocation and freeing of buffers"""
76+
small_buffer_size = 200
77+
self.ring_buffer = SingleWriterShmRingBuffer(
78+
data_buffer_size=small_buffer_size, create=True)
79+
80+
size = 80
81+
# Write some data
82+
test_data = b"Repeated test data"
83+
for i in range(5):
84+
address, monotonic_id = self.ring_buffer.allocate_buf(size)
85+
with self.ring_buffer.access_buf(address) as (data_buf, metadata):
86+
data_buf[0:4] = (0).to_bytes(4, "little") # 0 for not in-use
87+
data_buf[4:len(test_data) + 4] = test_data
88+
print(self.ring_buffer.metadata)
89+
freed_id_start, freed_id_end = (self.ring_buffer.free_buf()
90+
) # Reset buffer state
91+
print(f" Freed IDs: {freed_id_start} to {freed_id_end}")
92+
self.assertEqual(freed_id_start, i)
93+
94+
95+
def main():
96+
"""Main function demonstrating usage and running tests"""
97+
print("=== SingleWriterShmRingBuffer Test Suite ===\n")
98+
99+
# Run unit tests
100+
print("Running unit tests...")
101+
unittest.main(argv=[""], exit=False, verbosity=2)
102+
103+
print("\n" + "=" * 50)
104+
print("=== Manual Demo ===\n")
105+
106+
# Manual demonstration
107+
try:
108+
print("Creating ring buffer...")
109+
writer_buffer = SingleWriterShmRingBuffer(data_buffer_size=2048,
110+
create=True)
111+
reader_buffer = SingleWriterShmRingBuffer(*writer_buffer.handle())
112+
113+
print(f"Buffer created with name: {writer_buffer.shared_memory.name}")
114+
115+
# Allocate some buffers
116+
print("\nAllocating buffers...")
117+
address_array = []
118+
for i in range(3):
119+
size = 100 + i * 50
120+
try:
121+
freed_id_start, freed_id_end = writer_buffer.free_buf()
122+
address, monotonic_id = writer_buffer.allocate_buf(size)
123+
address_array.append((address, size, monotonic_id))
124+
125+
# Write some test data
126+
with writer_buffer.access_buf(address) as (data_buf, metadata):
127+
test_message = f"Test message {i}".encode()
128+
data_buf[0:len(test_message)] = test_message
129+
130+
except MemoryError as e:
131+
print(f" Failed to allocate {size} bytes: {e}")
132+
133+
print("\nBuffer state:")
134+
print(f" Data buffer start: {writer_buffer.data_buffer_start}")
135+
print(f" Data buffer end: {writer_buffer.data_buffer_end}")
136+
print(f" Monotonic ID start: {writer_buffer.monotonic_id_start}")
137+
print(f" Monotonic ID end: {writer_buffer.monotonic_id_end}")
138+
print(f" Metadata entries: {len(writer_buffer.metadata)}")
139+
140+
# Try to read back the data
141+
print("\nReading back data...")
142+
for address, size, monotonic_id in address_array:
143+
with reader_buffer.access_buf(address) as (data_buf, metadata):
144+
# Find null terminator or read first 50 chars
145+
data_bytes = bytes(data_buf[0:size])
146+
message = data_bytes.decode()
147+
print(f" ID {monotonic_id}: '{message}'")
148+
149+
except Exception as e:
150+
print(f"Demo error: {e}")
151+
import traceback
152+
153+
traceback.print_exc()
154+
155+
print("\n=== Demo Complete ===")
156+
157+
158+
if __name__ == "__main__":
159+
main()

0 commit comments

Comments
 (0)