Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
46ae49a
Add ZMQLogger
mc-dorzo Jan 6, 2025
27afe2d
Add ZMQLogger and test for subscribing
mc-dorzo Jan 7, 2025
eb22312
Add CLI for streaming logs
mc-dorzo Jan 7, 2025
3b162f8
Add [GuidelineProposer] to logs in guideline proposer
mc-dorzo Jan 7, 2025
58b8a64
Add [MessageEventGenerator] to logs in messge_event_generator
mc-dorzo Jan 7, 2025
cf960c2
Add extra bracketed prefix to tool caller logs
mc-dorzo Jan 7, 2025
ffc6faa
Add extra bracketed prefix to guideline proposer logs
mc-dorzo Jan 7, 2025
f9de41d
Fix default port for log serer
mc-dorzo Jan 7, 2025
4904c0f
Add CompositeLogger and implement in server
mc-dorzo Jan 7, 2025
79ee39f
Add pattern as filters
mc-dorzo Jan 7, 2025
62c7eea
Add patterns as list of strings. now we can use streaming logs i.e. `…
mc-dorzo Jan 8, 2025
2c668b4
Remove douple brackets in ZMQLogger
mc-dorzo Jan 8, 2025
8c57dc8
Move exmaples.modules to tests.modules
mc-dorzo Jan 14, 2025
73ddc75
- Changed logging functionality from non-blocking to blocking, as we …
mc-dorzo Jan 14, 2025
d7f5e01
Use ExitStack in CompositeLogger operation functionality for safer co…
mc-dorzo Jan 14, 2025
d6d243c
Fix e2e test to support moving examples.modules to tests.modules
mc-dorzo Jan 14, 2025
2e7217c
Polishing loggings conventions
mc-dorzo Jan 14, 2025
e6eaed1
update CHANGELOG
mc-dorzo Jan 14, 2025
773845b
update changlog
mc-dorzo Jan 14, 2025
2ce78ed
Add union and intersection pattern flags
mc-dorzo Jan 15, 2025
6d22677
Polishing logs
mc-dorzo Jan 15, 2025
444274e
update changelog
mc-dorzo Jan 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,43 @@

All notable changes to Parlant will be documented here.

### Added
- **CLI Logging Commands:**
- Added a log command group to the CLI, allowing users to view logs in real-time with optional filters.
- Support for filtering logs by specific components:
- `--guideline-proposer` (`-g`) to filter logs by `[GuidelineProposer]`.
- `--tool-caller` (`-t`) to filter logs by `[ToolCaller]`.
- `--message-event-generator` (`-m`) to filter logs by `[MessageEventGenerator]`.
- Added support for combining patterns:
- **Intersection Patterns (`--and`, `-a`)**:
- Specify patterns to intersect with. Multiple patterns can be added by repeating the option.
- Example:
```bash
parlant log --and pattern1 --and pattern2
```
- **Union Patterns (`--or`, `-o`)**:
- Specify patterns to union by. Multiple patterns can be added by repeating the option.
- Example:
```bash
parlant log --or pattern1 --or pattern2
```
- Example usage:
- Stream all logs:
```bash
parlant log
```
- Filter logs by `[GuidelineProposer]`:
```bash
parlant log --guideline-proposer
```
- Combine filters with patterns:
```bash
parlant log -g -a pattern1 -o pattern2
```
- **Log Port Configuration:**
- The log server port can now be configured using the `PARLANT_LOG_PORT` environment variable.


## [Unreleased]
- Add shot creation helper functions under Shot
- Fix mistake in coherence checker few shots
Expand Down
1,107 changes: 619 additions & 488 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ types-aiofiles = "^24.1.0.20240626"
types-croniter = "^4.0.0.20241030"
types-jsonschema = "^4.22.0.20240610"
uvicorn = "^0.32.1"
pyzmq = "^26.2.0"

# --- optional packages ---
anthropic = {version = "^0.37.1", optional = true}
Expand Down
138 changes: 134 additions & 4 deletions src/parlant/bin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import asyncio
import os
from urllib.parse import urlparse
import click
import click.shell_completion
import click_completion # type: ignore
Expand All @@ -29,7 +30,7 @@
from rich.text import Text
import sys
import time
from typing import Any, Optional, cast
from typing import Any, Iterator, Optional, cast

from parlant.client import ParlantClient
from parlant.client.core import ApiError
Expand Down Expand Up @@ -62,6 +63,7 @@
Tag,
ConsumptionOffsetsUpdateParams,
)
import zmq

INDENT = " "

Expand Down Expand Up @@ -846,6 +848,50 @@ def delete_tag(ctx: click.Context, tag_id: str) -> None:
client = cast(ParlantClient, ctx.obj.client)
client.tags.delete(tag_id=tag_id)

@staticmethod
def stream_logs(
ctx: click.Context,
union_patterns: list[str],
intersection_patterns: list[str],
) -> Iterator[dict[str, Any]]:
context = zmq.Context.instance()
sub_socket = context.socket(zmq.SUB)

try:
sub_socket.connect(f"{ctx.obj.log_server_address}")

sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")

rich.print(Text("Streaming logs...", style="bold yellow"))

while True:
message = cast(dict[str, Any], sub_socket.recv_json())
if Actions._log_entry_matches(message, union_patterns, intersection_patterns):
yield message
except KeyboardInterrupt:
rich.print(Text("Log streaming interrupted by user.", style="bold red"))
finally:
sub_socket.close()

@staticmethod
def _log_entry_matches(
log_entry: dict[str, Any], union_patterns: list[str], intersection_patterns: list[str]
) -> bool:
message = log_entry.get("message", "")

if not union_patterns and not intersection_patterns:
return True

if not union_patterns:
return all(p in message for p in intersection_patterns)

if not intersection_patterns:
return any(p in message for p in union_patterns)

return any(p in message for p in union_patterns) and all(
p in message for p in intersection_patterns
)


def raise_for_status_with_detail(response: requests.Response) -> None:
"""Raises :class:`HTTPError`, if one occurred, with detail if exists
Expand Down Expand Up @@ -1966,6 +2012,21 @@ def delete_tag(ctx: click.Context, tag_id: str) -> None:
Interface.write_error(f"Error: {type(e).__name__}: {e}")
set_exit_status(1)

@staticmethod
def stream_logs(
ctx: click.Context,
union_patterns: list[str],
intersection_patterns: list[str],
) -> None:
try:
for log in Actions.stream_logs(ctx, union_patterns, intersection_patterns):
level = log.get("level", "")
message = log.get("message", "")
correlation_id = log.get("correlation_id", "")
rich.print(f"[{level}] [{correlation_id}] {message}")
except Exception as e:
Interface.write_error(f"Error while streaming logs: {e}")


async def async_main() -> None:
click_completion.init() # type: ignore
Expand All @@ -1974,8 +2035,9 @@ async def async_main() -> None:
class Config:
server_address: str
client: ParlantClient
log_server_address: str

@click.group
@click.group()
@click.option(
"-s",
"--server",
Expand All @@ -1984,10 +2046,26 @@ class Config:
metavar="ADDRESS[:PORT]",
default="http://localhost:8800",
)
@click.option(
"--log-port",
type=int,
help="Port for the log server",
metavar="LOG_PORT",
default=8799,
)
@click.pass_context
def cli(ctx: click.Context, server: str) -> None:
def cli(ctx: click.Context, server: str, log_port: int) -> None:
if not ctx.obj:
ctx.obj = Config(server_address=server, client=ParlantClient(base_url=server))
server_url = urlparse(server)
server_host = server_url.hostname or "localhost"

log_server_address = f"tcp://{server_host}:{log_port}"

ctx.obj = Config(
server_address=server,
client=ParlantClient(base_url=server),
log_server_address=log_server_address,
)

@cli.command(help="Generate shell completion code")
@click.option("-s", "--shell", type=str, help="Shell program (bash, zsh, etc.)", required=True)
Expand Down Expand Up @@ -3038,6 +3116,58 @@ def tag_update(ctx: click.Context, id: str, name: str) -> None:
def tag_delete(ctx: click.Context, id: str) -> None:
Interface.delete_tag(ctx, id)

@cli.command(
"log",
help="Stream server logs",
)
@click.option(
"--guideline-proposer", "-g", is_flag=True, help="Filter logs by [GuidelineProposer]"
)
@click.option("--tool-caller", "-t", is_flag=True, help="Filter logs by [ToolCaller]")
@click.option(
"--message-event-generator",
"-m",
is_flag=True,
help="Filter logs by [MessageEventGenerator]",
)
@click.option(
"-a",
"--and",
"intersection_patterns",
multiple=True,
default=[],
metavar="PATTERN",
help="Patterns to intersect with. May be specified multiple times.",
)
@click.option(
"-o",
"--or",
"union_patterns",
multiple=True,
default=[],
metavar="PATTERN",
help="Patterns to union by. May be specified multiple times.",
)
@click.pass_context
def log_view(
ctx: click.Context,
guideline_proposer: bool,
tool_caller: bool,
message_event_generator: bool,
intersection_patterns: tuple[str],
union_patterns: tuple[str],
) -> None:
union_pattern_list = list(union_patterns)

if guideline_proposer:
union_pattern_list.append("[GuidelineProposer]")
if tool_caller:
union_pattern_list.append("[ToolCaller]")
if message_event_generator:
union_pattern_list.append("[MessageEventGenerator]")

Interface.stream_logs(ctx, union_pattern_list, list(intersection_patterns))

@cli.command(
"help",
context_settings={"ignore_unknown_options": True},
Expand Down
14 changes: 12 additions & 2 deletions src/parlant/bin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
GuidelineConnectionProposer,
GuidelineConnectionPropositionsSchema,
)
from parlant.core.logging import FileLogger, LogLevel, Logger
from parlant.core.logging import CompositeLogger, FileLogger, ZMQLogger, LogLevel, Logger
from parlant.core.application import Application
from parlant.core.version import VERSION

Expand All @@ -126,7 +126,10 @@
sys.path.append(".")

CORRELATOR = ContextualCorrelator()

PARLANT_LOG_PORT = int(os.environ.get("PARLANT_LOG_PORT", "8799"))
LOGGER = FileLogger(PARLANT_HOME_DIR / "parlant.log", CORRELATOR, LogLevel.INFO)

BACKGROUND_TASK_SERVICE = BackgroundTaskService(LOGGER)


Expand Down Expand Up @@ -251,7 +254,14 @@ async def setup_container(nlp_service_name: str) -> AsyncIterator[Container]:
c = Container()

c[ContextualCorrelator] = CORRELATOR
c[Logger] = LOGGER
c[Logger] = CompositeLogger(
[
LOGGER,
await EXIT_STACK.enter_async_context(
ZMQLogger(CORRELATOR, LogLevel.INFO, port=PARLANT_LOG_PORT)
),
]
)

agents_db = await EXIT_STACK.enter_async_context(
JSONFileDocumentDatabase(LOGGER, PARLANT_HOME_DIR / "agents.json")
Expand Down
25 changes: 18 additions & 7 deletions src/parlant/core/engines/alpha/guideline_proposer.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async def propose_guidelines(
)

with self._logger.operation(
f"Guideline proposal ({len(guidelines)} guidelines processed in {len(batches)} batches)"
f"[GuidelineProposer] Evaluating {len(guidelines)} guidelines ({len(batches)} batches)"
):
batch_tasks = [
self._process_guideline_batch(
Expand Down Expand Up @@ -220,21 +220,32 @@ async def _process_guideline_batch(
)

with self._logger.operation(
f"Guideline evaluation batch ({len(guidelines_dict)} guidelines)"
f"[GuidelineProposer] Evaluating batch ({len(guidelines_dict)} guidelines)"
):
propositions_generation_response = await self._schematic_generator.generate(
self._logger.debug(f"[GuidelineProposer][Prompt] {prompt}")

inference = await self._schematic_generator.generate(
prompt=prompt,
hints={"temperature": 0.3},
)

self._logger.debug(
f"[GuidelineProposer][Completion] {inference.content.model_dump_json(indent=2)}"
)

propositions = []

for proposition in propositions_generation_response.content.checks:
for proposition in inference.content.checks:
guideline = guidelines_dict[int(proposition.guideline_number)]
Copy link
Contributor

@kichanyurd kichanyurd Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add 2 logs here:

[GuidelineProposer][Prompt] (debug)
[GuidelineProposer][Completion] (debug) --> output all of the response's content as indented JSON


self._logger.debug(
f'Guideline evaluation for "when {guideline.content.condition} then {guideline.content.action}":\n' # noqa
f' Score: {proposition.applies_score}/10; Condition rationale: "{proposition.condition_application_rationale}"; Continuous: {proposition.guideline_is_continuous} ; Previously applied: "{proposition.guideline_previously_applied}"; Should reapply: {proposition.guideline_should_reapply};Re-application rationale: "{proposition.guideline_previously_applied_rationale}"'
f'[GuidelineProposer][Evaluation] "When {guideline.content.condition}; Then {guideline.content.action}":\n'
f" Score: {proposition.applies_score}/10\n"
f' ConditionRationale: "{proposition.condition_application_rationale}"\n'
f" IsContinuous: {proposition.guideline_is_continuous}\n"
f' PreviouslyApplied: "{proposition.guideline_previously_applied}"\n'
f" ShouldReapply: {proposition.guideline_should_reapply}\n"
f' ReapplicationRationale: "{proposition.guideline_previously_applied_rationale}"'
)

if (proposition.applies_score >= 6) and (
Expand All @@ -258,7 +269,7 @@ async def _process_guideline_batch(
)
)

return propositions_generation_response.info, propositions
return inference.info, propositions

async def shots(self) -> Sequence[GuidelinePropositionShot]:
return await shot_collection.list()
Expand Down
Loading
Loading