
feat(p2p): Federation and AI swarms #2723

Merged
merged 18 commits on Jul 8, 2024
3 changes: 3 additions & 0 deletions .github/release.yml
@@ -13,6 +13,9 @@ changelog:
labels:
- bug
- regression
- title: "🖧 P2P area"
labels:
- area/p2p
- title: Exciting New Features 🎉
labels:
- Semver-Minor
6 changes: 3 additions & 3 deletions Makefile
@@ -53,8 +53,8 @@ RANDOM := $(shell bash -c 'echo $$RANDOM')
VERSION?=$(shell git describe --always --tags || echo "dev" )
# go tool nm ./local-ai | grep Commit
LD_FLAGS?=
-override LD_FLAGS += -X "github.com/go-skynet/LocalAI/internal.Version=$(VERSION)"
-override LD_FLAGS += -X "github.com/go-skynet/LocalAI/internal.Commit=$(shell git rev-parse HEAD)"
+override LD_FLAGS += -X "github.com/mudler/LocalAI/internal.Version=$(VERSION)"
+override LD_FLAGS += -X "github.com/mudler/LocalAI/internal.Commit=$(shell git rev-parse HEAD)"

OPTIONAL_TARGETS?=

@@ -147,7 +147,7 @@ endif

# glibc-static or glibc-devel-static required
ifeq ($(STATIC),true)
-LD_FLAGS=-linkmode external -extldflags -static
+LD_FLAGS+=-linkmode external -extldflags -static
endif

ifeq ($(findstring stablediffusion,$(GO_TAGS)),stablediffusion)
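Two changes land in the Makefile: the `-X` linker flags now reference the renamed `github.com/mudler/LocalAI` module path, and static builds append to `LD_FLAGS` instead of overwriting it, so the injected version metadata survives `STATIC=true` builds. For reference, a minimal sketch of how `-X` link-time injection behaves (an illustrative `main` package, not the actual LocalAI `internal` package):

```go
// main.go — sketch of link-time variable injection. Build with e.g.:
//   go build -ldflags '-X "main.Version=v1.2.3" -X "main.Commit=abc1234"'
package main

import "fmt"

// Overridden at link time via -X; a plain `go build` keeps the defaults.
var (
	Version = "dev"
	Commit  = "none"
)

func main() {
	fmt.Printf("version=%s commit=%s\n", Version, Commit)
}
```

`-X` only patches package-level string variables, which is why `Version` and `Commit` are plain `var` strings rather than constants.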
1 change: 1 addition & 0 deletions core/cli/cli.go
@@ -9,6 +9,7 @@ var CLI struct {
cliContext.Context `embed:""`

Run RunCMD `cmd:"" help:"Run LocalAI, this is the default command if no other command is specified. Run 'local-ai run --help' for more information" default:"withargs"`
Federated FederatedCLI `cmd:"" help:"Run LocalAI in federated mode"`
Models ModelsCMD `cmd:"" help:"Manage LocalAI models and definitions"`
TTS TTSCMD `cmd:"" help:"Convert text to speech"`
Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"`
130 changes: 130 additions & 0 deletions core/cli/federated.go
@@ -0,0 +1,130 @@
package cli

import (
"context"
"errors"
"fmt"
"io"
"net"
"time"

"math/rand/v2"

cliContext "github.com/mudler/LocalAI/core/cli/context"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
"github.com/rs/zerolog/log"
)

type FederatedCLI struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
}

func (f *FederatedCLI) Run(ctx *cliContext.Context) error {

n, err := p2p.NewNode(f.Peer2PeerToken)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}
err = n.Start(context.Background())
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}

if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, nil); err != nil {
return err
}

return Proxy(context.Background(), n, f.Address, p2p.FederatedID)
}

func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error {

log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
// Open local port for listening
l, err := net.Listen("tcp", listenAddr)
if err != nil {
log.Error().Err(err).Msg("Error listening")
return err
}
// ll.Info("Binding local port on", srcaddr)

ledger, _ := node.Ledger()

// Announce ourselves so other nodes accept our connection
ledger.Announce(
ctx,
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)

defer l.Close()
for {
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Debug().Msg("New for connection")
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
continue
}

// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}

if len(tunnelAddresses) == 0 {
log.Error().Msg("No online nodes available for the federated service")
conn.Close()
return
}

// open a TCP stream to one of the tunnels
// chosen randomly
// TODO: optimize this and track usage
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]

tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
return
}

log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, tunnelConn, conn)
go copyStream(closer, conn, tunnelConn)
<-closer

tunnelConn.Close()
conn.Close()
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
}()
}
}

}

func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
io.Copy(dst, src)
}
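`Proxy` above accepts local TCP connections, picks an online federated node at random, and splices the two streams with a pair of `copyStream` goroutines that signal a shared channel when either direction ends. A stripped-down sketch of that splice pattern, with a fixed upstream address standing in for the P2P tunnel (addresses and ports are illustrative):

```go
package main

import (
	"io"
	"log"
	"net"
)

// pipe copies src into dst and signals done when the stream ends,
// mirroring copyStream above.
func pipe(done chan<- struct{}, dst io.Writer, src io.Reader) {
	defer func() { done <- struct{}{} }()
	_, _ = io.Copy(dst, src) // best-effort copy; an error simply ends the stream
}

// handle splices a client connection with a single upstream address.
// The real Proxy dials a randomly chosen P2P tunnel address instead.
func handle(client net.Conn, upstreamAddr string) {
	defer client.Close()

	upstream, err := net.Dial("tcp", upstreamAddr)
	if err != nil {
		log.Printf("dial upstream: %v", err)
		return
	}
	defer upstream.Close()

	done := make(chan struct{}, 2) // buffered so the second goroutine never blocks
	go pipe(done, upstream, client)
	go pipe(done, client, upstream)
	<-done // first direction to finish tears both connections down via the defers
}

func main() {
	l, err := net.Listen("tcp", ":9090") // illustrative listen address
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()
	for {
		conn, err := l.Accept()
		if err != nil {
			continue
		}
		go handle(conn, "127.0.0.1:8080") // assumption: one fixed upstream
	}
}
```

Buffering the channel with capacity 2 lets the second goroutine post its signal after the handler has already returned, which is also why `closer` is buffered above.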
50 changes: 43 additions & 7 deletions core/cli/run.go
@@ -3,6 +3,8 @@
import (
"context"
"fmt"
"net"
"os"
"strings"
"time"

@@ -50,7 +52,7 @@
DisableWebUI bool `env:"LOCALAI_DISABLE_WEBUI,DISABLE_WEBUI" default:"false" help:"Disable webui" group:"api"`
OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"api"`
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
-Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
+Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
@@ -59,6 +61,7 @@
WatchdogIdleTimeout string `env:"LOCALAI_WATCHDOG_IDLE_TIMEOUT,WATCHDOG_IDLE_TIMEOUT" default:"15m" help:"Threshold beyond which an idle backend should be stopped" group:"backends"`
EnableWatchdogBusy bool `env:"LOCALAI_WATCHDOG_BUSY,WATCHDOG_BUSY" default:"false" help:"Enable watchdog for stopping backends that are busy longer than the watchdog-busy-timeout" group:"backends"`
WatchdogBusyTimeout string `env:"LOCALAI_WATCHDOG_BUSY_TIMEOUT,WATCHDOG_BUSY_TIMEOUT" default:"5m" help:"Threshold beyond which a busy backend should be stopped" group:"backends"`
Federated bool `env:"LOCALAI_FEDERATED,FEDERATED" help:"Enable federated instance" group:"federated"`
}

func (r *RunCMD) Run(ctx *cliContext.Context) error {
@@ -91,9 +94,10 @@
config.WithOpaqueErrors(r.OpaqueErrors),
}

token := ""
if r.Peer2Peer || r.Peer2PeerToken != "" {
log.Info().Msg("P2P mode enabled")
-token := r.Peer2PeerToken
+token = r.Peer2PeerToken
if token == "" {
// If no token is provided and p2p is enabled,
// we generate one and wait for the user to pick up the token (this is for interactive)
@@ -104,14 +108,46 @@

log.Info().Msg("To use the token, you can run the following command in another node or terminal:")
fmt.Printf("export TOKEN=\"%s\"\nlocal-ai worker p2p-llama-cpp-rpc\n", token)
}
opts = append(opts, config.WithP2PToken(token))

// Ask for user confirmation
log.Info().Msg("Press a button to proceed")
var input string
fmt.Scanln(&input)
node, err := p2p.NewNode(token)
if err != nil {
return err
}

log.Info().Msg("Starting P2P server discovery...")
-if err := p2p.LLamaCPPRPCServerDiscoverer(context.Background(), token); err != nil {
+if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func() {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes("") {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}
tunnelEnvVar := strings.Join(tunnelAddresses, ",")

os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar)

Check warning

Code scanning / Golang security checks by gosec

Errors unhandled. Warning

Errors unhandled.
log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar)
}); err != nil {
return err
}
}

if r.Federated {
_, port, err := net.SplitHostPort(r.Address)
if err != nil {
return err
}
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.FederatedID); err != nil {
return err
}
node, err := p2p.NewNode(token)
if err != nil {
return err
}
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.FederatedID, nil); err != nil {
return err
}
}
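With P2P enabled, `run.go` joins the tunnel address of every online worker into a comma-separated `host:port` list and exports it as `LLAMACPP_GRPC_SERVERS`. A hedged sketch of reading that list back on the consuming side (the variable name comes from the diff; the parsing below is illustrative, not the llama.cpp backend's actual code):

```go
package main

import (
	"fmt"
	"net"
	"os"
	"strings"
)

func main() {
	// e.g. LLAMACPP_GRPC_SERVERS="127.0.0.1:34567,127.0.0.1:34568"
	raw := os.Getenv("LLAMACPP_GRPC_SERVERS")
	if raw == "" {
		fmt.Println("no gRPC workers configured")
		return
	}
	for _, entry := range strings.Split(raw, ",") {
		host, port, err := net.SplitHostPort(strings.TrimSpace(entry))
		if err != nil {
			fmt.Printf("skipping malformed entry %q: %v\n", entry, err)
			continue
		}
		fmt.Printf("worker host=%s port=%s\n", host, port)
	}
}
```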
6 changes: 3 additions & 3 deletions core/cli/worker/worker_p2p.go
@@ -20,7 +20,7 @@ import (

type P2P struct {
WorkerFlags `embed:""`
-Token string `env:"LOCALAI_TOKEN,TOKEN" help:"JSON list of galleries"`
+Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
@@ -59,7 +59,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
p = r.RunnerPort
}

-err = p2p.BindLLamaCPPWorker(context.Background(), address, p, r.Token)
+err = p2p.ExposeService(context.Background(), address, p, r.Token, "")
if err != nil {
return err
}
@@ -99,7 +99,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
}
}()

-err = p2p.BindLLamaCPPWorker(context.Background(), address, fmt.Sprint(port), r.Token)
+err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, "")
if err != nil {
return err
}
7 changes: 7 additions & 0 deletions core/config/application_config.go
@@ -32,6 +32,7 @@ type ApplicationConfig struct {
CORSAllowOrigins string
ApiKeys []string
OpaqueErrors bool
P2PToken string

ModelLibraryURL string

@@ -95,6 +96,12 @@ func WithCsrf(b bool) AppOption {
}
}

func WithP2PToken(s string) AppOption {
return func(o *ApplicationConfig) {
o.P2PToken = s
}
}

func WithModelLibraryURL(url string) AppOption {
return func(o *ApplicationConfig) {
o.ModelLibraryURL = url
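`WithP2PToken` follows the functional-options pattern already used throughout `application_config.go`: each `AppOption` is a closure that mutates the `ApplicationConfig` it is applied to. A self-contained sketch of the pattern with simplified stand-in types (not the real `core/config` package):

```go
package main

import "fmt"

// Simplified stand-ins for config.ApplicationConfig and config.AppOption.
type ApplicationConfig struct {
	Address  string
	P2PToken string
}

type AppOption func(*ApplicationConfig)

func WithP2PToken(s string) AppOption {
	return func(o *ApplicationConfig) { o.P2PToken = s }
}

func WithAddress(addr string) AppOption {
	return func(o *ApplicationConfig) { o.Address = addr }
}

func NewApplicationConfig(opts ...AppOption) *ApplicationConfig {
	cfg := &ApplicationConfig{Address: ":8080"} // defaults
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := NewApplicationConfig(
		WithAddress(":9090"),
		WithP2PToken("example-token"), // hypothetical token value
	)
	fmt.Printf("%+v\n", cfg)
}
```

`run.go` above uses the real option the same way, appending `config.WithP2PToken(token)` to its option slice.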