diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index bd55b720d50a..82bcc4061c3f 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -16,6 +16,7 @@ import ( "github.com/mudler/LocalAI/core/application" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/core/templates" laudio "github.com/mudler/LocalAI/pkg/audio" "github.com/mudler/LocalAI/pkg/functions" @@ -75,8 +76,7 @@ func (s *Session) ToServer() types.ServerSession { OutputAudioFormat: types.AudioFormatPcm16, TurnDetection: s.TurnDetection, InputAudioTranscription: s.InputAudioTranscription, - // TODO: Should be constructed from Functions? - Tools: []types.Tool{}, + Tools: []types.Tool{}, // TODO: ToolChoice // TODO: Temperature // TODO: MaxOutputTokens @@ -144,17 +144,6 @@ type ErrorMessage struct { EventID string `json:"event_id,omitempty"` } -// Define a structure for outgoing messages -type OutgoingMessage struct { - Type string `json:"type"` - Session *Session `json:"session,omitempty"` - Conversation *Conversation `json:"conversation,omitempty"` - Item *Item `json:"item,omitempty"` - Content string `json:"content,omitempty"` - Audio string `json:"audio,omitempty"` - Error *ErrorMessage `json:"error,omitempty"` -} - // Map to store sessions (in-memory) var sessions = make(map[string]*Session) var sessionLock sync.Mutex @@ -279,13 +268,28 @@ func registerRealtime(application *application.Application) func(c *websocket.Co done = make(chan struct{}) ) - vadServerStarted := true - wg.Add(1) - go func() { - defer wg.Done() - conversation := session.Conversations[session.DefaultConversationID] - handleVAD(cfg, evaluator, session, conversation, c, done) - }() + vadServerStarted := false + toggleVAD := func() { + if session.TurnDetection.Type == types.ServerTurnDetectionTypeServerVad && !vadServerStarted { 
+ log.Debug().Msg("Starting VAD goroutine...") + wg.Add(1) + go func() { + defer wg.Done() + conversation := session.Conversations[session.DefaultConversationID] + handleVAD(cfg, evaluator, session, conversation, c, done) + }() + vadServerStarted = true + } else if session.TurnDetection.Type != types.ServerTurnDetectionTypeServerVad && vadServerStarted { + log.Debug().Msg("Stopping VAD goroutine...") + + go func() { + done <- struct{}{} + }() + vadServerStarted = false + } + } + + toggleVAD() for { if _, msg, err = c.ReadMessage(); err != nil { @@ -323,6 +327,8 @@ func registerRealtime(application *application.Application) func(c *websocket.Co continue } + toggleVAD() + sendEvent(c, types.SessionUpdatedEvent{ ServerEventBase: types.ServerEventBase{ EventID: "event_TODO", @@ -352,6 +358,8 @@ func registerRealtime(application *application.Application) func(c *websocket.Co continue } + toggleVAD() + sendEvent(c, types.SessionUpdatedEvent{ ServerEventBase: types.ServerEventBase{ EventID: "event_TODO", @@ -360,24 +368,6 @@ func registerRealtime(application *application.Application) func(c *websocket.Co Session: session.ToServer(), }) - if session.TurnDetection.Type == types.ServerTurnDetectionTypeServerVad && !vadServerStarted { - log.Debug().Msg("Starting VAD goroutine...") - wg.Add(1) - go func() { - defer wg.Done() - conversation := session.Conversations[session.DefaultConversationID] - handleVAD(cfg, evaluator, session, conversation, c, done) - }() - vadServerStarted = true - } else if session.TurnDetection.Type != types.ServerTurnDetectionTypeServerVad && vadServerStarted { - log.Debug().Msg("Stopping VAD goroutine...") - - wg.Add(-1) - go func() { - done <- struct{}{} - }() - vadServerStarted = false - } case types.ClientEventTypeInputAudioBufferAppend: // Handle 'input_audio_buffer.append' if incomingMsg.Audio == "" { @@ -700,7 +690,7 @@ func handleVAD(cfg *config.ModelConfig, evaluator *templates.Evaluator, session EventID: "event_TODO", Type: 
types.ServerEventTypeInputAudioBufferSpeechStarted, }, - AudioStartMs: time.Now().Sub(startTime).Milliseconds(), + AudioStartMs: time.Since(startTime).Milliseconds(), }) speechStarted = true } @@ -722,7 +712,7 @@ func handleVAD(cfg *config.ModelConfig, evaluator *templates.Evaluator, session EventID: "event_TODO", Type: types.ServerEventTypeInputAudioBufferSpeechStopped, }, - AudioEndMs: time.Now().Sub(startTime).Milliseconds(), + AudioEndMs: time.Since(startTime).Milliseconds(), }) speechStarted = false @@ -772,6 +762,7 @@ func commitUtterance(ctx context.Context, utt []byte, cfg *config.ModelConfig, e f.Sync() + var transcript string if session.InputAudioTranscription != nil { tr, err := session.ModelInterface.Transcribe(ctx, &proto.TranscriptRequest{ Dst: f.Name(), @@ -783,6 +774,7 @@ func commitUtterance(ctx context.Context, utt []byte, cfg *config.ModelConfig, e sendError(c, "transcription_failed", err.Error(), "", "event_TODO") } + transcript = tr.GetText() sendEvent(c, types.ResponseAudioTranscriptDoneEvent{ ServerEventBase: types.ServerEventBase{ Type: types.ServerEventTypeResponseAudioTranscriptDone, @@ -793,43 +785,39 @@ func commitUtterance(ctx context.Context, utt []byte, cfg *config.ModelConfig, e ResponseID: "resp_TODO", OutputIndex: 0, ContentIndex: 0, - Transcript: tr.GetText(), + Transcript: transcript, }) // TODO: Update the prompt with transcription result? } - if !session.TranscriptionOnly { - sendNotImplemented(c, "Commiting items to the conversation not implemented") - } - // TODO: Commit the audio and/or transcribed text to the conversation // Commit logic: create item, broadcast item.created, etc. 
- // item := &Item{ - // ID: generateItemID(), - // Object: "realtime.item", - // Type: "message", - // Status: "completed", - // Role: "user", - // Content: []ConversationContent{ - // { - // Type: "input_audio", - // Audio: base64.StdEncoding.EncodeToString(utt), - // }, - // }, - // } - // conv.Lock.Lock() - // conv.Items = append(conv.Items, item) - // conv.Lock.Unlock() - // - // - // sendEvent(c, OutgoingMessage{ - // Type: "conversation.item.created", - // Item: item, - // }) - // - // - // // trigger the response generation - // generateResponse(cfg, evaluator, session, conv, ResponseCreate{}, c, websocket.TextMessage) + item := &types.MessageItem{ + ID: generateItemID(), + Type: "message", + Status: "completed", + Role: "user", + Content: []types.MessageContentPart{ + { + Type: types.MessageContentTypeInputAudio, + Audio: base64.StdEncoding.EncodeToString(utt), + Transcript: transcript, + }, + }, + } + conv.Lock.Lock() + conv.Items = append(conv.Items, item) + conv.Lock.Unlock() + + sendEvent(c, types.ConversationItemAddedEvent{ + ServerEventBase: types.ServerEventBase{ + Type: types.ServerEventTypeConversationItemAdded, + }, + Item: *item, + }) + if !session.TranscriptionOnly { // transcription-only sessions stop here: no response generation + generateResponse(cfg, evaluator, session, conv, ResponseCreate{}, c, websocket.TextMessage) + } } func runVAD(ctx context.Context, session *Session, adata []int16) ([]*proto.VADSegment, error) { @@ -854,218 +842,218 @@ func runVAD(ctx context.Context, session *Session, adata []int16) ([]*proto.VADS // TODO: Below needed for normal mode instead of transcription only // Function to generate a response based on the conversation -// func generateResponse(config *config.ModelConfig, evaluator *templates.Evaluator, session *Session, conversation *Conversation, responseCreate ResponseCreate, c *websocket.Conn, mt int) { -// -// log.Debug().Msg("Generating realtime response...") -// -// // Compile the conversation history -// conversation.Lock.Lock() -// var conversationHistory
[]schema.Message -// var latestUserAudio string -// for _, item := range conversation.Items { -// for _, content := range item.Content { -// switch content.Type { -// case "input_text", "text": -// conversationHistory = append(conversationHistory, schema.Message{ -// Role: string(item.Role), -// StringContent: content.Text, -// Content: content.Text, -// }) -// case "input_audio": -// // We do not to turn to text here the audio result. -// // When generating it later on from the LLM, -// // we will also generate text and return it and store it in the conversation -// // Here we just want to get the user audio if there is any as a new input for the conversation. -// if item.Role == "user" { -// latestUserAudio = content.Audio -// } -// } -// } -// } -// -// conversation.Lock.Unlock() -// -// var generatedText string -// var generatedAudio []byte -// var functionCall *FunctionCall -// var err error -// -// if latestUserAudio != "" { -// // Process the latest user audio input -// decodedAudio, err := base64.StdEncoding.DecodeString(latestUserAudio) -// if err != nil { -// log.Error().Msgf("failed to decode latest user audio: %s", err.Error()) -// sendError(c, "invalid_audio_data", "Failed to decode audio data", "", "") -// return -// } -// -// // Process the audio input and generate a response -// generatedText, generatedAudio, functionCall, err = processAudioResponse(session, decodedAudio) -// if err != nil { -// log.Error().Msgf("failed to process audio response: %s", err.Error()) -// sendError(c, "processing_error", "Failed to generate audio response", "", "") -// return -// } -// } else { -// -// if session.Instructions != "" { -// conversationHistory = append([]schema.Message{{ -// Role: "system", -// StringContent: session.Instructions, -// Content: session.Instructions, -// }}, conversationHistory...) 
-// } -// -// funcs := session.Functions -// shouldUseFn := len(funcs) > 0 && config.ShouldUseFunctions() -// -// // Allow the user to set custom actions via config file -// // to be "embedded" in each model -// noActionName := "answer" -// noActionDescription := "use this action to answer without performing any action" -// -// if config.FunctionsConfig.NoActionFunctionName != "" { -// noActionName = config.FunctionsConfig.NoActionFunctionName -// } -// if config.FunctionsConfig.NoActionDescriptionName != "" { -// noActionDescription = config.FunctionsConfig.NoActionDescriptionName -// } -// -// if (!config.FunctionsConfig.GrammarConfig.NoGrammar) && shouldUseFn { -// noActionGrammar := functions.Function{ -// Name: noActionName, -// Description: noActionDescription, -// Parameters: map[string]interface{}{ -// "properties": map[string]interface{}{ -// "message": map[string]interface{}{ -// "type": "string", -// "description": "The message to reply the user with", -// }}, -// }, -// } -// -// // Append the no action function -// if !config.FunctionsConfig.DisableNoAction { -// funcs = append(funcs, noActionGrammar) -// } -// -// // Update input grammar -// jsStruct := funcs.ToJSONStructure(config.FunctionsConfig.FunctionNameKey, config.FunctionsConfig.FunctionNameKey) -// g, err := jsStruct.Grammar(config.FunctionsConfig.GrammarOptions()...) 
-// if err == nil { -// config.Grammar = g -// } -// } -// -// // Generate a response based on text conversation history -// prompt := evaluator.TemplateMessages(conversationHistory, config, funcs, shouldUseFn) -// -// generatedText, functionCall, err = processTextResponse(config, session, prompt) -// if err != nil { -// log.Error().Msgf("failed to process text response: %s", err.Error()) -// sendError(c, "processing_error", "Failed to generate text response", "", "") -// return -// } -// log.Debug().Any("text", generatedText).Msg("Generated text response") -// } -// -// if functionCall != nil { -// // The model wants to call a function -// // Create a function_call item and send it to the client -// item := &Item{ -// ID: generateItemID(), -// Object: "realtime.item", -// Type: "function_call", -// Status: "completed", -// Role: "assistant", -// FunctionCall: functionCall, -// } -// -// // Add item to conversation -// conversation.Lock.Lock() -// conversation.Items = append(conversation.Items, item) -// conversation.Lock.Unlock() -// -// // Send item.created event -// sendEvent(c, OutgoingMessage{ -// Type: "conversation.item.created", -// Item: item, -// }) -// -// // Optionally, you can generate a message to the user indicating the function call -// // For now, we'll assume the client handles the function call and may trigger another response -// -// } else { -// // Send response.stream messages -// if generatedAudio != nil { -// // If generatedAudio is available, send it as audio -// encodedAudio := base64.StdEncoding.EncodeToString(generatedAudio) -// outgoingMsg := OutgoingMessage{ -// Type: "response.stream", -// Audio: encodedAudio, -// } -// sendEvent(c, outgoingMsg) -// } else { -// // Send text response (could be streamed in chunks) -// chunks := splitResponseIntoChunks(generatedText) -// for _, chunk := range chunks { -// outgoingMsg := OutgoingMessage{ -// Type: "response.stream", -// Content: chunk, -// } -// sendEvent(c, outgoingMsg) -// } -// } -// 
-// // Send response.done message -// sendEvent(c, OutgoingMessage{ -// Type: "response.done", -// }) -// -// // Add the assistant's response to the conversation -// content := []ConversationContent{} -// if generatedAudio != nil { -// content = append(content, ConversationContent{ -// Type: "audio", -// Audio: base64.StdEncoding.EncodeToString(generatedAudio), -// }) -// // Optionally include a text transcript -// if generatedText != "" { -// content = append(content, ConversationContent{ -// Type: "text", -// Text: generatedText, -// }) -// } -// } else { -// content = append(content, ConversationContent{ -// Type: "text", -// Text: generatedText, -// }) -// } -// -// item := &Item{ -// ID: generateItemID(), -// Object: "realtime.item", -// Type: "message", -// Status: "completed", -// Role: "assistant", -// Content: content, -// } -// -// // Add item to conversation -// conversation.Lock.Lock() -// conversation.Items = append(conversation.Items, item) -// conversation.Lock.Unlock() -// -// // Send item.created event -// sendEvent(c, OutgoingMessage{ -// Type: "conversation.item.created", -// Item: item, -// }) -// -// log.Debug().Any("item", item).Msg("Realtime response sent") -// } -// } +func generateResponse(config *config.ModelConfig, evaluator *templates.Evaluator, session *Session, conversation *Conversation, responseCreate ResponseCreate, c *websocket.Conn, mt int) { + + log.Debug().Msg("Generating realtime response...") + + // Compile the conversation history + conversation.Lock.Lock() + var conversationHistory []schema.Message + var latestUserAudio string + for _, item := range conversation.Items { + for _, content := range item.Content { + switch content.Type { + case types.MessageContentTypeInputText, types.MessageContentTypeText: + conversationHistory = append(conversationHistory, schema.Message{ + Role: string(item.Role), + StringContent: content.Text, + Content: content.Text, + }) + case types.MessageContentTypeInputAudio, 
types.MessageContentTypeAudio: + // We deliberately do not convert the audio to text here. + // When the response is generated later on from the LLM, + // text is generated as well, returned, and stored in the conversation. + // Here we only pick up the latest user audio, if there is any, as the new input for the conversation. + if item.Role == "user" { + latestUserAudio = content.Audio + } + } + } + } + + conversation.Lock.Unlock() + + var generatedText string + var generatedAudio []byte + var functionCall *FunctionCall + var err error + + if latestUserAudio != "" { + // Process the latest user audio input + decodedAudio, err := base64.StdEncoding.DecodeString(latestUserAudio) + if err != nil { + log.Error().Msgf("failed to decode latest user audio: %s", err.Error()) + sendError(c, "invalid_audio_data", "Failed to decode audio data", "", "") + return + } + + // Process the audio input and generate a response + generatedText, generatedAudio, functionCall, err = processAudioResponse(session, decodedAudio) + if err != nil { + log.Error().Msgf("failed to process audio response: %s", err.Error()) + sendError(c, "processing_error", "Failed to generate audio response", "", "") + return + } + } else { + + if session.Instructions != "" { + conversationHistory = append([]schema.Message{{ + Role: "system", + StringContent: session.Instructions, + Content: session.Instructions, + }}, conversationHistory...)
+ } + + funcs := session.Functions + shouldUseFn := len(funcs) > 0 && config.ShouldUseFunctions() + + // Allow the user to set custom actions via config file + // to be "embedded" in each model + noActionName := "answer" + noActionDescription := "use this action to answer without performing any action" + + if config.FunctionsConfig.NoActionFunctionName != "" { + noActionName = config.FunctionsConfig.NoActionFunctionName + } + if config.FunctionsConfig.NoActionDescriptionName != "" { + noActionDescription = config.FunctionsConfig.NoActionDescriptionName + } + + if (!config.FunctionsConfig.GrammarConfig.NoGrammar) && shouldUseFn { + noActionGrammar := functions.Function{ + Name: noActionName, + Description: noActionDescription, + Parameters: map[string]interface{}{ + "properties": map[string]interface{}{ + "message": map[string]interface{}{ + "type": "string", + "description": "The message to reply the user with", + }}, + }, + } + + // Append the no action function + if !config.FunctionsConfig.DisableNoAction { + funcs = append(funcs, noActionGrammar) + } + + // Update input grammar + jsStruct := funcs.ToJSONStructure(config.FunctionsConfig.FunctionNameKey, config.FunctionsConfig.FunctionNameKey) + g, err := jsStruct.Grammar(config.FunctionsConfig.GrammarOptions()...) 
+ if err == nil { + config.Grammar = g + } + } + + // Generate a response based on text conversation history + prompt := evaluator.TemplateMessages(conversationHistory, config, funcs, shouldUseFn) + + generatedText, functionCall, err = processTextResponse(config, session, prompt) + if err != nil { + log.Error().Msgf("failed to process text response: %s", err.Error()) + sendError(c, "processing_error", "Failed to generate text response", "", "") + return + } + log.Debug().Any("text", generatedText).Msg("Generated text response") + } + + if functionCall != nil { + // The model wants to call a function + // Create a function_call item and send it to the client + item := &Item{ + ID: generateItemID(), + Object: "realtime.item", + Type: "function_call", + Status: "completed", + Role: "assistant", + FunctionCall: functionCall, + } + + // Add item to conversation + conversation.Lock.Lock() + conversation.Items = append(conversation.Items, item) + conversation.Lock.Unlock() + + // Send item.created event + sendEvent(c, OutgoingMessage{ + Type: "conversation.item.created", + Item: item, + }) + + // Optionally, you can generate a message to the user indicating the function call + // For now, we'll assume the client handles the function call and may trigger another response + + } else { + // Send response.stream messages + if generatedAudio != nil { + // If generatedAudio is available, send it as audio + encodedAudio := base64.StdEncoding.EncodeToString(generatedAudio) + outgoingMsg := OutgoingMessage{ + Type: "response.stream", + Audio: encodedAudio, + } + sendEvent(c, outgoingMsg) + } else { + // Send text response (could be streamed in chunks) + chunks := splitResponseIntoChunks(generatedText) + for _, chunk := range chunks { + outgoingMsg := OutgoingMessage{ + Type: "response.stream", + Content: chunk, + } + sendEvent(c, outgoingMsg) + } + } + + // Send response.done message + sendEvent(c, OutgoingMessage{ + Type: "response.done", + }) + + // Add the assistant's response 
to the conversation + content := []ConversationContent{} + if generatedAudio != nil { + content = append(content, ConversationContent{ + Type: "audio", + Audio: base64.StdEncoding.EncodeToString(generatedAudio), + }) + // Optionally include a text transcript + if generatedText != "" { + content = append(content, ConversationContent{ + Type: "text", + Text: generatedText, + }) + } + } else { + content = append(content, ConversationContent{ + Type: "text", + Text: generatedText, + }) + } + + item := &Item{ + ID: generateItemID(), + Object: "realtime.item", + Type: "message", + Status: "completed", + Role: "assistant", + Content: content, + } + + // Add item to conversation + conversation.Lock.Lock() + conversation.Items = append(conversation.Items, item) + conversation.Lock.Unlock() + + // Send item.created event + sendEvent(c, OutgoingMessage{ + Type: "conversation.item.created", + Item: item, + }) + + log.Debug().Any("item", item).Msg("Realtime response sent") + } +} // Function to process text response and detect function calls func processTextResponse(config *config.ModelConfig, session *Session, prompt string) (string, *FunctionCall, error) { diff --git a/core/http/endpoints/openai/types/realtime.go b/core/http/endpoints/openai/types/realtime.go index 2da0600b869c..dd222c10297c 100644 --- a/core/http/endpoints/openai/types/realtime.go +++ b/core/http/endpoints/openai/types/realtime.go @@ -210,10 +210,17 @@ type MessageItem struct { Output string `json:"output,omitempty"` } -type ResponseMessageItem struct { - MessageItem - // The object type, must be "realtime.item". - Object string `json:"object,omitempty"` +// MarshalJSON adds the constant "object": "realtime.item" member; the value receiver lets encoding/json call it for non-addressable MessageItem values too. +func (m MessageItem) MarshalJSON() ([]byte, error) { + type msgItem MessageItem // method-less copy of the type, so json.Marshal does not recurse into MarshalJSON + v := struct { + *msgItem + Object string `json:"object"` + }{ + msgItem: (*msgItem)(&m), + Object: "realtime.item", + } + return json.Marshal(v) } type Error struct { @@ -358,7 +365,7 @@ type Response struct { // Additional details about the status.
StatusDetails any `json:"status_details,omitempty"` // The list of output items generated by the response. - Output []ResponseMessageItem `json:"output"` + Output []MessageItem `json:"output"` // Usage statistics for the response. Usage *Usage `json:"usage,omitempty"` } @@ -710,6 +717,7 @@ const ( ServerEventTypeInputAudioBufferCleared ServerEventType = "input_audio_buffer.cleared" ServerEventTypeInputAudioBufferSpeechStarted ServerEventType = "input_audio_buffer.speech_started" ServerEventTypeInputAudioBufferSpeechStopped ServerEventType = "input_audio_buffer.speech_stopped" + ServerEventTypeConversationItemAdded ServerEventType = "conversation.item.added" ServerEventTypeConversationItemCreated ServerEventType = "conversation.item.created" ServerEventTypeConversationItemInputAudioTranscriptionCompleted ServerEventType = "conversation.item.input_audio_transcription.completed" ServerEventTypeConversationItemInputAudioTranscriptionFailed ServerEventType = "conversation.item.input_audio_transcription.failed" @@ -827,8 +835,14 @@ type InputAudioBufferSpeechStoppedEvent struct { type ConversationItemCreatedEvent struct { ServerEventBase - PreviousItemID string `json:"previous_item_id,omitempty"` - Item ResponseMessageItem `json:"item"` + PreviousItemID string `json:"previous_item_id,omitempty"` + Item MessageItem `json:"item"` +} + +type ConversationItemAddedEvent struct { + ServerEventBase + PreviousItemID string `json:"previous_item_id,omitempty"` + Item MessageItem `json:"item"` } type ConversationItemInputAudioTranscriptionCompletedEvent struct { @@ -885,7 +899,7 @@ type ResponseOutputItemAddedEvent struct { // The index of the output item in the response. OutputIndex int `json:"output_index"` // The item that was added. - Item ResponseMessageItem `json:"item"` + Item MessageItem `json:"item"` } // ResponseOutputItemDoneEvent is the event for response output item done. 
@@ -898,7 +912,7 @@ type ResponseOutputItemDoneEvent struct { // The index of the output item in the response. OutputIndex int `json:"output_index"` // The completed item. - Item ResponseMessageItem `json:"item"` + Item MessageItem `json:"item"` } // ResponseContentPartAddedEvent is the event for response content part added.