
Conversation

jinbiaoyu
Collaborator

No description provided.

@gemini-code-assist bot left a comment

Summary of Changes

Hello @jinbiaoyu, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request implements a sophisticated hierarchical caching system for prompt KV data in LightLLM, dubbed 'Asynchicache'. The core objective is to significantly enhance the efficiency of prompt processing by leveraging both fast in-memory (GPU/CPU) and larger, persistent disk-based caching. This new architecture allows for intelligent management of prompt prefixes, reducing redundant computations and optimizing memory usage across different components of the inference pipeline. It introduces new services and communication mechanisms to support this multi-tiered caching approach.

Highlights

  • Hierarchical Asynchronous Caching (HiRadixCache): This PR introduces a new hierarchical caching system for prompt Key-Value (KV) data, named 'Asynchicache' or 'HiRadixCache'. It combines an in-memory radix tree with a disk-backed KV cache to efficiently manage and reuse prompt prefixes, aiming to reduce recomputation and memory pressure.
  • Shared Memory and Disk Integration: New components like RadixMemoryBuffer and RadixBufferManager are added to manage shared memory (both CPU and GPU) for the hierarchical cache. A dedicated DiskCacheService is implemented using RPyC and ZeroMQ for asynchronous read/write operations to persistent storage, allowing for larger and more persistent prompt caching.
  • Enhanced Dynamic Prompt Cache: The existing dynamic prompt cache is significantly extended to leverage this hierarchical structure. It intelligently decides whether to store or retrieve KV data from in-memory (GPU/CPU) or disk, based on cache hits and available resources, improving performance for common and long prompt prefixes.
  • New Configuration Options: Command-line arguments (--use_hiradix_cache, --hiradix_cache_gpu, --hiradix_cache_token_num) are added to enable and configure the hierarchical cache, providing flexibility in deployment and resource utilization.
  • Inter-Process Communication and Status Tracking: New data structures (RadixStatus, ShmReqInfo, GroupReqInfo) and ZeroMQ communication patterns are introduced to facilitate seamless asynchronous communication and status tracking between the HTTP server, router, and model inference processes regarding hierarchical cache operations.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e., a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in issue comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist bot left a comment

Code Review

This pull request introduces a hierarchical asynchronous caching system (Asynchicache/hiradix_cache) to improve performance, likely for prompt processing. The changes are extensive, adding new services, managers, and data structures, and modifying request handling logic across multiple layers of the application.

While the feature is promising, the current implementation has several critical bugs related to control flow, API usage, and potential race conditions that need to be addressed. I've also noted a significant amount of duplicated code for cache initialization, which should be refactored to improve maintainability. My review provides specific suggestions to fix these issues and improve the overall quality of the code.

values = values[index * self.chunk_size:]
chunks = chunks[index:]
for i, (hash_val, _) in enumerate(chunks):
    if hash not in self.radix_buffer.req_mem_index:


critical

The membership check uses hash, which resolves to Python's built-in hash() function rather than a local variable. The intended variable is almost certainly hash_val from the loop on the preceding line. As written, the condition tests whether the built-in function object is a key in req_mem_index, so it never matches the intended hash value; this is a critical correctness bug.

Suggested change
if hash not in self.radix_buffer.req_mem_index:
if hash_val not in self.radix_buffer.req_mem_index:

Comment on lines 93 to 94
def is_finished(self):
    self.finished == 1


critical

The is_finished method is implemented incorrectly. It currently performs a comparison self.finished == 1 but does not return the result. This will cause the method to always return None, which will be evaluated as False in boolean contexts. This is a critical bug.

It should return the boolean result of the comparison.

Suggested change
def is_finished(self):
    self.finished == 1
def is_finished(self):
    return self.finished == 1

Comment on lines +118 to +122
def all_dp_read_ready_or_nocache(self, indexs: List[int]) -> bool:
    return np.all(np.array(self.status)[indexs] == self.READ_READY) or np.all(np.array(self.status)[indexs] == self.NOCACHE)

def all_read_ready_or_nocache(self) -> bool:
    return np.all(np.array(self.status) == self.READ_READY) or np.all(np.array(self.status) == self.NOCACHE)


critical

The logic in all_dp_read_ready_or_nocache and all_read_ready_or_nocache is flawed. The current implementation checks if all statuses are READ_READY OR if all statuses are NOCACHE, but it doesn't correctly handle a mix of these two states for the given indices.

For example, [READ_READY, NOCACHE] would incorrectly return False.

The correct approach is to perform an element-wise OR operation and then check if all resulting elements are true.

Suggested change
def all_dp_read_ready_or_nocache(self, indexs: List[int]) -> bool:
    return np.all(np.array(self.status)[indexs] == self.READ_READY) or np.all(np.array(self.status)[indexs] == self.NOCACHE)
def all_read_ready_or_nocache(self) -> bool:
    return np.all(np.array(self.status) == self.READ_READY) or np.all(np.array(self.status) == self.NOCACHE)
def all_dp_read_ready_or_nocache(self, indexs: List[int]) -> bool:
    status_array = np.array(self.status)[indexs]
    return np.all((status_array == self.READ_READY) | (status_array == self.NOCACHE))
def all_read_ready_or_nocache(self) -> bool:
    status_array = np.array(self.status)
    return np.all((status_array == self.READ_READY) | (status_array == self.NOCACHE))
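
To make the difference concrete, here is a small standalone check (the numeric constants are placeholders for illustration only, not the actual RadixStatus values):

import numpy as np

READ_READY, NOCACHE = 2, 3                  # placeholder values, for illustration only
status = np.array([READ_READY, NOCACHE])    # a mixed group

# Original logic: true only when the whole array is uniformly one value
flawed = np.all(status == READ_READY) or np.all(status == NOCACHE)   # -> False

# Suggested logic: element-wise OR, then reduce over the array
fixed = np.all((status == READ_READY) | (status == NOCACHE))         # -> True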

Comment on lines 720 to +729
 def can_release(self):
     for req in self.group_req_objs.shm_req_objs:
-        if not req.can_release():
-            return False
-    return True
+        if self.use_hiradix_cache:
+            if req.can_release() and req.radix_status.is_finished():
+                return True
+
+        else:
+            if req.can_release():
+                return True
+    return False


critical

The logic in can_release seems to be inverted. The original implementation checked if all requests were releasable before returning True. The new logic returns True if any request is releasable. This is likely a bug and could lead to premature resource release.

The method should only return True if all requests in self.group_req_objs.shm_req_objs are ready to be released.

Suggested change
def can_release(self):
    for req in self.group_req_objs.shm_req_objs:
        if self.use_hiradix_cache:
            if req.can_release() and req.radix_status.is_finished():
                return True
        else:
            if req.can_release():
                return True
    return False
def can_release(self):
    for req in self.group_req_objs.shm_req_objs:
        can_release_flag = False
        if self.use_hiradix_cache:
            if req.can_release() and req.radix_status.is_finished():
                can_release_flag = True
        else:
            if req.can_release():
                can_release_flag = True
        if not can_release_flag:
            return False
    return True
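
The same fix can also be written more compactly with all(), which reads closer to the stated intent ("every request in the group is releasable"); this is purely a stylistic alternative to the suggestion above:

def can_release(self):
    # Every request must be releasable; with the hierarchical cache enabled,
    # its radix status must also be finished before the group can be released.
    return all(
        req.can_release() and (not self.use_hiradix_cache or req.radix_status.is_finished())
        for req in self.group_req_objs.shm_req_objs
    )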

)
self.remote_cache_manager.read(read_task)

self.radix_manager.write(keys=keys[:query_len], values=index.tolist())


critical

The call to self.radix_manager.write is incorrect. The method signature in RadixBufferManager is write(self, tokens: List[int], values: torch.Tensor, start_pos: int).

  1. The keys keyword argument is not defined; it should be tokens.
  2. The values argument expects a torch.Tensor, but it's being passed a list (index.tolist()).
  3. The start_pos argument is missing.

This will cause a runtime error.

Suggested change
self.radix_manager.write(keys=keys[:query_len], values=index.tolist())
self.radix_manager.write(tokens=keys[:query_len], values=index, start_pos=0)

Comment on lines 181 to 205
if self.enable_hiradix_cache:
    from lightllm.common.radixmem_buffer import RadixMemoryBuffer, init_shared_data, get_shared_data, MemPropties
    from lightllm.common.radixmem_manager import RadixBufferManager
    mem_propties = MemPropties(
        self.hiradix_cache_token_num,
        dtype=self.data_type,
        head_num=self.config["num_attention_heads"] // self.tp_world_size_,
        head_dim=self.config["n_embed"] // self.config["num_attention_heads"],
        layer_num=self.config["n_layer"]
    )
    init_shared_data(
        mem_propties=mem_propties,
        device="cpu" if not self.hiradix_cache_gpu else "cuda"
    )
    radix_mem_buffer = RadixMemoryBuffer(
        mem_propties,
        shared_data=get_shared_data(),
        lock=self.radix_lock,
        device="cpu" if not self.hiradix_cache_gpu else "cuda"
    )
    self.radix_manager = RadixBufferManager(radix_buffer=radix_mem_buffer,
                                            radix_mem_data=get_shared_data(),
                                            lock=self.radix_lock)
    self.mem_propties = mem_propties
    self.shared_mem_data = get_shared_data()


high

This block of code for initializing the hiradix_cache is duplicated in multiple model files, specifically:

  • lightllm/models/deepseek2/model.py
  • lightllm/models/qwen2/model.py

This duplication makes the code harder to maintain and increases the risk of inconsistencies if changes are needed in the future. To improve maintainability, this logic should be refactored into a single, reusable helper method within the TpPartBaseModel class.
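
One possible shape for that shared helper is sketched below. The helper name is hypothetical, and the body simply lifts the duplicated block above into one place; the attribute names, config keys, and imported classes are taken verbatim from the quoted code and should be verified against TpPartBaseModel:

def _init_hiradix_cache(self):
    # Hypothetical shared helper on TpPartBaseModel that deepseek2/model.py and
    # qwen2/model.py could both call instead of repeating this block.
    if not self.enable_hiradix_cache:
        return
    from lightllm.common.radixmem_buffer import RadixMemoryBuffer, init_shared_data, get_shared_data, MemPropties
    from lightllm.common.radixmem_manager import RadixBufferManager

    device = "cuda" if self.hiradix_cache_gpu else "cpu"
    mem_propties = MemPropties(
        self.hiradix_cache_token_num,
        dtype=self.data_type,
        head_num=self.config["num_attention_heads"] // self.tp_world_size_,
        head_dim=self.config["n_embed"] // self.config["num_attention_heads"],
        layer_num=self.config["n_layer"],
    )
    init_shared_data(mem_propties=mem_propties, device=device)
    radix_mem_buffer = RadixMemoryBuffer(
        mem_propties,
        shared_data=get_shared_data(),
        lock=self.radix_lock,
        device=device,
    )
    self.radix_manager = RadixBufferManager(
        radix_buffer=radix_mem_buffer,
        radix_mem_data=get_shared_data(),
        lock=self.radix_lock,
    )
    self.mem_propties = mem_propties
    self.shared_mem_data = get_shared_data()

Models whose config uses different key names could pass those values in as parameters instead of reading self.config directly inside the helper.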

if max_len > tree_node.node_prefix_total_len:
    pull_len = max_len - tree_node.node_prefix_total_len
    self.disk_cache_match_count.arr[0] += 1
    self.disk_cache_match_ratio.arr[0] = self.disk_cache_match_count.arr[0] / self.total_match_count.arr[0]


high

This line could cause a ZeroDivisionError if self.total_match_count.arr[0] is zero, which can happen on the first request. You should add a check to prevent this.

Suggested change
self.disk_cache_match_ratio.arr[0] = self.disk_cache_match_count.arr[0] / self.total_match_count.arr[0]
if self.total_match_count.arr[0] > 0:
    self.disk_cache_match_ratio.arr[0] = self.disk_cache_match_count.arr[0] / self.total_match_count.arr[0]

Comment on lines +118 to +122
if isinstance(free_index, list):
    self.mem_state.numpy()[start:end] = free_index
else:
    # 从 gpu 到 cpu 的拷贝操作是流内阻塞操作
    self.mem_state[start:end] = free_index


medium

There are a few areas for improvement in this new file for better maintainability and clarity:

  1. Placeholder Docstrings: The docstring for the _free method contains placeholder text like _summary_ and _description_. These should be filled out to properly document the function's purpose, arguments, and behavior (see the sketch after this list).
  2. Chinese Comments: There are comments in Chinese (e.g., line 121). For consistency and to make the code accessible to a wider audience, it's best to use English for all comments and documentation.
  3. TODOs: A TODO comment exists on line 82. It's good practice to either address these during development or create a ticket to track them for future work.
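
For the first point, a filled-out docstring might look roughly like the sketch below. The signature and argument descriptions are guesses inferred from the snippet above and must be checked against the real _free implementation before adopting them:

def _free(self, free_index, start, end):
    """Return a block of token slots to the shared free-index pool.

    Illustrative sketch only; verify the argument semantics against the
    actual implementation.

    Args:
        free_index: Indices to release, either a Python list or a tensor
            (a GPU tensor triggers a device-to-host copy here).
        start: First position in ``mem_state`` to overwrite.
        end: One past the last position in ``mem_state`` to overwrite.
    """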

Comment on lines +223 to +225
parser.add_argument("--use_hiradix_cache", action="store_true", help="enable hierachy prompt cache")
parser.add_argument("--hiradix_cache_gpu", action="store_true", help="enable hierachy prompt cache gpu")
parser.add_argument("--hiradix_cache_token_num", type=int, default=None , help="set the number of tokens to use hierachy prompt cache")


medium

There's a typo in the help text of these command-line arguments: "hierachy" should be spelled "hierarchy". Since these strings are user-facing, it's important to correct this for clarity and professionalism.

Suggested change
parser.add_argument("--use_hiradix_cache", action="store_true", help="enable hierachy prompt cache")
parser.add_argument("--hiradix_cache_gpu", action="store_true", help="enable hierachy prompt cache gpu")
parser.add_argument("--hiradix_cache_token_num", type=int, default=None , help="set the number of tokens to use hierachy prompt cache")
parser.add_argument("--use_hiradix_cache", action="store_true", help="enable hierarchy prompt cache")
parser.add_argument("--hiradix_cache_gpu", action="store_true", help="enable hierarchy prompt cache gpu")
parser.add_argument("--hiradix_cache_token_num", type=int, default=None , help="set the number of tokens to use hierarchy prompt cache")

res = wait_until_ready(t)

if not res:
    self.py_cache_service.az5(t)


medium

The method name az5 is highly unconventional and obscure. It's unclear what this method does without looking at the implementation of PyLocalCacheService. Code should be as self-documenting as possible. Please consider renaming this to something more descriptive or adding a comment explaining its purpose.
