-
Notifications
You must be signed in to change notification settings - Fork 21
deduplicate optimizations better #733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
95a38d3
deduplicate optimizations better
misrasaurabh1 a831ee3
fix a bug
misrasaurabh1 47f4d76
Update codeflash/code_utils/deduplicate_code.py
misrasaurabh1 6754b1f
optimize performance
misrasaurabh1 2792c67
Merge remote-tracking branch 'origin/deduplicate-better' into dedupli…
misrasaurabh1 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,235 @@ | ||
import ast | ||
import hashlib | ||
from typing import Dict, Set | ||
|
||
|
||
class VariableNormalizer(ast.NodeTransformer): | ||
"""Normalizes only local variable names in AST to canonical forms like var_0, var_1, etc. | ||
Preserves function names, class names, parameters, built-ins, and imported names. | ||
""" | ||
|
||
def __init__(self): | ||
self.var_counter = 0 | ||
self.var_mapping: Dict[str, str] = {} | ||
self.scope_stack = [] | ||
self.builtins = set(dir(__builtins__)) | ||
self.imports: Set[str] = set() | ||
self.global_vars: Set[str] = set() | ||
self.nonlocal_vars: Set[str] = set() | ||
self.parameters: Set[str] = set() # Track function parameters | ||
|
||
def enter_scope(self): | ||
"""Enter a new scope (function/class)""" | ||
self.scope_stack.append( | ||
{"var_mapping": dict(self.var_mapping), "var_counter": self.var_counter, "parameters": set(self.parameters)} | ||
) | ||
|
||
def exit_scope(self): | ||
"""Exit current scope and restore parent scope""" | ||
if self.scope_stack: | ||
scope = self.scope_stack.pop() | ||
self.var_mapping = scope["var_mapping"] | ||
self.var_counter = scope["var_counter"] | ||
self.parameters = scope["parameters"] | ||
|
||
def get_normalized_name(self, name: str) -> str: | ||
"""Get or create normalized name for a variable""" | ||
# Don't normalize if it's a builtin, import, global, nonlocal, or parameter | ||
if ( | ||
name in self.builtins | ||
or name in self.imports | ||
or name in self.global_vars | ||
or name in self.nonlocal_vars | ||
or name in self.parameters | ||
): | ||
return name | ||
|
||
# Only normalize local variables | ||
if name not in self.var_mapping: | ||
self.var_mapping[name] = f"var_{self.var_counter}" | ||
self.var_counter += 1 | ||
return self.var_mapping[name] | ||
|
||
def visit_Import(self, node): | ||
"""Track imported names""" | ||
for alias in node.names: | ||
name = alias.asname if alias.asname else alias.name | ||
self.imports.add(name.split(".")[0]) | ||
return node | ||
|
||
def visit_ImportFrom(self, node): | ||
"""Track imported names from modules""" | ||
for alias in node.names: | ||
name = alias.asname if alias.asname else alias.name | ||
self.imports.add(name) | ||
misrasaurabh1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return node | ||
|
||
def visit_Global(self, node): | ||
"""Track global variable declarations""" | ||
for name in node.names: | ||
self.global_vars.add(name) | ||
misrasaurabh1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return node | ||
|
||
def visit_Nonlocal(self, node): | ||
"""Track nonlocal variable declarations""" | ||
for name in node.names: | ||
self.nonlocal_vars.add(name) | ||
return node | ||
|
||
def visit_FunctionDef(self, node): | ||
"""Process function but keep function name and parameters unchanged""" | ||
self.enter_scope() | ||
|
||
# Track all parameters (don't modify them) | ||
for arg in node.args.args: | ||
self.parameters.add(arg.arg) | ||
if node.args.vararg: | ||
self.parameters.add(node.args.vararg.arg) | ||
if node.args.kwarg: | ||
self.parameters.add(node.args.kwarg.arg) | ||
for arg in node.args.kwonlyargs: | ||
self.parameters.add(arg.arg) | ||
|
||
# Visit function body | ||
node = self.generic_visit(node) | ||
self.exit_scope() | ||
return node | ||
|
||
def visit_AsyncFunctionDef(self, node): | ||
"""Handle async functions same as regular functions""" | ||
return self.visit_FunctionDef(node) | ||
|
||
def visit_ClassDef(self, node): | ||
"""Process class but keep class name unchanged""" | ||
self.enter_scope() | ||
node = self.generic_visit(node) | ||
self.exit_scope() | ||
return node | ||
|
||
def visit_Name(self, node): | ||
"""Normalize variable names in Name nodes""" | ||
if isinstance(node.ctx, (ast.Store, ast.Del)): | ||
# For assignments and deletions, check if we should normalize | ||
if ( | ||
node.id not in self.builtins | ||
and node.id not in self.imports | ||
and node.id not in self.parameters | ||
and node.id not in self.global_vars | ||
and node.id not in self.nonlocal_vars | ||
): | ||
node.id = self.get_normalized_name(node.id) | ||
elif isinstance(node.ctx, ast.Load): | ||
# For loading, use existing mapping if available | ||
if node.id in self.var_mapping: | ||
node.id = self.var_mapping[node.id] | ||
return node | ||
|
||
def visit_ExceptHandler(self, node): | ||
"""Normalize exception variable names""" | ||
if node.name: | ||
node.name = self.get_normalized_name(node.name) | ||
return self.generic_visit(node) | ||
|
||
def visit_comprehension(self, node): | ||
"""Normalize comprehension target variables""" | ||
# Create new scope for comprehension | ||
old_mapping = dict(self.var_mapping) | ||
old_counter = self.var_counter | ||
|
||
# Process the comprehension | ||
node = self.generic_visit(node) | ||
|
||
# Restore scope | ||
self.var_mapping = old_mapping | ||
self.var_counter = old_counter | ||
return node | ||
|
||
def visit_For(self, node): | ||
"""Handle for loop target variables""" | ||
# The target in a for loop is a local variable that should be normalized | ||
return self.generic_visit(node) | ||
|
||
def visit_With(self, node): | ||
"""Handle with statement as variables""" | ||
return self.generic_visit(node) | ||
|
||
|
||
def normalize_code(code: str, remove_docstrings: bool = True) -> str: | ||
"""Normalize Python code by parsing, cleaning, and normalizing only variable names. | ||
Function names, class names, and parameters are preserved. | ||
|
||
Args: | ||
code: Python source code as string | ||
remove_docstrings: Whether to remove docstrings | ||
|
||
Returns: | ||
Normalized code as string | ||
|
||
""" | ||
try: | ||
# Parse the code | ||
tree = ast.parse(code) | ||
|
||
# Remove docstrings if requested | ||
if remove_docstrings: | ||
remove_docstrings_from_ast(tree) | ||
|
||
# Normalize variable names | ||
normalizer = VariableNormalizer() | ||
normalized_tree = normalizer.visit(tree) | ||
|
||
# Fix missing locations in the AST | ||
ast.fix_missing_locations(normalized_tree) | ||
|
||
# Unparse back to code | ||
return ast.unparse(normalized_tree) | ||
except SyntaxError as e: | ||
msg = f"Invalid Python syntax: {e}" | ||
raise ValueError(msg) from e | ||
|
||
|
||
def remove_docstrings_from_ast(node): | ||
"""Remove docstrings from AST nodes.""" | ||
# Process all nodes in the tree, but avoid recursion | ||
for current_node in ast.walk(node): | ||
if isinstance(current_node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)): | ||
if ( | ||
current_node.body | ||
and isinstance(current_node.body[0], ast.Expr) | ||
and isinstance(current_node.body[0].value, ast.Constant) | ||
and isinstance(current_node.body[0].value.value, str) | ||
): | ||
current_node.body = current_node.body[1:] | ||
|
||
|
||
def get_code_fingerprint(code: str) -> str: | ||
"""Generate a fingerprint for normalized code. | ||
|
||
Args: | ||
code: Python source code | ||
|
||
Returns: | ||
SHA-256 hash of normalized code | ||
|
||
""" | ||
normalized = normalize_code(code) | ||
return hashlib.sha256(normalized.encode()).hexdigest() | ||
|
||
|
||
def are_codes_duplicate(code1: str, code2: str) -> bool: | ||
"""Check if two code segments are duplicates after normalization. | ||
|
||
Args: | ||
code1: First code segment | ||
code2: Second code segment | ||
|
||
Returns: | ||
True if codes are structurally identical (ignoring local variable names) | ||
|
||
""" | ||
try: | ||
normalized1 = normalize_code(code1) | ||
normalized2 = normalize_code(code2) | ||
return normalized1 == normalized2 | ||
except Exception: | ||
return False |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.