Skip to content

Commit

Permalink
feat(httpit): wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kiyonlin committed Feb 27, 2021
1 parent 5064b10 commit f48a3a6
Show file tree
Hide file tree
Showing 8 changed files with 460 additions and 16 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ require (
github.com/spf13/cobra v1.1.3
github.com/stretchr/testify v1.3.0
github.com/valyala/fasthttp v1.21.0
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0 h1:5kGOVHlq0euqwzgTC9Vu15p6fV1Wi0ArVi8da2urnVg=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -243,6 +244,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
248 changes: 248 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package internal

import (
"bytes"
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"time"

"golang.org/x/net/http2"

"github.com/valyala/fasthttp"
)

type client interface {
do(int) (int, time.Duration, error)
}

type clientDoer interface {
Do(req *fasthttp.Request, resp *fasthttp.Response) error
}

type clientConfig struct {
method string
url string
headers Headers
host string
stream bool
body []byte
http2 bool
maxConns int
timeout time.Duration
tlsConfig *tls.Config
disableKeepAlives bool
throughput *int64
}

type fasthttpClient struct {
doer clientDoer
reqs []*fasthttp.Request
resps []*fasthttp.Response
readers []*bytes.Reader
body []byte
}

func newFasthttpClient(cc clientConfig) (client, error) {
c := &fasthttpClient{
reqs: make([]*fasthttp.Request, cc.maxConns, cc.maxConns),
resps: make([]*fasthttp.Response, cc.maxConns, cc.maxConns),
readers: make([]*bytes.Reader, cc.maxConns, cc.maxConns),
body: cc.body,
}

isTLS, addr, err := getIsTLSAndAddr(cc.url)
if err != nil {
return nil, err
}

for i := 0; i < cc.maxConns; i++ {
req := fasthttp.AcquireRequest()
req.Header.SetMethod(cc.method)
req.SetRequestURI(cc.url)
cc.headers.WriteToFasthttp(req)
if cc.stream {
c.readers[i] = bytes.NewReader(nil)
} else {
// set constant body
req.SetBody(cc.body)
}
if cc.disableKeepAlives {
req.Header.ConnectionClose()
}
if cc.host != "" {
req.URI().SetHost(cc.host)
}
c.reqs[i] = req
c.resps[i] = fasthttp.AcquireResponse()
}

if cc.disableKeepAlives {
c.doer = &fasthttp.HostClient{
Addr: addr,
NoDefaultUserAgentHeader: false,
Dial: fasthttpDialFunc(cc.throughput, cc.timeout),
DialDualStack: false,
IsTLS: isTLS,
TLSConfig: cc.tlsConfig,
MaxConns: cc.maxConns,
ReadTimeout: cc.timeout,
DisableHeaderNamesNormalizing: true,
DisablePathNormalizing: true,
}
} else {
c.doer = &fasthttp.PipelineClient{
Addr: addr,
MaxConns: cc.maxConns,
Dial: fasthttpDialFunc(cc.throughput, cc.timeout),
IsTLS: isTLS,
TLSConfig: cc.tlsConfig,
ReadTimeout: cc.timeout,
}
}

return c, nil
}

func (c *fasthttpClient) do(i int) (code int, latency time.Duration, err error) {
var (
req = c.reqs[i]
resp = c.resps[i]
reader = c.readers[i]
)

if reader != nil {
reader.Reset(c.body)
req.SetBodyStream(reader, -1)
}

start := time.Now()
if err = c.doer.Do(req, resp); err != nil {
return
}

code = resp.StatusCode()
latency = time.Since(start)

return
}

var (
strHTTP = []byte("http")
strHTTPS = []byte("https")
)

func getIsTLSAndAddr(url string) (isTLS bool, addr string, err error) {
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
req.SetRequestURI(url)
uri := req.URI()
host := uri.Host()

scheme := uri.Scheme()
if bytes.Equal(scheme, strHTTPS) {
isTLS = true
} else if !bytes.Equal(scheme, strHTTP) {
err = fmt.Errorf("unsupported protocol %q. http and https are supported", scheme)
return
}

addr = addMissingPort(string(host), isTLS)

return
}

func addMissingPort(addr string, isTLS bool) string {
n := strings.Index(addr, ":")
if n >= 0 {
return addr
}
port := 80
if isTLS {
port = 443
}
return net.JoinHostPort(addr, strconv.Itoa(port))
}

type httpClient struct {
client *http.Client
reqs []*http.Request
readers []*bytes.Reader
body []byte
}

func newHttpClient(cc clientConfig) (client, error) {
c := &httpClient{
reqs: make([]*http.Request, 0, cc.maxConns),
readers: make([]*bytes.Reader, 0, cc.maxConns),
}

var err error

for i := 0; i < cc.maxConns; i++ {
if c.reqs[i], err = http.NewRequest(cc.method, cc.url, nil); err != nil {
return nil, fmt.Errorf("failed to new request: %w", err)
}
req := c.reqs[i]

cc.headers.WriteToHttp(req)

if cc.host != "" {
req.Host = cc.host
}
}

transport := &http.Transport{
TLSClientConfig: cc.tlsConfig,
MaxIdleConnsPerHost: cc.maxConns,
DisableKeepAlives: cc.disableKeepAlives,
}
transport.DialContext = httpDialContextFunc(cc.throughput)
if cc.http2 {
if err = http2.ConfigureTransport(transport); err != nil {
return nil, fmt.Errorf("failed to setup http2: %w", err)
}
} else {
transport.TLSNextProto = make(
map[string]func(authority string, c *tls.Conn) http.RoundTripper,
)
}

c.client = &http.Client{
Transport: transport,
Timeout: cc.timeout,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}

return c, nil
}

func (c *httpClient) do(i int) (code int, latency time.Duration, err error) {
req := c.reqs[i]

reader := c.readers[i]
reader.Reset(c.body)
req.Body = ioutil.NopCloser(reader)

var resp *http.Response
start := time.Now()

if resp, err = c.client.Do(req); err != nil {
return
}

if err = resp.Body.Close(); err != nil {
return
}

code = resp.StatusCode
latency = time.Since(start)

return
}
23 changes: 12 additions & 11 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ type Config struct {
Duration time.Duration
Timeout time.Duration

Method string
Headers Headers
Body string
File string
Cert string
Key string
Stream bool
DisableKeepAlive bool
Insecure bool
Http1 bool
Http2 bool
Method string
Headers Headers
Host string
Body string
File string
Cert string
Key string
Stream bool
DisableKeepAlives bool
Insecure bool
Http1 bool
Http2 bool
}
70 changes: 70 additions & 0 deletions internal/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package internal

import (
"context"
"net"
"sync/atomic"
"time"

"github.com/valyala/fasthttp"
)

// counterConn counts read bytes and written bytes
type counterConn struct {
net.Conn
n *int64
}

func (cc *counterConn) Read(b []byte) (n int, err error) {
n, err = cc.Conn.Read(b)

if err == nil {
atomic.AddInt64(cc.n, int64(n))
}

return
}

func (cc *counterConn) Write(b []byte) (n int, err error) {
n, err = cc.Conn.Write(b)

if err == nil {
atomic.AddInt64(cc.n, int64(n))
}

return
}

var fasthttpDialFunc = func(throughput *int64, timeout time.Duration) func(string) (net.Conn, error) {
dialer := &fasthttp.TCPDialer{}
return func(address string) (net.Conn, error) {
conn, err := dialer.DialDualStackTimeout(address, timeout)
if err != nil {
return nil, err
}

cc := &counterConn{
Conn: conn,
n: throughput,
}

return cc, nil
}
}

var httpDialContextFunc = func(throughput *int64) func(context.Context, string, string) (net.Conn, error) {
dialer := &net.Dialer{}
return func(ctx context.Context, network, address string) (net.Conn, error) {
conn, err := dialer.DialContext(ctx, network, address)
if err != nil {
return nil, err
}

cc := &counterConn{
Conn: conn,
n: throughput,
}

return cc, nil
}
}
18 changes: 18 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package internal

import (
"sync"
)

type errorMap sync.Map

func (e errorMap) add(err error) {
}

func (e errorMap) get(err error) uint64 {
return 0
}

func (e errorMap) sum() uint64 {
return 0
}
Loading

0 comments on commit f48a3a6

Please sign in to comment.