Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4126,6 +4126,8 @@ def _process_module_builtin_defaults():
"newrelic.hooks.framework_azurefunctions",
"instrument_azure_functions_worker_dispatcher",
)
_process_module_definition("pyzeebe.client.client", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_client_client")
_process_module_definition("pyzeebe.worker.job_executor", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_worker_job_executor")


def _process_module_entry_points():
Expand Down
6 changes: 6 additions & 0 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@
"response.headers.contentType",
"response.status",
"server.address",
"zeebe.client.bpmnProcessId",
"zeebe.client.messageName",
"zeebe.client.correlationKey",
"zeebe.client.messageId",
"zeebe.client.resourceCount",
"zeebe.client.resourceFile",
}

MAX_NUM_USER_ATTRIBUTES = 128
Expand Down
116 changes: 116 additions & 0 deletions newrelic/hooks/external_pyzeebe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging

from newrelic.api.application import application_instance
from newrelic.api.web_transaction import WebTransaction
from newrelic.api.function_trace import FunctionTrace
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import wrap_function_wrapper

_logger = logging.getLogger(__name__)

CLIENT_ATTRIBUTES_DEPLOY_RESOURCE_LOG_MSG = "Exception occurred in PyZeebe instrumentation: Failed to extract resource count/file for method `deploy_resource`. Report this issue to New Relic support."


# Adds client method params as txn or span attributes
def _add_client_input_attributes(method_name, trace, args, kwargs):
bpmn_id = extract_agent_attribute_from_methods(args, kwargs, method_name, ("run_process", "run_process_with_result"), "bpmn_process_id", 0)
if bpmn_id:
trace._add_agent_attribute("zeebe.client.bpmnProcessId", bpmn_id)

msg_name = extract_agent_attribute_from_methods(args, kwargs, method_name, ("publish_message"), "name", 0)
if msg_name:
trace._add_agent_attribute("zeebe.client.messageName", msg_name)

correlation_key = extract_agent_attribute_from_methods(args, kwargs, method_name, ("publish_message"), "correlation_key", 1)
if correlation_key:
trace._add_agent_attribute("zeebe.client.correlationKey", correlation_key)

message_id = extract_agent_attribute_from_methods(args, kwargs, method_name, ("publish_message"), "message_id", 4)
if message_id:
trace._add_agent_attribute("zeebe.client.messageId", message_id)

resource = extract_agent_attribute_from_methods(args, {}, method_name, ("deploy_resource"), None, 0)
if resource:
try:
trace._add_agent_attribute("zeebe.client.resourceFile", resource)
trace._add_agent_attribute("zeebe.client.resourceCount", len(list(args)))
except Exception:
_logger.warning(CLIENT_ATTRIBUTES_DEPLOY_RESOURCE_LOG_MSG, exc_info=True)


def extract_agent_attribute_from_methods(args, kwargs, method_name, methods, param, index):
try:
if method_name in methods:
value = kwargs.get(param)
if not value and args and len(args) > index:
value = args[index]
return value
except Exception:
_logger.warning("Exception occurred in PyZeebe instrumentation: failed to extract %s from %s. Report this issue to New Relic support.", param, method_name, exc_info=True)

# Async wrapper that instruments router/worker annotations`
async def _nr_wrapper_execute_one_job(wrapped, instance, args, kwargs):
job = args[0] if args else kwargs.get("job")
process_id = getattr(job, "bpmn_process_id", None) or "UnknownProcess"
task_type = getattr(job, "type", None) or "UnknownType"
txn_name = f"{process_id}/{task_type}"

with WebTransaction(application_instance(), txn_name, group="ZeebeTask") as txn:
if job is not None:
if hasattr(job, "key"):
txn.add_custom_attribute("zeebe.job.key", job.key)
if hasattr(job, "type"):
txn.add_custom_attribute("zeebe.job.type", job.type)
if hasattr(job, "bpmn_process_id"):
txn.add_custom_attribute("zeebe.job.bpmnProcessId", job.bpmn_process_id)
if hasattr(job, "process_instance_key"):
txn.add_custom_attribute("zeebe.job.processInstanceKey", job.process_instance_key)
if hasattr(job, "element_id"):
txn.add_custom_attribute("zeebe.job.elementId", job.element_id)

return await wrapped(*args, **kwargs)


# Async wrapper that instruments a ZeebeClient method.
def _nr_client_wrapper(method_name):
async def _client_wrapper(wrapped, instance, args, kwargs):
txn = current_transaction()
if not txn:
return await wrapped(*args, **kwargs)

with FunctionTrace(name=method_name, group="ZeebeClient") as trace:
_add_client_input_attributes(method_name, trace, args, kwargs)
return await wrapped(*args, **kwargs)

return _client_wrapper


# Instrument JobExecutor.execute_one_job to create a background transaction per job (invoked from @router.task or @worker.task annotations)
def instrument_pyzeebe_worker_job_executor(module):
if hasattr(module, "JobExecutor"):
wrap_function_wrapper(module, "JobExecutor.execute_one_job", _nr_wrapper_execute_one_job)


# Instrument ZeebeClient methods to trace client calls.
def instrument_pyzeebe_client_client(module):
target_methods = ("run_process", "run_process_with_result", "deploy_resource", "publish_message")

for method_name in target_methods:
if hasattr(module, "ZeebeClient"):
if hasattr(module.ZeebeClient, method_name):
wrap_function_wrapper(module, f"ZeebeClient.{method_name}", _nr_client_wrapper(method_name))
102 changes: 102 additions & 0 deletions tests/external_pyzeebe/_mocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from types import SimpleNamespace

from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter

# Dummy response objects with only required fields
DummyCreateProcessInstanceResponse = SimpleNamespace(process_instance_key=12345)

DummyCreateProcessInstanceWithResultResponse = SimpleNamespace(
process_instance_key=45678, variables={"result": "success"}
)

DummyDeployResourceResponse = SimpleNamespace(key=67890, deployments=[], tenant_id=None)

DummyPublishMessageResponse = SimpleNamespace(key=99999, tenant_id=None)


# Dummy RPC stub coroutines
async def dummy_create_process_instance(
self, bpmn_process_id: str, variables: dict = None, version: int = -1, tenant_id: str = None # noqa: RUF013
):
"""Simulate ZeebeAdapter.create_process_instance"""
return DummyCreateProcessInstanceResponse


async def dummy_create_process_instance_with_result(
self,
bpmn_process_id: str,
variables: dict = None, # noqa: RUF013
version: int = -1,
timeout: int = 0,
variables_to_fetch=None,
tenant_id: str = None, # noqa: RUF013
):
"""Simulate ZeebeAdapter.create_process_instance_with_result"""
return DummyCreateProcessInstanceWithResultResponse


async def dummy_deploy_resource(*resource_file_path: str, tenant_id: str = None): # noqa: RUF013
"""Simulate ZeebeAdapter.deploy_resource"""
# Create dummy deployment metadata for each provided resource path
deployments = [
SimpleNamespace(
resource_name=str(path),
bpmn_process_id="dummy_process",
process_definition_key=123,
version=1,
tenant_id=tenant_id if tenant_id is not None else None,
)
for path in resource_file_path
]
# Create a dummy response with a list of deployments
return SimpleNamespace(
deployment_key=333333, deployments=deployments, tenant_id=tenant_id if tenant_id is not None else None
)


async def dummy_publish_message(
self,
name: str,
correlation_key: str,
variables: dict = None, # noqa: RUF013
time_to_live_in_milliseconds: int = 60000,
message_id: str = None, # noqa: RUF013
tenant_id: str = None, # noqa: RUF013
):
"""Simulate ZeebeAdapter.publish_message"""
# Return the dummy response (contains message key)
return SimpleNamespace(key=999999, tenant_id=tenant_id if tenant_id is not None else None)


async def dummy_complete_job(self, job_key: int, variables: dict):
"""Simulate JobExecutor.complete_job"""
self._last_complete = {"job_key": job_key, "variables": variables}
return None


class DummyZeebeAdapter(ZeebeAdapter):
"""Simulate a ZeebeAdapter so JobExecutor can be instatiated w/o gRPC channel"""

def __init__(self):
self.completed_job_key = None
self.completed_job_vars = None

async def complete_job(self, job_key: int, variables: dict):
self.completed_job_key = job_key
self.completed_job_vars = variables
return None
29 changes: 29 additions & 0 deletions tests/external_pyzeebe/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from testing_support.fixture.event_loop import event_loop as loop
from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture

_default_settings = {
"package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs.
"transaction_tracer.explain_threshold": 0.0,
"transaction_tracer.transaction_threshold": 0.0,
"transaction_tracer.stack_trace_threshold": 0.0,
"debug.log_data_collector_payloads": True,
"debug.record_transaction_failure": True,
}

collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (external_pyzeebe)", default_settings=_default_settings
)
28 changes: 28 additions & 0 deletions tests/external_pyzeebe/test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
targetNamespace="http://example.com/bpmn"
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd">

<!-- Define the process with a unique id and name -->
<process id="dummyProcess" name="Dummy Process" isExecutable="true">
<!-- Start Event -->
<startEvent id="StartEvent_1" name="Start"/>

<!-- A simple Service Task representing work -->
<serviceTask id="ServiceTask_1" name="Perform Work"/>

<!-- End Event -->
<endEvent id="EndEvent_1" name="End"/>

<!-- Sequence Flows connecting Start → Service Task → End -->
<sequenceFlow id="Flow_1" sourceRef="StartEvent_1" targetRef="ServiceTask_1"/>
<sequenceFlow id="Flow_2" sourceRef="ServiceTask_1" targetRef="EndEvent_1"/>
</process>

<!-- (Optional) BPMNDiagram section can be added for graphical layout, but omitted here -->
</definitions>
Loading