Skip to content

Commit

Permalink
add queue storage for requests
Browse files Browse the repository at this point in the history
  • Loading branch information
reddec committed Jul 15, 2020
1 parent 2789c49 commit 98e4e43
Show file tree
Hide file tree
Showing 10 changed files with 322 additions and 3 deletions.
3 changes: 2 additions & 1 deletion application/lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (local *localLambda) SetCredentials(creds *types.Credential) error {
func (local *localLambda) Invoke(ctx context.Context, request types.Request, response io.Writer, globalEnv map[string]string) error {
local.lock.RLock()
defer local.lock.RUnlock()
defer request.Body.Close()

if !local.passSecurityCheck(&request) {
return fmt.Errorf("security checks failed")
Expand All @@ -85,7 +86,7 @@ func (local *localLambda) Invoke(ctx context.Context, request types.Request, res
ctx = cctx
}

var input = request.Body
var input io.Reader = request.Body

if local.manifest.MaximumPayload > 0 {
input = io.LimitReader(input, local.manifest.MaximumPayload)
Expand Down
2 changes: 1 addition & 1 deletion application/lambda/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestLocalLambda_Invoke(t *testing.T) {

var out bytes.Buffer
err = fn.Invoke(timeout, types.Request{
Body: bytes.NewBufferString("hello world"),
Body: ioutil.NopCloser(bytes.NewBufferString("hello world")),
}, &out, nil)
if !assert.NoError(t, err) {
return
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/google/uuid v1.1.1
github.com/jessevdk/go-flags v1.4.1-0.20180331124232-1c38ed7ad0cc
github.com/philhofer/fwd v1.0.0 // indirect
github.com/reddec/dfq v0.0.0-20200715120610-5e45d6cc3525
github.com/reddec/jsonrpc2 v0.1.18
github.com/robfig/cron v1.2.0
github.com/stretchr/testify v1.5.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/reddec/dfq v0.0.0-20200715115356-58b64d4e9509 h1:9MoGF3RI9tA78jswJbHMR3TaFjjPIzTHuIz/iP0lTYs=
github.com/reddec/dfq v0.0.0-20200715115356-58b64d4e9509/go.mod h1:bm9WMLIEIvDiGGBnP4I68xEToCNrjwW9h/57coRF85w=
github.com/reddec/dfq v0.0.0-20200715120610-5e45d6cc3525 h1:jg6PJi3G9DxsR8qQ6ISopc0uvL4zLy3sq97SUwMkDO0=
github.com/reddec/dfq v0.0.0-20200715120610-5e45d6cc3525/go.mod h1:bm9WMLIEIvDiGGBnP4I68xEToCNrjwW9h/57coRF85w=
github.com/reddec/godetector v0.0.0-20200420065712-f938e1104afe/go.mod h1:CzQ4Kf0yOsagWbBdC+5pRPJxMnL1uO3/7DimjqEr6Q8=
github.com/reddec/jsonrpc2 v0.1.18 h1:yeVtTlRcbsyAWcXE70E51NKgcFg6T/NqOSNmsEQotws=
github.com/reddec/jsonrpc2 v0.1.18/go.mod h1:heiBKpIJpxXGrQ3W9YKahxgfD6yCsnBsqBedjSOSTQI=
Expand Down
1 change: 1 addition & 0 deletions queue/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/test
69 changes: 69 additions & 0 deletions queue/indir/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package indir

import (
"context"
"github.com/reddec/dfq"
"github.com/reddec/trusted-cgi/types"
"github.com/tinylib/msgp/msgp"
"io"
)

func New(directory string) (*inDirQueue, error) {
back, err := dfq.Open(directory)
if err != nil {
return nil, err
}
return &inDirQueue{backend: back}, nil
}

type inDirQueue struct {
backend dfq.Queue
}

func (queue *inDirQueue) Put(ctx context.Context, request *types.Request) error {
defer request.Body.Close()
return queue.backend.Stream(func(out io.Writer) error {
w := msgp.NewWriter(out)
err := request.EncodeMsg(w)
if err != nil {
return err
}
_, err = io.Copy(w, request.Body)
if err != nil {
return err
}
return w.Flush()
})
}

func (queue *inDirQueue) Peek(ctx context.Context) (*types.Request, error) {
in, err := queue.backend.Wait(ctx)
if err != nil {
return nil, err
}
reader := msgp.NewReader(in)
var head types.Request
err = head.DecodeMsg(reader)
if err != nil {
_ = in.Close()
return nil, err
}
return head.WithBody(&readCloser{reader: reader.R, closer: in}), nil
}

func (queue *inDirQueue) Commit(ctx context.Context) error {
return queue.backend.Commit()
}

type readCloser struct {
reader io.Reader
closer io.Closer
}

func (rc *readCloser) Read(p []byte) (n int, err error) {
return rc.reader.Read(p)
}

func (rc *readCloser) Close() error {
return rc.closer.Close()
}
109 changes: 109 additions & 0 deletions queue/inmemory/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package inmemory

import (
"bytes"
"context"
"fmt"
"github.com/reddec/trusted-cgi/types"
"io/ioutil"
"sync"
"sync/atomic"
)

type item struct {
payload types.Request
data []byte
}

func (it *item) makeRequest() *types.Request {
cp := it.payload // shallow copy
cp.Body = ioutil.NopCloser(bytes.NewReader(it.data))
return &cp
}

// Dummy implementation of in-memory queue with pre-allocated channel for buffering.
// It's safe to call Close several times
func New(size int) *memoryQueue {
return &memoryQueue{
closed: make(chan struct{}),
stream: make(chan item, size),
}
}

type memoryQueue struct {
closed chan struct{}
stream chan item
peeked struct {
value item
available bool
}
rlock sync.Mutex
closing int32
}

func (queue *memoryQueue) Put(ctx context.Context, request *types.Request) error {
defer request.Body.Close()
select {
case <-queue.closed:
return fmt.Errorf("put: queue is closed")
default:
}
data, err := ioutil.ReadAll(request.Body)
if err != nil {
return fmt.Errorf("put: read body: %w", err)
}
select {
case <-queue.closed:
return fmt.Errorf("put: queue is closed")
case <-ctx.Done():
return fmt.Errorf("put: context closed: %w", ctx.Err())
case queue.stream <- item{
payload: *request,
data: data,
}:
return nil
}
}

func (queue *memoryQueue) Peek(ctx context.Context) (*types.Request, error) {
select {
case <-queue.closed:
return nil, fmt.Errorf("peek: queue is closed")
default:
}
queue.rlock.Lock()
defer queue.rlock.Unlock()
if !queue.peeked.available {
select {
case <-queue.closed:
return nil, fmt.Errorf("peek: queue is closed")
case <-ctx.Done():
return nil, fmt.Errorf("peek: context closed: %w", ctx.Err())
case item := <-queue.stream:
queue.peeked.value = item
queue.peeked.available = true
}
}
return queue.peeked.value.makeRequest(), nil
}

func (queue *memoryQueue) Commit(ctx context.Context) error {
select {
case <-queue.closed:
return fmt.Errorf("commit: queue is closed")
default:
}
queue.rlock.Lock()
defer queue.rlock.Unlock()
queue.peeked.available = false
return nil
}

func (queue *memoryQueue) Done() <-chan struct{} { return queue.closed }

func (queue *memoryQueue) Close() {
if atomic.CompareAndSwapInt32(&queue.closing, 0, 1) {
close(queue.closed)
close(queue.stream)
}
}
17 changes: 17 additions & 0 deletions queue/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package queue

import (
"context"
"github.com/reddec/trusted-cgi/types"
)

// Thread-safe FIFO queue designed for one multiple concurrent writers and single consumer.
// Queue should store somewhere request body.
type Queue interface {
// Put request to queue
Put(ctx context.Context, request *types.Request) error
// Peek oldest request or wait till new data arrived/context expiration
Peek(ctx context.Context) (*types.Request, error)
// Commit (remove) oldest record
Commit(ctx context.Context) error
}
107 changes: 107 additions & 0 deletions queue/interfaces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package queue_test

import (
"bytes"
"context"
"github.com/google/uuid"
"github.com/reddec/trusted-cgi/queue"
"github.com/reddec/trusted-cgi/queue/indir"
"github.com/reddec/trusted-cgi/queue/inmemory"
"github.com/reddec/trusted-cgi/types"
"github.com/stretchr/testify/assert"
"io/ioutil"
"testing"
)

func testPutPeek(ctx context.Context, t *testing.T, queue queue.Queue) *types.Request {
payload := uuid.New().String()

req := &types.Request{
Method: "POST",
URL: "http://example.com:8889/sample/" + payload,
Path: "/sample/" + payload,
RemoteAddress: "127.0.0.2:9992",
Form: map[string]string{
"USER": "user1",
},
Headers: map[string]string{
"Content-Type": "text/plain",
},
Body: ioutil.NopCloser(bytes.NewBufferString(payload)),
}
err := queue.Put(ctx, req)
if !assert.NoError(t, err) {
return req
}

v, err := queue.Peek(ctx)
if !assert.NoError(t, err) {
return req
}
data, err := ioutil.ReadAll(v.Body)
if !assert.NoError(t, err) {
return req
}
assert.Equal(t, string(data), payload)
assert.Equal(t, v.WithBody(nil), req.WithBody(nil))
return req
}

func TestInMemory(t *testing.T) {
ctx := context.Background()
q := inmemory.New(1)
defer q.Close()

req := testPutPeek(ctx, t, q)
// try again - should be same result
v2, err := q.Peek(ctx)
if !assert.NoError(t, err) {
return
}
assert.Equal(t, v2.WithBody(nil), req.WithBody(nil))

err = q.Commit(ctx)
if !assert.NoError(t, err) {
return
}
// put again
testPutPeek(ctx, t, q)

q.Close()

_, err = q.Peek(ctx)
assert.Error(t, err)

var closed bool
select {
case <-q.Done():
closed = true
default:

}
assert.True(t, closed, "should be closed")

}

func TestInDir(t *testing.T) {
ctx := context.Background()
q, err := indir.New("test/queue")
if !assert.NoError(t, err) {
return
}

req := testPutPeek(ctx, t, q)
// try again - should be same result
v2, err := q.Peek(ctx)
if !assert.NoError(t, err) {
return
}
assert.Equal(t, v2.WithBody(nil), req.WithBody(nil))

err = q.Commit(ctx)
if !assert.NoError(t, err) {
return
}
// put again
testPutPeek(ctx, t, q)
}
12 changes: 11 additions & 1 deletion types/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Request struct {
RemoteAddress string `json:"remote_address" msg:"remote_address"`
Form map[string]string `json:"form" msg:"form"`
Headers map[string]string `json:"headers" msg:"headers"`
Body io.Reader `json:"-" msg:"-"`
Body io.ReadCloser `json:"-" msg:"-"`
}

// Create request from HTTP request
Expand All @@ -37,3 +37,13 @@ func FromHTTP(r *http.Request) *Request {
Body: r.Body,
}
}

// Returns shallow copy of request with new body
func (z *Request) WithBody(reader io.ReadCloser) *Request {
if z == nil {
return nil
}
cp := *z
cp.Body = reader
return &cp
}

0 comments on commit 98e4e43

Please sign in to comment.