Skip to content

Commit 851c690

Browse files
authored
Merge pull request #226 from emcie-co/feature/mc-514-consume-logs-from-client-side
Feature/mc 514 consume logs from client side
2 parents 3f10966 + 444274e commit 851c690

File tree

13 files changed

+1030
-529
lines changed

13 files changed

+1030
-529
lines changed

CHANGELOG.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,43 @@
22

33
All notable changes to Parlant will be documented here.
44

5+
### Added
6+
- **CLI Logging Commands:**
7+
- Added a log command group to the CLI, allowing users to view logs in real-time with optional filters.
8+
- Support for filtering logs by specific components:
9+
- `--guideline-proposer` (`-g`) to filter logs by `[GuidelineProposer]`.
10+
- `--tool-caller` (`-t`) to filter logs by `[ToolCaller]`.
11+
- `--message-event-generator` (`-m`) to filter logs by `[MessageEventGenerator]`.
12+
- Added support for combining patterns:
13+
- **Intersection Patterns (`--and`, `-a`)**:
14+
- Specify patterns to intersect with. Multiple patterns can be added by repeating the option.
15+
- Example:
16+
```bash
17+
parlant log --and pattern1 --and pattern2
18+
```
19+
- **Union Patterns (`--or`, `-o`)**:
20+
- Specify patterns to union by. Multiple patterns can be added by repeating the option.
21+
- Example:
22+
```bash
23+
parlant log --or pattern1 --or pattern2
24+
```
25+
- Example usage:
26+
- Stream all logs:
27+
```bash
28+
parlant log
29+
```
30+
- Filter logs by `[GuidelineProposer]`:
31+
```bash
32+
parlant log --guideline-proposer
33+
```
34+
- Combine filters with patterns:
35+
```bash
36+
parlant log -g -a pattern1 -o pattern2
37+
```
38+
- **Log Port Configuration:**
39+
- The log server port can now be configured using the `PARLANT_LOG_PORT` environment variable.
40+
41+
542
## [Unreleased]
643
- Add shot creation helper functions under Shot
744
- Fix mistake in coherence checker few shots

poetry.lock

Lines changed: 619 additions & 488 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ types-aiofiles = "^24.1.0.20240626"
5252
types-croniter = "^4.0.0.20241030"
5353
types-jsonschema = "^4.22.0.20240610"
5454
uvicorn = "^0.32.1"
55+
pyzmq = "^26.2.0"
5556

5657
# --- optional packages ---
5758
anthropic = {version = "^0.37.1", optional = true}

src/parlant/bin/client.py

Lines changed: 134 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import asyncio
1818
import os
19+
from urllib.parse import urlparse
1920
import click
2021
import click.shell_completion
2122
import click_completion # type: ignore
@@ -29,7 +30,7 @@
2930
from rich.text import Text
3031
import sys
3132
import time
32-
from typing import Any, Optional, cast
33+
from typing import Any, Iterator, Optional, cast
3334

3435
from parlant.client import ParlantClient
3536
from parlant.client.core import ApiError
@@ -62,6 +63,7 @@
6263
Tag,
6364
ConsumptionOffsetsUpdateParams,
6465
)
66+
import zmq
6567

6668
INDENT = " "
6769

@@ -846,6 +848,50 @@ def delete_tag(ctx: click.Context, tag_id: str) -> None:
846848
client = cast(ParlantClient, ctx.obj.client)
847849
client.tags.delete(tag_id=tag_id)
848850

851+
@staticmethod
852+
def stream_logs(
853+
ctx: click.Context,
854+
union_patterns: list[str],
855+
intersection_patterns: list[str],
856+
) -> Iterator[dict[str, Any]]:
857+
context = zmq.Context.instance()
858+
sub_socket = context.socket(zmq.SUB)
859+
860+
try:
861+
sub_socket.connect(f"{ctx.obj.log_server_address}")
862+
863+
sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
864+
865+
rich.print(Text("Streaming logs...", style="bold yellow"))
866+
867+
while True:
868+
message = cast(dict[str, Any], sub_socket.recv_json())
869+
if Actions._log_entry_matches(message, union_patterns, intersection_patterns):
870+
yield message
871+
except KeyboardInterrupt:
872+
rich.print(Text("Log streaming interrupted by user.", style="bold red"))
873+
finally:
874+
sub_socket.close()
875+
876+
@staticmethod
877+
def _log_entry_matches(
878+
log_entry: dict[str, Any], union_patterns: list[str], intersection_patterns: list[str]
879+
) -> bool:
880+
message = log_entry.get("message", "")
881+
882+
if not union_patterns and not intersection_patterns:
883+
return True
884+
885+
if not union_patterns:
886+
return all(p in message for p in intersection_patterns)
887+
888+
if not intersection_patterns:
889+
return any(p in message for p in union_patterns)
890+
891+
return any(p in message for p in union_patterns) and all(
892+
p in message for p in intersection_patterns
893+
)
894+
849895

850896
def raise_for_status_with_detail(response: requests.Response) -> None:
851897
"""Raises :class:`HTTPError`, if one occurred, with detail if exists
@@ -1966,6 +2012,21 @@ def delete_tag(ctx: click.Context, tag_id: str) -> None:
19662012
Interface.write_error(f"Error: {type(e).__name__}: {e}")
19672013
set_exit_status(1)
19682014

2015+
@staticmethod
2016+
def stream_logs(
2017+
ctx: click.Context,
2018+
union_patterns: list[str],
2019+
intersection_patterns: list[str],
2020+
) -> None:
2021+
try:
2022+
for log in Actions.stream_logs(ctx, union_patterns, intersection_patterns):
2023+
level = log.get("level", "")
2024+
message = log.get("message", "")
2025+
correlation_id = log.get("correlation_id", "")
2026+
rich.print(f"[{level}] [{correlation_id}] {message}")
2027+
except Exception as e:
2028+
Interface.write_error(f"Error while streaming logs: {e}")
2029+
19692030

19702031
async def async_main() -> None:
19712032
click_completion.init() # type: ignore
@@ -1974,8 +2035,9 @@ async def async_main() -> None:
19742035
class Config:
19752036
server_address: str
19762037
client: ParlantClient
2038+
log_server_address: str
19772039

1978-
@click.group
2040+
@click.group()
19792041
@click.option(
19802042
"-s",
19812043
"--server",
@@ -1984,10 +2046,26 @@ class Config:
19842046
metavar="ADDRESS[:PORT]",
19852047
default="http://localhost:8800",
19862048
)
2049+
@click.option(
2050+
"--log-port",
2051+
type=int,
2052+
help="Port for the log server",
2053+
metavar="LOG_PORT",
2054+
default=8799,
2055+
)
19872056
@click.pass_context
1988-
def cli(ctx: click.Context, server: str) -> None:
2057+
def cli(ctx: click.Context, server: str, log_port: int) -> None:
19892058
if not ctx.obj:
1990-
ctx.obj = Config(server_address=server, client=ParlantClient(base_url=server))
2059+
server_url = urlparse(server)
2060+
server_host = server_url.hostname or "localhost"
2061+
2062+
log_server_address = f"tcp://{server_host}:{log_port}"
2063+
2064+
ctx.obj = Config(
2065+
server_address=server,
2066+
client=ParlantClient(base_url=server),
2067+
log_server_address=log_server_address,
2068+
)
19912069

19922070
@cli.command(help="Generate shell completion code")
19932071
@click.option("-s", "--shell", type=str, help="Shell program (bash, zsh, etc.)", required=True)
@@ -3038,6 +3116,58 @@ def tag_update(ctx: click.Context, id: str, name: str) -> None:
30383116
def tag_delete(ctx: click.Context, id: str) -> None:
30393117
Interface.delete_tag(ctx, id)
30403118

3119+
@cli.command(
3120+
"log",
3121+
help="Stream server logs",
3122+
)
3123+
@click.option(
3124+
"--guideline-proposer", "-g", is_flag=True, help="Filter logs by [GuidelineProposer]"
3125+
)
3126+
@click.option("--tool-caller", "-t", is_flag=True, help="Filter logs by [ToolCaller]")
3127+
@click.option(
3128+
"--message-event-generator",
3129+
"-m",
3130+
is_flag=True,
3131+
help="Filter logs by [MessageEventGenerator]",
3132+
)
3133+
@click.option(
3134+
"-a",
3135+
"--and",
3136+
"intersection_patterns",
3137+
multiple=True,
3138+
default=[],
3139+
metavar="PATTERN",
3140+
help="Patterns to intersect with. May be specified multiple times.",
3141+
)
3142+
@click.option(
3143+
"-o",
3144+
"--or",
3145+
"union_patterns",
3146+
multiple=True,
3147+
default=[],
3148+
metavar="PATTERN",
3149+
help="Patterns to union by. May be specified multiple times.",
3150+
)
3151+
@click.pass_context
3152+
def log_view(
3153+
ctx: click.Context,
3154+
guideline_proposer: bool,
3155+
tool_caller: bool,
3156+
message_event_generator: bool,
3157+
intersection_patterns: tuple[str],
3158+
union_patterns: tuple[str],
3159+
) -> None:
3160+
union_pattern_list = list(union_patterns)
3161+
3162+
if guideline_proposer:
3163+
union_pattern_list.append("[GuidelineProposer]")
3164+
if tool_caller:
3165+
union_pattern_list.append("[ToolCaller]")
3166+
if message_event_generator:
3167+
union_pattern_list.append("[MessageEventGenerator]")
3168+
3169+
Interface.stream_logs(ctx, union_pattern_list, list(intersection_patterns))
3170+
30413171
@cli.command(
30423172
"help",
30433173
context_settings={"ignore_unknown_options": True},

src/parlant/bin/server.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
GuidelineConnectionProposer,
106106
GuidelineConnectionPropositionsSchema,
107107
)
108-
from parlant.core.logging import FileLogger, LogLevel, Logger
108+
from parlant.core.logging import CompositeLogger, FileLogger, ZMQLogger, LogLevel, Logger
109109
from parlant.core.application import Application
110110
from parlant.core.version import VERSION
111111

@@ -126,7 +126,10 @@
126126
sys.path.append(".")
127127

128128
CORRELATOR = ContextualCorrelator()
129+
130+
PARLANT_LOG_PORT = int(os.environ.get("PARLANT_LOG_PORT", "8799"))
129131
LOGGER = FileLogger(PARLANT_HOME_DIR / "parlant.log", CORRELATOR, LogLevel.INFO)
132+
130133
BACKGROUND_TASK_SERVICE = BackgroundTaskService(LOGGER)
131134

132135

@@ -251,7 +254,14 @@ async def setup_container(nlp_service_name: str) -> AsyncIterator[Container]:
251254
c = Container()
252255

253256
c[ContextualCorrelator] = CORRELATOR
254-
c[Logger] = LOGGER
257+
c[Logger] = CompositeLogger(
258+
[
259+
LOGGER,
260+
await EXIT_STACK.enter_async_context(
261+
ZMQLogger(CORRELATOR, LogLevel.INFO, port=PARLANT_LOG_PORT)
262+
),
263+
]
264+
)
255265

256266
agents_db = await EXIT_STACK.enter_async_context(
257267
JSONFileDocumentDatabase(LOGGER, PARLANT_HOME_DIR / "agents.json")

src/parlant/core/engines/alpha/guideline_proposer.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ async def propose_guidelines(
121121
)
122122

123123
with self._logger.operation(
124-
f"Guideline proposal ({len(guidelines)} guidelines processed in {len(batches)} batches)"
124+
f"[GuidelineProposer] Evaluating {len(guidelines)} guidelines ({len(batches)} batches)"
125125
):
126126
batch_tasks = [
127127
self._process_guideline_batch(
@@ -220,21 +220,32 @@ async def _process_guideline_batch(
220220
)
221221

222222
with self._logger.operation(
223-
f"Guideline evaluation batch ({len(guidelines_dict)} guidelines)"
223+
f"[GuidelineProposer] Evaluating batch ({len(guidelines_dict)} guidelines)"
224224
):
225-
propositions_generation_response = await self._schematic_generator.generate(
225+
self._logger.debug(f"[GuidelineProposer][Prompt] {prompt}")
226+
227+
inference = await self._schematic_generator.generate(
226228
prompt=prompt,
227229
hints={"temperature": 0.3},
228230
)
229231

232+
self._logger.debug(
233+
f"[GuidelineProposer][Completion] {inference.content.model_dump_json(indent=2)}"
234+
)
235+
230236
propositions = []
231237

232-
for proposition in propositions_generation_response.content.checks:
238+
for proposition in inference.content.checks:
233239
guideline = guidelines_dict[int(proposition.guideline_number)]
234240

235241
self._logger.debug(
236-
f'Guideline evaluation for "when {guideline.content.condition} then {guideline.content.action}":\n' # noqa
237-
f' Score: {proposition.applies_score}/10; Condition rationale: "{proposition.condition_application_rationale}"; Continuous: {proposition.guideline_is_continuous} ; Previously applied: "{proposition.guideline_previously_applied}"; Should reapply: {proposition.guideline_should_reapply};Re-application rationale: "{proposition.guideline_previously_applied_rationale}"'
242+
f'[GuidelineProposer][Evaluation] "When {guideline.content.condition}; Then {guideline.content.action}":\n'
243+
f" Score: {proposition.applies_score}/10\n"
244+
f' ConditionRationale: "{proposition.condition_application_rationale}"\n'
245+
f" IsContinuous: {proposition.guideline_is_continuous}\n"
246+
f' PreviouslyApplied: "{proposition.guideline_previously_applied}"\n'
247+
f" ShouldReapply: {proposition.guideline_should_reapply}\n"
248+
f' ReapplicationRationale: "{proposition.guideline_previously_applied_rationale}"'
238249
)
239250

240251
if (proposition.applies_score >= 6) and (
@@ -258,7 +269,7 @@ async def _process_guideline_batch(
258269
)
259270
)
260271

261-
return propositions_generation_response.info, propositions
272+
return inference.info, propositions
262273

263274
async def shots(self) -> Sequence[GuidelinePropositionShot]:
264275
return await shot_collection.list()

0 commit comments

Comments
 (0)