Skip to content

Commit

Permalink
some refacto to isolate cause of delay
Browse files Browse the repository at this point in the history
the source of the delay is related to the number of time.Sleep effectued
  • Loading branch information
eze-kiel committed Mar 23, 2021
1 parent cc004c4 commit 629d908
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 25 deletions.
107 changes: 91 additions & 16 deletions cmd/slowql-replayer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/devops-works/slowql/query"
"github.com/olekukonko/tablewriter"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh/terminal"
"golang.org/x/term"

_ "github.com/go-sql-driver/mysql"
)
Expand All @@ -35,14 +35,14 @@ type options struct {
pprof string
workers int
usePass bool
dryRun bool
noDryRun bool
}

type database struct {
kind slowql.Kind
datasource string
drv *sql.DB
dryRun bool
noDryRun bool
logger *logrus.Logger
wrks int
}
Expand All @@ -67,7 +67,7 @@ func main() {
flag.StringVar(&opt.pprof, "pprof", "", "pprof server address")
flag.IntVar(&opt.workers, "w", 100, "Number of maximum simultaneous connections to database")
flag.BoolVar(&opt.usePass, "p", false, "Use a password to connect to database")
flag.BoolVar(&opt.dryRun, "dry", false, "Replay the requests but don't write in the database")
flag.BoolVar(&opt.noDryRun, "no-dry-run", false, "Replay the requests on the database for real")
flag.Parse()

if err := opt.parse(); err != nil {
Expand Down Expand Up @@ -97,18 +97,27 @@ func main() {
db.logger.Infof("pprof started on 'http://%s'", pprofServer.Addr)
}

r, err := db.replay(f)
var r results

db.logger.Infof("%d workers will be created", opt.workers)
if opt.noDryRun {
db.logger.Warn("no-dry-run flag found, replaying for real")
r, err = db.replay(f)
} else {
db.logger.Warn("replaying with dry run")
r, err = db.dryRun(f)
}
if err != nil {
logrus.Fatalf("cannot replay %s: %s", opt.kind, err)
db.logger.Fatalf("cannot replay %s: %s", opt.kind, err)
}

if opt.dryRun {
r.dryRun = "true"
if opt.noDryRun {
r.dryRun = "no"
} else {
r.dryRun = "false"
r.dryRun = "yes"
}

r.kind = opt.kind

r.show()
}

Expand All @@ -129,7 +138,7 @@ func (o *options) parse() error {

if o.usePass {
fmt.Printf("Password: ")
bytes, err := terminal.ReadPassword(syscall.Stdin)
bytes, err := term.ReadPassword(syscall.Stdin)
if err != nil {
return err
}
Expand Down Expand Up @@ -162,7 +171,7 @@ func (o options) createDB() (*database, error) {
if err != nil {
return nil, err
}
db.dryRun = o.dryRun
db.noDryRun = o.noDryRun

if err = db.drv.Ping(); err != nil {
return nil, err
Expand Down Expand Up @@ -196,6 +205,75 @@ func (o options) createDB() (*database, error) {
return &db, nil
}

func (db *database) dryRun(f io.Reader) (results, error) {
var r results

p := slowql.NewParser(db.kind, f)

queries := make(chan string, 16384)
errors := make(chan error, 16384)
var wg sync.WaitGroup

db.logger.Debug("starting workers pool")
for i := 0; i < db.wrks; i++ {
wg.Add(1)
go db.worker(queries, errors, &wg)
}

db.logger.Debug("starting errors collector")
go r.errorsCollector(errors)

firstPass := true
var previousDate, now time.Time
var sleeping time.Duration

db.logger.Infof("replay started on %s", time.Now().Format("Mon Jan 2 15:04:05"))
s := newSpinner(34)
s.Start()

start := time.Now()
for {
q := p.GetNext()
if q == (query.Query{}) {
s.Stop()
break
}
db.logger.Tracef("query: %s", q.Query)

r.queries++
s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)

// We need a reference time
if firstPass {
firstPass = false
previousDate = q.Time
continue
}

now = q.Time
sleeping = now.Sub(previousDate)
db.logger.Tracef("next sleeping time: %s", sleeping)
time.Sleep(sleeping)

// For MariaDB, when there is multiple queries in a short amount of
// time, the Time field is not repeated, so we do not have to update
// the previous date.
if now != (time.Time{}) {
previousDate = now
}
}
close(queries)
db.logger.Debug("closed queries channel")

wg.Wait()
close(errors)
db.logger.Debug("closed errors channel")

r.duration = time.Since(start)
db.logger.Infof("replay ended on %s", time.Now().Format("Mon Jan 2 15:04:05"))
return r, nil
}

// replay replays the queries from a slow query log file to a database
func (db *database) replay(f io.Reader) (results, error) {
var r results
Expand Down Expand Up @@ -233,10 +311,7 @@ func (db *database) replay(f io.Reader) (results, error) {
r.queries++
s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)

if !db.dryRun {
// Feed the workers with the query
queries <- q.Query
}
queries <- q.Query

// We need a reference time
if firstPass {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/sirupsen/logrus v1.8.0
github.com/stretchr/testify v1.4.0 // indirect
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
)
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,13 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221 h1:/ZHdbVpdR/jk3g30/d4yUL0JU9kksj8+F/bnQUVLGDM=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 2 additions & 2 deletions slowql.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type Parser struct {
func NewParser(k Kind, r io.Reader) Parser {
var p Parser

p.rawBlocs = make(chan []string, 1024)
p.rawBlocs = make(chan []string, 4096)
p.servermeta = make(chan []string)
p.waitingList = make(chan query.Query, 1024)
p.waitingList = make(chan query.Query, 4096)

go scan(*bufio.NewScanner(r), p.rawBlocs, p.servermeta)

Expand Down

0 comments on commit 629d908

Please sign in to comment.