@@ -11,18 +11,18 @@ import (
11
11
"net"
12
12
"os"
13
13
"strings"
14
+ "sync"
14
15
"time"
15
16
17
+ "github.com/ipfs/go-log"
16
18
"github.com/libp2p/go-libp2p/core/peer"
17
19
"github.com/mudler/LocalAI/pkg/utils"
20
+ "github.com/mudler/edgevpn/pkg/config"
18
21
"github.com/mudler/edgevpn/pkg/node"
19
22
"github.com/mudler/edgevpn/pkg/protocol"
23
+ "github.com/mudler/edgevpn/pkg/services"
20
24
"github.com/mudler/edgevpn/pkg/types"
21
25
"github.com/phayes/freeport"
22
-
23
- "github.com/ipfs/go-log"
24
- "github.com/mudler/edgevpn/pkg/config"
25
- "github.com/mudler/edgevpn/pkg/services"
26
26
zlog "github.com/rs/zerolog/log"
27
27
28
28
"github.com/mudler/edgevpn/pkg/logger"
@@ -34,6 +34,15 @@ func GenerateToken() string {
34
34
return newData .Base64 ()
35
35
}
36
36
37
+ func IsP2PEnabled () bool {
38
+ return true
39
+ }
40
+
41
+ func nodeID () string {
42
+ hostname , _ := os .Hostname ()
43
+ return hostname
44
+ }
45
+
37
46
func allocateLocalService (ctx context.Context , node * node.Node , listenAddr , service string ) error {
38
47
39
48
zlog .Info ().Msgf ("Allocating service '%s' on: %s" , service , listenAddr )
@@ -135,14 +144,27 @@ func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
135
144
io .Copy (dst , src )
136
145
}
137
146
147
+ var availableNodes = []NodeData {}
148
+ var mu sync.Mutex
149
+
150
+ func GetAvailableNodes () []NodeData {
151
+ mu .Lock ()
152
+ defer mu .Unlock ()
153
+ return availableNodes
154
+ }
155
+
138
156
// This is the main of the server (which keeps the env variable updated)
139
157
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
140
158
func LLamaCPPRPCServerDiscoverer (ctx context.Context , token string ) error {
141
159
tunnels , err := discoveryTunnels (ctx , token )
142
160
if err != nil {
143
161
return err
144
162
}
145
-
163
+ // TODO: discoveryTunnels should return all the nodes that are available?
164
+ // In this way we updated availableNodes here instead of appending
165
+ // e.g. we have a LastSeen field in NodeData that is updated in discoveryTunnels
166
+ // each time the node is seen
167
+ // In this case the below function should be idempotent and just keep track of the nodes
146
168
go func () {
147
169
totalTunnels := []string {}
148
170
for {
@@ -151,19 +173,22 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
151
173
zlog .Error ().Msg ("Discoverer stopped" )
152
174
return
153
175
case tunnel := <- tunnels :
154
-
155
- totalTunnels = append (totalTunnels , tunnel )
176
+ totalTunnels = append (totalTunnels , tunnel .TunnelAddress )
156
177
os .Setenv ("LLAMACPP_GRPC_SERVERS" , strings .Join (totalTunnels , "," ))
157
178
zlog .Debug ().Msgf ("setting LLAMACPP_GRPC_SERVERS to %s" , strings .Join (totalTunnels , "," ))
179
+ mu .Lock ()
180
+ defer mu .Unlock ()
181
+ availableNodes = append (availableNodes , tunnel )
182
+ zlog .Info ().Msgf ("Node %s available" , tunnel .ID )
158
183
}
159
184
}
160
185
}()
161
186
162
187
return nil
163
188
}
164
189
165
- func discoveryTunnels (ctx context.Context , token string ) (chan string , error ) {
166
- tunnels := make (chan string )
190
+ func discoveryTunnels (ctx context.Context , token string ) (chan NodeData , error ) {
191
+ tunnels := make (chan NodeData )
167
192
168
193
nodeOpts , err := newNodeOpts (token )
169
194
if err != nil {
@@ -196,18 +221,24 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
196
221
zlog .Debug ().Msg ("Searching for workers" )
197
222
198
223
data := ledger .LastBlock ().Storage ["services_localai" ]
199
- for k := range data {
224
+ for k , v := range data {
200
225
zlog .Info ().Msgf ("Found worker %s" , k )
201
226
if _ , found := emitted [k ]; ! found {
202
227
emitted [k ] = true
228
+ nd := & NodeData {}
229
+ if err := v .Unmarshal (nd ); err != nil {
230
+ zlog .Error ().Msg ("cannot unmarshal node data" )
231
+ continue
232
+ }
203
233
//discoveredPeers <- k
204
234
port , err := freeport .GetFreePort ()
205
235
if err != nil {
206
236
fmt .Print (err )
207
237
}
208
238
tunnelAddress := fmt .Sprintf ("127.0.0.1:%d" , port )
209
239
go allocateLocalService (ctx , n , tunnelAddress , k )
210
- tunnels <- tunnelAddress
240
+ nd .TunnelAddress = tunnelAddress
241
+ tunnels <- * nd
211
242
}
212
243
}
213
244
}
@@ -255,7 +286,10 @@ func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
255
286
// If mismatch, update the blockchain
256
287
if ! found {
257
288
updatedMap := map [string ]interface {}{}
258
- updatedMap [name ] = "p2p"
289
+ updatedMap [name ] = & NodeData {
290
+ Name : name ,
291
+ ID : nodeID (),
292
+ }
259
293
ledger .Add ("services_localai" , updatedMap )
260
294
}
261
295
},
0 commit comments