Skip to content

Commit 39a0964

Browse files
authored
feat(mcp): enables remote mcp (#2836)
1 parent 3b213e8 commit 39a0964

File tree

10 files changed

+871
-52
lines changed

10 files changed

+871
-52
lines changed

Cargo.lock

Lines changed: 50 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ winnow = "=0.6.2"
129129
winreg = "0.55.0"
130130
schemars = "1.0.4"
131131
jsonschema = "0.30.0"
132-
rmcp = { version = "0.6.0", features = ["client", "transport-child-process"] }
132+
rmcp = { version = "0.6.3", features = ["client", "transport-sse-client-reqwest", "reqwest", "transport-streamable-http-client-reqwest", "transport-child-process", "tower", "auth"] }
133133

134134
[workspace.lints.rust]
135135
future_incompatible = "warn"

crates/chat-cli/src/cli/chat/server_messenger.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ pub enum UpdateEventMessage {
4444
result: Result<ListResourceTemplatesResult>,
4545
peer: Option<Peer<RoleClient>>,
4646
},
47+
OauthLink {
48+
server_name: String,
49+
link: String,
50+
},
4751
InitStart {
4852
server_name: String,
4953
},
@@ -146,6 +150,17 @@ impl Messenger for ServerMessenger {
146150
.map_err(|e| MessengerError::Custom(e.to_string()))?)
147151
}
148152

153+
async fn send_oauth_link(&self, link: String) -> MessengerResult {
154+
Ok(self
155+
.update_event_sender
156+
.send(UpdateEventMessage::OauthLink {
157+
server_name: self.server_name.clone(),
158+
link,
159+
})
160+
.await
161+
.map_err(|e| MessengerError::Custom(e.to_string()))?)
162+
}
163+
149164
async fn send_init_msg(&self) -> MessengerResult {
150165
Ok(self
151166
.update_event_sender

crates/chat-cli/src/cli/chat/tool_manager.rs

Lines changed: 91 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ use crate::database::settings::Setting;
9090
use crate::mcp_client::messenger::Messenger;
9191
use crate::mcp_client::{
9292
InitializedMcpClient,
93+
InnerService,
9394
McpClientService,
9495
};
9596
use crate::os::Os;
@@ -137,6 +138,11 @@ enum LoadingMsg {
137138
/// This is sent when all tool initialization is complete or when the application is shutting
138139
/// down.
139140
Terminate { still_loading: Vec<String> },
141+
/// Indicates that a server requires user authentication and provides a sign-in link.
142+
/// This message is used to notify the user about authentication requirements for MCP servers
143+
/// that need OAuth or other authentication methods. Contains the server name and the
144+
/// authentication message (typically a URL or instructions).
145+
SignInNotice { name: String },
140146
}
141147

142148
/// Used to denote the loading outcome associated with a server.
@@ -630,19 +636,29 @@ impl ToolManager {
630636
let server_name_clone = server_name.clone();
631637
tokio::spawn(async move {
632638
match handle.await {
633-
Ok(Ok(client)) => match client.cancel().await {
634-
Ok(_) => info!("Server {server_name_clone} evicted due to agent swap"),
635-
Err(e) => error!("Server {server_name_clone} has failed to cancel: {e}"),
639+
Ok(Ok(client)) => {
640+
let InnerService::Original(client) = client.inner_service else {
641+
unreachable!();
642+
};
643+
match client.cancel().await {
644+
Ok(_) => info!("Server {server_name_clone} evicted due to agent swap"),
645+
Err(e) => error!("Server {server_name_clone} has failed to cancel: {e}"),
646+
}
636647
},
637648
Ok(Err(_)) | Err(_) => {
638649
error!("Server {server_name_clone} has failed to cancel");
639650
},
640651
}
641652
});
642653
},
643-
InitializedMcpClient::Ready(running_service) => match running_service.cancel().await {
644-
Ok(_) => info!("Server {server_name} evicted due to agent swap"),
645-
Err(e) => error!("Server {server_name} has failed to cancel: {e}"),
654+
InitializedMcpClient::Ready(running_service) => {
655+
let InnerService::Original(client) = running_service.inner_service else {
656+
unreachable!();
657+
};
658+
match client.cancel().await {
659+
Ok(_) => info!("Server {server_name} evicted due to agent swap"),
660+
Err(e) => error!("Server {server_name} has failed to cancel: {e}"),
661+
}
646662
},
647663
}
648664
}
@@ -869,17 +885,16 @@ impl ToolManager {
869885
});
870886
};
871887

872-
let running_service = (*client.get_running_service().await.map_err(|e| ToolResult {
888+
let running_service = client.get_running_service().await.map_err(|e| ToolResult {
873889
tool_use_id: value.id.clone(),
874890
content: vec![ToolResultContentBlock::Text(format!("Mcp tool client not ready: {e}"))],
875891
status: ToolResultStatus::Error,
876-
})?)
877-
.clone();
892+
})?;
878893

879894
Tool::Custom(CustomTool {
880895
name: tool_name.to_owned(),
881896
server_name: server_name.to_owned(),
882-
client: running_service,
897+
client: running_service.clone(),
883898
params: value.args.as_object().cloned(),
884899
})
885900
},
@@ -1170,6 +1185,15 @@ fn spawn_display_task(
11701185
execute!(output, style::Print("\n"),)?;
11711186
break;
11721187
},
1188+
LoadingMsg::SignInNotice { name } => {
1189+
execute!(
1190+
output,
1191+
cursor::MoveToColumn(0),
1192+
cursor::MoveUp(1),
1193+
terminal::Clear(terminal::ClearType::CurrentLine),
1194+
)?;
1195+
queue_oauth_message(&name, &mut output)?;
1196+
},
11731197
},
11741198
Err(_e) => {
11751199
spinner_logo_idx = (spinner_logo_idx + 1) % SPINNER_CHARS.len();
@@ -1595,6 +1619,35 @@ fn spawn_orchestrator_task(
15951619
},
15961620
UpdateEventMessage::ListResourcesResult { .. } => {},
15971621
UpdateEventMessage::ResourceTemplatesListResult { .. } => {},
1622+
UpdateEventMessage::OauthLink { server_name, link } => {
1623+
let mut buf_writer = BufWriter::new(&mut *record_temp_buf);
1624+
let msg = eyre::eyre!(link);
1625+
let _ = queue_oauth_message_with_link(server_name.as_str(), &msg, &mut buf_writer);
1626+
let _ = buf_writer.flush();
1627+
drop(buf_writer);
1628+
let record_str = String::from_utf8_lossy(record_temp_buf).to_string();
1629+
let record = LoadingRecord::Warn(record_str.clone());
1630+
load_record
1631+
.lock()
1632+
.await
1633+
.entry(server_name.clone())
1634+
.and_modify(|load_record| {
1635+
load_record.push(record.clone());
1636+
})
1637+
.or_insert(vec![record]);
1638+
if let Some(sender) = &loading_status_sender {
1639+
let msg = LoadingMsg::SignInNotice {
1640+
name: server_name.clone(),
1641+
};
1642+
if let Err(e) = sender.send(msg).await {
1643+
warn!(
1644+
"Error sending update message to display task: {:?}\nAssume display task has completed",
1645+
e
1646+
);
1647+
loading_status_sender.take();
1648+
}
1649+
}
1650+
},
15981651
UpdateEventMessage::InitStart { server_name, .. } => {
15991652
pending.write().await.insert(server_name.clone());
16001653
loading_servers.insert(server_name, std::time::Instant::now());
@@ -1876,6 +1929,34 @@ fn queue_failure_message(
18761929
)?)
18771930
}
18781931

1932+
fn queue_oauth_message(name: &str, output: &mut impl Write) -> eyre::Result<()> {
1933+
Ok(queue!(
1934+
output,
1935+
style::SetForegroundColor(style::Color::Yellow),
1936+
style::Print("⚠ "),
1937+
style::SetForegroundColor(style::Color::Blue),
1938+
style::Print(name),
1939+
style::ResetColor,
1940+
style::Print(" requires OAuth authentication. Use /mcp to see the auth link\n"),
1941+
)?)
1942+
}
1943+
1944+
fn queue_oauth_message_with_link(name: &str, msg: &eyre::Report, output: &mut impl Write) -> eyre::Result<()> {
1945+
Ok(queue!(
1946+
output,
1947+
style::SetForegroundColor(style::Color::Yellow),
1948+
style::Print("⚠ "),
1949+
style::SetForegroundColor(style::Color::Blue),
1950+
style::Print(name),
1951+
style::ResetColor,
1952+
style::Print(" requires OAuth authentication. Follow this link to proceed: \n"),
1953+
style::SetForegroundColor(style::Color::Yellow),
1954+
style::Print(msg),
1955+
style::ResetColor,
1956+
style::Print("\n")
1957+
)?)
1958+
}
1959+
18791960
fn queue_warn_message(name: &str, msg: &eyre::Report, time: &str, output: &mut impl Write) -> eyre::Result<()> {
18801961
Ok(queue!(
18811962
output,

crates/chat-cli/src/cli/chat/tools/custom_tool.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use crossterm::{
77
style,
88
};
99
use eyre::Result;
10-
use rmcp::RoleClient;
1110
use rmcp::model::CallToolRequestParam;
1211
use schemars::JsonSchema;
1312
use serde::{
@@ -23,14 +22,39 @@ use crate::cli::agent::{
2322
};
2423
use crate::cli::chat::CONTINUATION_LINE;
2524
use crate::cli::chat::token_counter::TokenCounter;
25+
use crate::mcp_client::RunningService;
2626
use crate::os::Os;
2727
use crate::util::MCP_SERVER_TOOL_DELIMITER;
2828
use crate::util::pattern_matching::matches_any_pattern;
2929

30-
// TODO: support http transport type
30+
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, JsonSchema)]
31+
#[serde(rename_all = "camelCase")]
32+
pub enum TransportType {
33+
/// Standard input/output transport (default)
34+
Stdio,
35+
/// HTTP transport for web-based communication
36+
Http,
37+
}
38+
39+
impl Default for TransportType {
40+
fn default() -> Self {
41+
Self::Stdio
42+
}
43+
}
44+
3145
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, JsonSchema)]
3246
pub struct CustomToolConfig {
47+
/// The type of transport the mcp server is expecting
48+
#[serde(default)]
49+
pub r#type: TransportType,
50+
/// The URL endpoint for HTTP-based MCP servers
51+
#[serde(default)]
52+
pub url: String,
53+
/// HTTP headers to include when communicating with HTTP-based MCP servers
54+
#[serde(default)]
55+
pub headers: HashMap<String, String>,
3356
/// The command string used to initialize the mcp server
57+
#[serde(default)]
3458
pub command: String,
3559
/// A list of arguments to be used to run the command with
3660
#[serde(default)]
@@ -64,20 +88,20 @@ pub struct CustomTool {
6488
/// prefixed to the tool name when presented to the model for disambiguation.
6589
pub server_name: String,
6690
/// Reference to the client that manages communication with the tool's server process.
67-
pub client: rmcp::Peer<RoleClient>,
91+
pub client: RunningService,
6892
/// Optional parameters to pass to the tool when invoking the method.
6993
/// Structured as a JSON value to accommodate various parameter types and structures.
7094
pub params: Option<serde_json::Map<String, serde_json::Value>>,
7195
}
7296

7397
impl CustomTool {
74-
pub async fn invoke(&self, _os: &Os, _updates: impl Write) -> Result<InvokeOutput> {
98+
pub async fn invoke(&self, _os: &Os, _updates: &mut impl Write) -> Result<InvokeOutput> {
7599
let params = CallToolRequestParam {
76100
name: Cow::from(self.name.clone()),
77101
arguments: self.params.clone(),
78102
};
79103

80-
let resp = self.client.call_tool(params).await?;
104+
let resp = self.client.call_tool(params.clone()).await?;
81105

82106
if resp.is_error.is_none_or(|v| !v) {
83107
Ok(InvokeOutput {

0 commit comments

Comments
 (0)