Asynchicache #977
Conversation
Summary of Changes
Hello @jinbiaoyu, I'm Gemini Code Assist[^1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request implements a sophisticated hierarchical caching system for prompt KV data in LightLLM, dubbed 'Asynchicache'. The core objective is to significantly enhance the efficiency of prompt processing by leveraging both fast in-memory (GPU/CPU) and larger, persistent disk-based caching. This new architecture allows for intelligent management of prompt prefixes, reducing redundant computations and optimizing memory usage across different components of the inference pipeline. It introduces new services and communication mechanisms to support this multi-tiered caching approach.
Highlights
- Hierarchical Asynchronous Caching (HiRadixCache): This PR introduces a new hierarchical caching system for prompt Key-Value (KV) data, named 'Asynchicache' or 'HiRadixCache'. It combines an in-memory radix tree with a disk-backed KV cache to efficiently manage and reuse prompt prefixes, aiming to reduce recomputation and memory pressure.
- Shared Memory and Disk Integration: New components, `RadixMemoryBuffer` and `RadixBufferManager`, are added to manage shared memory (both CPU and GPU) for the hierarchical cache. A dedicated `DiskCacheService` is implemented using RPyC and ZeroMQ for asynchronous read/write operations to persistent storage, allowing for larger and more persistent prompt caching.
- Enhanced Dynamic Prompt Cache: The existing dynamic prompt cache is significantly extended to leverage this hierarchical structure. It intelligently decides whether to store or retrieve KV data from in-memory (GPU/CPU) or disk, based on cache hits and available resources, improving performance for common and long prompt prefixes (see the sketch after this list).
- New Configuration Options: Command-line arguments (`--use_hiradix_cache`, `--hiradix_cache_gpu`, `--hiradix_cache_token_num`) are added to enable and configure the hierarchical cache, providing flexibility in deployment and resource utilization.
- Inter-Process Communication and Status Tracking: New data structures (`RadixStatus`, `ShmReqInfo`, `GroupReqInfo`) and ZeroMQ communication patterns are introduced to facilitate seamless asynchronous communication and status tracking between the HTTP server, router, and model inference processes regarding hierarchical cache operations.
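To make the tiered lookup idea concrete, here is a minimal, self-contained sketch of a two-tier prefix cache. All names below (`TieredPrefixCache`, `mem_tier`, `disk_tier`) are invented for illustration and do not correspond to the classes this PR introduces:

```python
from typing import Dict, List, Tuple

class TieredPrefixCache:
    """Illustrative two-tier prefix cache: check memory first, then disk."""

    def __init__(self):
        self.mem_tier: Dict[Tuple[int, ...], str] = {}   # fast in-memory prefixes
        self.disk_tier: Dict[Tuple[int, ...], str] = {}  # larger, persistent prefixes

    def _longest_prefix_len(self, tier: Dict[Tuple[int, ...], str], tokens: List[int]) -> int:
        # Length of the longest cached prefix of `tokens` present in the tier.
        best = 0
        for n in range(1, len(tokens) + 1):
            if tuple(tokens[:n]) in tier:
                best = n
        return best

    def match(self, tokens: List[int]) -> Tuple[int, bool]:
        mem_len = self._longest_prefix_len(self.mem_tier, tokens)
        disk_len = self._longest_prefix_len(self.disk_tier, tokens)
        # If disk holds a longer prefix, the missing KV data would be pulled
        # into memory asynchronously before the request is scheduled.
        return max(mem_len, disk_len), disk_len > mem_len

cache = TieredPrefixCache()
cache.mem_tier[(1, 2)] = "kv-a"          # short prefix cached in memory
cache.disk_tier[(1, 2, 3, 4)] = "kv-b"   # longer prefix persisted on disk
print(cache.match([1, 2, 3, 4, 5]))      # (4, True): disk hit exceeds memory hit
```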
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either `/gemini <command>` or `@gemini-code-assist <command>`. Below is a summary of the supported commands.
| Feature | Command | Description |
| --- | --- | --- |
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |
Customization
To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a `.gemini/` folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes

[^1]: Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.
Code Review
This pull request introduces a hierarchical asynchronous caching system (`Asynchicache`/`hiradix_cache`) to improve performance, likely for prompt processing. The changes are extensive, adding new services, managers, and data structures, and modifying request handling logic across multiple layers of the application.
While the feature is promising, the current implementation has several critical bugs related to control flow, API usage, and potential race conditions that need to be addressed. I've also noted a significant amount of duplicated code for cache initialization, which should be refactored to improve maintainability. My review provides specific suggestions to fix these issues and improve the overall quality of the code.
lightllm/common/radixmem_manager.py
```python
values = values[index * self.chunk_size:]
chunks = chunks[index:]
for i, (hash_val, _) in enumerate(chunks):
    if hash not in self.radix_buffer.req_mem_index:
```
There is a variable shadowing a built-in function here. The variable `hash` is being used, which is Python's built-in `hash()` function; the intended variable is likely `hash_val` from the loop on the preceding line. This is a critical bug: the membership check tests the built-in function object against `req_mem_index` rather than the computed hash value, so the condition never behaves as intended at runtime.
```diff
-    if hash not in self.radix_buffer.req_mem_index:
+    if hash_val not in self.radix_buffer.req_mem_index:
```
lightllm/server/core/objs/req.py
```python
def is_finished(self):
    self.finished == 1
```
The `is_finished` method is implemented incorrectly. It performs the comparison `self.finished == 1` but does not return the result, so the method always returns `None`, which evaluates as `False` in boolean contexts. This is a critical bug. It should return the boolean result of the comparison.
```diff
 def is_finished(self):
-    self.finished == 1
+    return self.finished == 1
```
```python
def all_dp_read_ready_or_nocache(self, indexs: List[int]) -> bool:
    return np.all(np.array(self.status)[indexs] == self.READ_READY) or np.all(np.array(self.status)[indexs] == self.NOCACHE)

def all_read_ready_or_nocache(self) -> bool:
    return np.all(np.array(self.status) == self.READ_READY) or np.all(np.array(self.status) == self.NOCACHE)
```
The logic in `all_dp_read_ready_or_nocache` and `all_read_ready_or_nocache` is flawed. The current implementation checks whether all statuses are `READ_READY` or whether all statuses are `NOCACHE`, but it doesn't correctly handle a mix of these two states for the given indices. For example, `[READ_READY, NOCACHE]` would incorrectly return `False`.

The correct approach is to perform an element-wise `OR` operation and then check that all resulting elements are true.
```diff
 def all_dp_read_ready_or_nocache(self, indexs: List[int]) -> bool:
-    return np.all(np.array(self.status)[indexs] == self.READ_READY) or np.all(np.array(self.status)[indexs] == self.NOCACHE)
+    status_array = np.array(self.status)[indexs]
+    return np.all((status_array == self.READ_READY) | (status_array == self.NOCACHE))

 def all_read_ready_or_nocache(self) -> bool:
-    return np.all(np.array(self.status) == self.READ_READY) or np.all(np.array(self.status) == self.NOCACHE)
+    status_array = np.array(self.status)
+    return np.all((status_array == self.READ_READY) | (status_array == self.NOCACHE))
```
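For illustration, a standalone check of the two expressions on a mixed-state array (the status codes below are made up for the demo; the real values live in `RadixStatus`):

```python
import numpy as np

READ_READY, NOCACHE = 2, 0  # made-up status codes for this demo

status_array = np.array([READ_READY, NOCACHE])  # every element is acceptable

# Original expression: neither "all READ_READY" nor "all NOCACHE" holds.
print(np.all(status_array == READ_READY) or np.all(status_array == NOCACHE))  # False

# Element-wise OR, then all(): the intended answer.
print(np.all((status_array == READ_READY) | (status_array == NOCACHE)))  # True
```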
```diff
 def can_release(self):
     for req in self.group_req_objs.shm_req_objs:
-        if not req.can_release():
-            return False
-    return True
+        if self.use_hiradix_cache:
+            if req.can_release() and req.radix_status.is_finished():
+                return True
+        else:
+            if req.can_release():
+                return True
+    return False
```
The logic in `can_release` appears to be inverted. The original implementation checked that all requests were releasable before returning `True`; the new logic returns `True` as soon as any single request is releasable. This is likely a bug and could lead to premature resource release.

The method should only return `True` once every request in `self.group_req_objs.shm_req_objs` is ready to be released.
```diff
 def can_release(self):
     for req in self.group_req_objs.shm_req_objs:
-        if self.use_hiradix_cache:
-            if req.can_release() and req.radix_status.is_finished():
-                return True
-        else:
-            if req.can_release():
-                return True
-    return False
+        can_release_flag = False
+        if self.use_hiradix_cache:
+            if req.can_release() and req.radix_status.is_finished():
+                can_release_flag = True
+        else:
+            if req.can_release():
+                can_release_flag = True
+        if not can_release_flag:
+            return False
+    return True
```
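As a side note, the same all-or-nothing check can be written more compactly with `all()` and a generator expression; a sketch with the same semantics as the suggestion above:

```python
def can_release(self):
    # Every request must be releasable; with the hierarchical cache enabled,
    # its radix write-back must also have finished.
    return all(
        req.can_release() and (not self.use_hiradix_cache or req.radix_status.is_finished())
        for req in self.group_req_objs.shm_req_objs
    )
```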
```python
)
self.remote_cache_manager.read(read_task)

self.radix_manager.write(keys=keys[:query_len], values=index.tolist())
```
The call to `self.radix_manager.write` is incorrect. The method signature in `RadixBufferManager` is `write(self, tokens: List[int], values: torch.Tensor, start_pos: int)`.

- The `keys` keyword argument is not defined; it should be `tokens`.
- The `values` argument expects a `torch.Tensor`, but it is being passed a list (`index.tolist()`).
- The `start_pos` argument is missing.

This will cause a runtime error.
```diff
-self.radix_manager.write(keys=keys[:query_len], values=index.tolist())
+self.radix_manager.write(tokens=keys[:query_len], values=index, start_pos=0)
```
```python
if self.enable_hiradix_cache:
    from lightllm.common.radixmem_buffer import RadixMemoryBuffer, init_shared_data, get_shared_data, MemPropties
    from lightllm.common.radixmem_manager import RadixBufferManager

    mem_propties = MemPropties(
        self.hiradix_cache_token_num,
        dtype=self.data_type,
        head_num=self.config["num_attention_heads"] // self.tp_world_size_,
        head_dim=self.config["n_embed"] // self.config["num_attention_heads"],
        layer_num=self.config["n_layer"],
    )
    init_shared_data(
        mem_propties=mem_propties,
        device="cpu" if not self.hiradix_cache_gpu else "cuda",
    )
    radix_mem_buffer = RadixMemoryBuffer(
        mem_propties,
        shared_data=get_shared_data(),
        lock=self.radix_lock,
        device="cpu" if not self.hiradix_cache_gpu else "cuda",
    )
    self.radix_manager = RadixBufferManager(
        radix_buffer=radix_mem_buffer,
        radix_mem_data=get_shared_data(),
        lock=self.radix_lock,
    )
    self.mem_propties = mem_propties
    self.shared_mem_data = get_shared_data()
```
This block of code for initializing the `hiradix_cache` is duplicated in multiple model files, specifically:

- `lightllm/models/deepseek2/model.py`
- `lightllm/models/qwen2/model.py`

This duplication makes the code harder to maintain and increases the risk of inconsistencies if changes are needed in the future. To improve maintainability, this logic should be refactored into a single, reusable helper method within the `TpPartBaseModel` class, as sketched below.
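One possible shape for that helper, shown only as a sketch: the method name `_init_hiradix_cache` is invented here, while the attribute and constructor names are taken directly from the duplicated block quoted above.

```python
class TpPartBaseModel:
    # ... existing initialization ...

    def _init_hiradix_cache(self):
        # Hypothetical helper consolidating the duplicated setup; subclasses
        # (deepseek2, qwen2, ...) would call this instead of repeating it.
        if not self.enable_hiradix_cache:
            return
        from lightllm.common.radixmem_buffer import RadixMemoryBuffer, init_shared_data, get_shared_data, MemPropties
        from lightllm.common.radixmem_manager import RadixBufferManager

        device = "cuda" if self.hiradix_cache_gpu else "cpu"
        mem_propties = MemPropties(
            self.hiradix_cache_token_num,
            dtype=self.data_type,
            head_num=self.config["num_attention_heads"] // self.tp_world_size_,
            head_dim=self.config["n_embed"] // self.config["num_attention_heads"],
            layer_num=self.config["n_layer"],
        )
        init_shared_data(mem_propties=mem_propties, device=device)
        radix_mem_buffer = RadixMemoryBuffer(
            mem_propties, shared_data=get_shared_data(), lock=self.radix_lock, device=device
        )
        self.radix_manager = RadixBufferManager(
            radix_buffer=radix_mem_buffer, radix_mem_data=get_shared_data(), lock=self.radix_lock
        )
        self.mem_propties = mem_propties
        self.shared_mem_data = get_shared_data()
```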
```python
if max_len > tree_node.node_prefix_total_len:
    pull_len = max_len - tree_node.node_prefix_total_len
    self.disk_cache_match_count.arr[0] += 1
    self.disk_cache_match_ratio.arr[0] = self.disk_cache_match_count.arr[0] / self.total_match_count.arr[0]
```
This line could cause a `ZeroDivisionError` if `self.total_match_count.arr[0]` is zero, which can happen on the first request. You should add a check to prevent this.
```diff
-self.disk_cache_match_ratio.arr[0] = self.disk_cache_match_count.arr[0] / self.total_match_count.arr[0]
+if self.total_match_count.arr[0] > 0:
+    self.disk_cache_match_ratio.arr[0] = self.disk_cache_match_count.arr[0] / self.total_match_count.arr[0]
```
```python
if isinstance(free_index, list):
    self.mem_state.numpy()[start:end] = free_index
else:
    # 从 gpu 到 cpu 的拷贝操作是流内阻塞操作
    # (the copy from GPU to CPU blocks within the stream)
    self.mem_state[start:end] = free_index
```
There are a few areas for improvement in this new file for better maintainability and clarity:

- Placeholder docstrings: The docstring for the `_free` method contains placeholder text like `_summary_` and `_description_`. These should be filled out to properly document the function's purpose, arguments, and behavior (see the sketch after this list).
- Chinese comments: There are comments in Chinese (e.g., line 121). For consistency and to make the code accessible to a wider audience, it's best to use English for all comments and documentation.
- TODOs: A `TODO` comment exists on line 82. It's good practice to either address these during development or create a ticket to track them for future work.
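For the first point, a possible docstring for `_free`, inferred from the snippet quoted above; the parameter names are assumptions, since the full signature is not shown in this diff:

```python
def _free(self, free_index, start, end):
    """Release buffer slots back to the free pool.

    Args:
        free_index: Slot indices to release, as a Python list or a tensor.
            When a GPU tensor is assigned into the CPU-side ``mem_state``,
            the device-to-host copy blocks within the current stream.
        start: Start offset into ``mem_state`` for the released slots.
        end: End offset (exclusive) into ``mem_state``.
    """
```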
```python
parser.add_argument("--use_hiradix_cache", action="store_true", help="enable hierachy prompt cache")
parser.add_argument("--hiradix_cache_gpu", action="store_true", help="enable hierachy prompt cache gpu")
parser.add_argument("--hiradix_cache_token_num", type=int, default=None, help="set the number of tokens to use hierachy prompt cache")
```
There's a typo in the command-line arguments and their help text. "hierachy" should be spelled "hierarchy". Since these are user-facing arguments, it's important to correct this for clarity and professionalism.
```diff
-parser.add_argument("--use_hiradix_cache", action="store_true", help="enable hierachy prompt cache")
-parser.add_argument("--hiradix_cache_gpu", action="store_true", help="enable hierachy prompt cache gpu")
-parser.add_argument("--hiradix_cache_token_num", type=int, default=None, help="set the number of tokens to use hierachy prompt cache")
+parser.add_argument("--use_hiradix_cache", action="store_true", help="enable hierarchy prompt cache")
+parser.add_argument("--hiradix_cache_gpu", action="store_true", help="enable hierarchy prompt cache gpu")
+parser.add_argument("--hiradix_cache_token_num", type=int, default=None, help="set the number of tokens to use hierarchy prompt cache")
```
```python
res = wait_until_ready(t)

if not res:
    self.py_cache_service.az5(t)
```
The method name `az5` is highly unconventional and obscure. It's unclear what this method does without looking at the implementation of `PyLocalCacheService`. Code should be as self-documenting as possible. Please consider renaming this to something more descriptive or adding a comment explaining its purpose, for example along the lines of the sketch below.
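One low-risk option is a thin, descriptively named wrapper at the call site. `cancel_task` is an invented name here, and the sketch assumes `az5` aborts an in-flight task, which should be verified against `PyLocalCacheService`:

```python
def cancel_task(self, task):
    # Hypothetical wrapper: gives the obscure `az5` call a self-documenting
    # name without changing the external cache-service API.
    self.py_cache_service.az5(task)
```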