@@ -11,18 +11,18 @@ import (
11
11
"net"
12
12
"os"
13
13
"strings"
14
+ "sync"
14
15
"time"
15
16
17
+ "github.com/ipfs/go-log"
16
18
"github.com/libp2p/go-libp2p/core/peer"
17
19
"github.com/mudler/LocalAI/pkg/utils"
20
+ "github.com/mudler/edgevpn/pkg/config"
18
21
"github.com/mudler/edgevpn/pkg/node"
19
22
"github.com/mudler/edgevpn/pkg/protocol"
23
+ "github.com/mudler/edgevpn/pkg/services"
20
24
"github.com/mudler/edgevpn/pkg/types"
21
25
"github.com/phayes/freeport"
22
-
23
- "github.com/ipfs/go-log"
24
- "github.com/mudler/edgevpn/pkg/config"
25
- "github.com/mudler/edgevpn/pkg/services"
26
26
zlog "github.com/rs/zerolog/log"
27
27
28
28
"github.com/mudler/edgevpn/pkg/logger"
@@ -34,6 +34,11 @@ func GenerateToken() string {
34
34
return newData.Base64()
35
35
}
36
36
37
+ func nodeID() string {
38
+ hostname, _ := os.Hostname()
39
+ return hostname
40
+ }
41
+
37
42
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
38
43
39
44
zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
@@ -135,6 +140,15 @@ func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
135
140
io.Copy(dst, src)
136
141
}
137
142
143
+ var availableNodes = []NodeData{}
144
+ var mu sync.Mutex
145
+
146
+ func GetAvailableNodes() []NodeData {
147
+ mu.Lock()
148
+ defer mu.Unlock()
149
+ return availableNodes
150
+ }
151
+
138
152
// This is the main of the server (which keeps the env variable updated)
139
153
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
140
154
func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
@@ -151,19 +165,22 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
151
165
zlog.Error().Msg("Discoverer stopped")
152
166
return
153
167
case tunnel := <-tunnels:
154
-
155
- totalTunnels = append(totalTunnels, tunnel)
168
+ totalTunnels = append(totalTunnels, tunnel.TunnelAddress)
156
169
os.Setenv("LLAMACPP_GRPC_SERVERS", strings.Join(totalTunnels, ","))
157
170
zlog.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", strings.Join(totalTunnels, ","))
171
+ mu.Lock()
172
+ defer mu.Unlock()
173
+ availableNodes = append(availableNodes, tunnel)
174
+ zlog.Info().Msgf("Node %s available", tunnel.ID)
158
175
}
159
176
}
160
177
}()
161
178
162
179
return nil
163
180
}
164
181
165
- func discoveryTunnels(ctx context.Context, token string) (chan string , error) {
166
- tunnels := make(chan string )
182
+ func discoveryTunnels(ctx context.Context, token string) (chan NodeData , error) {
183
+ tunnels := make(chan NodeData )
167
184
168
185
nodeOpts, err := newNodeOpts(token)
169
186
if err != nil {
@@ -196,18 +213,24 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
196
213
zlog.Debug().Msg("Searching for workers")
197
214
198
215
data := ledger.LastBlock().Storage["services_localai"]
199
- for k := range data {
216
+ for k, v := range data {
200
217
zlog.Info().Msgf("Found worker %s", k)
201
218
if _, found := emitted[k]; !found {
202
219
emitted[k] = true
220
+ nd := &NodeData{}
221
+ if err := v.Unmarshal(nd); err != nil {
222
+ zlog.Error().Msg("cannot unmarshal node data")
223
+ continue
224
+ }
203
225
//discoveredPeers <- k
204
226
port, err := freeport.GetFreePort()
205
227
if err != nil {
206
228
fmt.Print(err)
207
229
}
208
230
tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
209
231
go allocateLocalService(ctx, n, tunnelAddress, k)
210
- tunnels <- tunnelAddress
232
+ nd.TunnelAddress = tunnelAddress
233
+ tunnels <- *nd
211
234
}
212
235
}
213
236
}
@@ -217,6 +240,12 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
217
240
return tunnels, err
218
241
}
219
242
243
+ type NodeData struct {
244
+ Name string
245
+ ID string
246
+ TunnelAddress string
247
+ }
248
+
220
249
// This is the P2P worker main
221
250
func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
222
251
llger := logger.New(log.LevelFatal)
@@ -255,7 +284,10 @@ func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
255
284
// If mismatch, update the blockchain
256
285
if !found {
257
286
updatedMap := map[string]interface{}{}
258
- updatedMap[name] = "p2p"
287
+ updatedMap[name] = &NodeData{
288
+ Name: name,
289
+ ID: nodeID(),
290
+ }
259
291
ledger.Add("services_localai", updatedMap)
260
292
}
261
293
},
0 commit comments