Skip to content

Commit

Permalink
Dapr runtime support
Browse files Browse the repository at this point in the history
- installs Dapr cli in CI
- installs Dapr runtime on allocation of test cluster
- annotates services to enable dapr sidecar integration
- installs redis via helm, enabling state store, pub/sub and distributed
  lock
- integration test added for local invocation
- integration test added for service-to-service invocation via the
  sidecar

Note that Dapr runs metrics on port 9002 so as not to collide with
Knative metrics.
  • Loading branch information
lkingland committed Jan 30, 2023
1 parent a9b4542 commit 92f9cce
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 42 deletions.
14 changes: 8 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,12 @@ func (c *Client) Runtimes() ([]string, error) {

// Apply (aka upsert)
//
// Invokes all lower-level methods as necessary to create a running function
// whose source code and metadata match that provided by the passed
// function instance.
// The general-purpose high-level method to initiate a synchronization of
// a function's source code and it's deployed instance(s).
//
// Invokes all lower-level methods, including initialization, as necessary to
// create a running function whose source code and metadata match that provided
// by the passed function instance, returning the final route and any errors.
func (c *Client) Apply(ctx context.Context, f Function) (route string, err error) {
if f, err = NewFunction(f.Root); err != nil {
return
Expand All @@ -429,7 +432,7 @@ func (c *Client) Apply(ctx context.Context, f Function) (route string, err error
// source code.
//
// Use Init, Build, Push and Deploy independently for lower level control.
// Returns the primary route to the function or any errors.
// Returns final primary route to the Function and any errors.
func (c *Client) Update(ctx context.Context, root string) (route string, err error) {
f, err := NewFunction(root)
if err != nil {
Expand Down Expand Up @@ -705,8 +708,7 @@ func (c *Client) printBuildActivity(ctx context.Context) {
}()
}

// Deploy the function at path. Errors if the function has not been
// initialized with an image tag.
// Deploy the function at path. Errors if the function has not been built.
func (c *Client) Deploy(ctx context.Context, path string) (err error) {
go func() {
<-ctx.Done()
Expand Down
208 changes: 181 additions & 27 deletions client_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ package function_test

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"reflect"
"testing"
"time"
Expand All @@ -18,32 +22,26 @@ import (
"knative.dev/pkg/ptr"
)

/*
NOTE: Running integration tests locally requires a configured test cluster.
Test failures may require manual removal of dangling resources.
## Integration Cluster
These integration tests require a properly configured cluster,
such as that which is setup and configured in CI (see .github/workflows).
A local KinD cluster can be started via:
./hack/allocate.sh && ./hack/configure.sh
## Integration Testing
These tests can be run via the make target:
make test-integration
or manually by specifying the tag
go test -v -tags integration ./...
## Teardown and Cleanup
// # Integration Tsets
//
// go test -tags integration ./...
//
// ## Cluster Required
//
// These integration tests require a properly configured cluster,
// such as that which is setup and configured in CI (see .github/workflows).
// Linux developers can set up the cluster via:
//
// ./hack/binaries.sh && ./hack/allocate.sh && ./hack/registry.sh
//
// ## Cluster Cleanup
//
// The test cluster and most resources can be removed with:

Tests should clean up after themselves. In the event of failures, one may
need to manually remove files:
rm -rf ./testdata/example.com
The test cluster is not automatically removed, as it can be reused. To remove:
./hack/delete.sh
*/
// ./hack/delete.sh
//
// NOTE: Downloaded images are not removed.
//

const (
// DefaultRegistry must contain both the registry host and
Expand Down Expand Up @@ -237,16 +235,171 @@ func TestRemoteRepositories(t *testing.T) {
}
}

// TestInvoke_ClientToService ensures that the client can invoke a remotely
// deployed service, both by the route returned directly as well as using
// the invocation helper client.Invoke.
func TestInvoke_ClientToService(t *testing.T) {
var (
root, done = Mktemp(t)
verbose = true
ctx = context.Background()
client = newClient(verbose)
route string
err error
)
defer done()

// Create a function
f := fn.Function{Name: "f", Runtime: "go"}
if err = client.Init(f); err != nil {
t.Fatal(err)
}
source := `
package function
import (
"context"
"net/http"
)
func Handle(ctx context.Context, res http.ResponseWriter, req *http.Request) {
res.Write([]byte("TestInvoke_ClientToService OK"))
}
`
os.WriteFile(filepath.Join(root, "handle.go"), []byte(source), os.ModePerm)

if route, err = client.Apply(ctx, f); err != nil {
t.Fatal(err)
}
defer client.Remove(ctx, f, true)

// Invoke via the route
resp, err := http.Get(route)
if err != nil {
t.Fatal(err)
}
b, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

if string(b) != "TestInvoke_ClientToService OK" {
t.Fatalf("unexpected response from HTTP GET: %v", b)
}

// Invoke using the helper
_, body, err := client.Invoke(ctx, root, "", fn.NewInvokeMessage())
if err != nil {
t.Fatal(err)
}

if body != "TestInvoke_ClientToService OK" {
t.Fatalf("unexpected response from client.Invoke: %v", b)
}
}

// TestInvoke_ServiceToService ensures that a Function can invoke another
// service via localhost service discovery api provided by the Dapr sidecar.
func TestInvoke_ServiceToService(t *testing.T) {
var (
verbose = true
ctx = context.Background()
client = newClient(verbose)
err error
source string
route string
)

// Create function A
// A function which responds to GET requests with a static value.
root, done := Mktemp(t)
defer done()
f := fn.Function{Name: "a", Runtime: "go"}
if err := client.Init(f); err != nil {
t.Fatal(err)
}
source = `
package function
import (
"context"
"net/http"
)
func Handle(ctx context.Context, res http.ResponseWriter, req *http.Request) {
res.Write([]byte("TestInvoke_ServiceToService OK"))
}
`
os.WriteFile(filepath.Join(root, "handle.go"), []byte(source), os.ModePerm)
if _, err = client.Apply(ctx, f); err != nil {
t.Fatal(err)
}
defer client.Remove(ctx, f, true)

// Create Function B
// which responds with the response from an invocation of 'a' via the
// localhost service discovery and invocation API.
root, done = Mktemp(t)
defer done()
f = fn.Function{Name: "b", Runtime: "go"}
if err := client.Init(f); err != nil {
t.Fatal(err)
}

source = `
package function
import (
"context"
"net/http"
"fmt"
"io"
)
func Handle(ctx context.Context, res http.ResponseWriter, req *http.Request) {
r, err := http.Get("http://localhost:3500/v1.0/invoke/a/method/")
if err != nil {
fmt.Printf("unable to invoke function a", err)
http.Error(res, "unable to invoke function a", http.StatusServiceUnavailable)
}
defer r.Body.Close()
io.Copy(res,r.Body)
}
`
os.WriteFile(filepath.Join(root, "handle.go"), []byte(source), os.ModePerm)
if route, err = client.Apply(ctx, f); err != nil {
t.Fatal(err)
}
defer client.Remove(ctx, f, true)

resp, err := http.Get(route)
if err != nil {
t.Fatal(err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
fmt.Printf("### function a response body: %s\n", body)

if string(body) != "TestInvoke_ServiceToService OK" {
t.Fatalf("Unexpected response from Function B: %v", body)
}
}

// ***********
// Helpers
// ***********

// newClient creates an instance of the func client whose concrete impls
// match those created by the kn func plugin CLI.
// newClient creates an instance of the func client with concrete impls
// sufficient for running integration tests.
func newClient(verbose bool) *fn.Client {
builder := buildpacks.NewBuilder(buildpacks.WithVerbose(verbose))
pusher := docker.NewPusher(docker.WithVerbose(verbose))
deployer := knative.NewDeployer(knative.WithDeployerNamespace(DefaultNamespace), knative.WithDeployerVerbose(verbose))
describer := knative.NewDescriber(DefaultNamespace, verbose)
remover := knative.NewRemover(DefaultNamespace, verbose)
lister := knative.NewLister(DefaultNamespace, verbose)

Expand All @@ -256,6 +409,7 @@ func newClient(verbose bool) *fn.Client {
fn.WithBuilder(builder),
fn.WithPusher(pusher),
fn.WithDeployer(deployer),
fn.WithDescriber(describer),
fn.WithRemover(remover),
fn.WithLister(lister),
)
Expand Down
71 changes: 68 additions & 3 deletions hack/allocate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ main() {
eventing
networking
registry
configure
namespace
dapr_runtime
next_steps

echo "${em}DONE${me}"
Expand Down Expand Up @@ -195,7 +196,7 @@ data:
EOF
}

configure() {
namespace() {
echo "${em}⑦ Configure Namespace${me}"

# Create Namespace
Expand Down Expand Up @@ -243,10 +244,74 @@ EOF

}

dapr_runtime() {
echo "${em}⑦ Dapr${me}"

# Install Dapr Runtime
dapr init --kubernetes --wait

# Enalble Redis Persistence and Pub/Sub
#
# 1) Redis
# Creates a Redis leader with three replicas
# TODO: helm and the bitnami charts are likely not necessary. The Bitnami
# charts do tweak quite a few settings, but I am skeptical it is necessary
# in a CI/CD environment, as it does add nontrivial support overhead.
# TODO: If the bitnami redis chart seems worth the effort, munge this command
# to only start a single instance rather than four.
# helm repo add bitnami https://charts.bitnami.com/bitnami
echo "${em}- Redis ${me}"
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install redis bitnami/redis --set image.tag=6.2
helm repo update

# 2) Expose a Redis-backed Dapr State Storage component
echo "${em}- State Storage Component${me}"
kubectl apply -f - << EOF
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: default
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
- name: redisPassword
secretKeyRef:
name: redis
key: redis-password
EOF

# 3) Expose A Redis-backed Dapr Pub/Sub Component
echo "${em}- Pub/Sub Component${me}"
kubectl apply -f - << EOF
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
- name: redisPassword
secretKeyRef:
name: redis
key: redis-password
EOF

}


next_steps() {
local red=$(tput bold)$(tput setaf 1)

echo "${em}Configure Registry${me}"
echo "${em}Image Registry${me}"
echo "If not in CI (running ci.sh): "
echo " ${red}set registry as insecure${me} in the docker daemon config (/etc/docker/daemon.json on linux or ~/.docker/daemon.json on OSX):"
echo " { \"insecure-registries\": [ \"localhost:50000\" ] }"
Expand Down
Loading

0 comments on commit 92f9cce

Please sign in to comment.