Skip to content

Commit

Permalink
[wip] implementing goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
eze-kiel committed Feb 26, 2021
1 parent d1adee2 commit 86979d1
Showing 1 changed file with 63 additions and 9 deletions.
72 changes: 63 additions & 9 deletions cmd/slowql-replayer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -29,6 +30,7 @@ type options struct {
kind string
database string
loglvl string
workers int
usePass bool
dryRun bool
}
Expand All @@ -39,6 +41,7 @@ type database struct {
drv *sql.DB
dryRun bool
logger *logrus.Logger
wrks int
}

type results struct {
Expand All @@ -58,6 +61,7 @@ func main() {
flag.StringVar(&opt.kind, "k", "", "Kind of the database (mysql, mariadb...)")
flag.StringVar(&opt.database, "db", "", "Name of the database to use")
flag.StringVar(&opt.loglvl, "l", "info", "Logging level")
flag.IntVar(&opt.workers, "w", 100, "Number of maximum simultaneous connections to database")
flag.BoolVar(&opt.usePass, "p", false, "Use a password to connect to database")
flag.BoolVar(&opt.dryRun, "dry", false, "Replay the requests but don't write in the database")
flag.Parse()
Expand Down Expand Up @@ -173,6 +177,9 @@ func (o options) createDB() (*database, error) {
}
db.logger.Debugf("log level set to %s", db.logger.GetLevel())

db.wrks = o.workers
db.logger.Debugf("workers number set to %s", db.wrks)

return &db, nil
}

Expand All @@ -182,6 +189,19 @@ func (db *database) replay(f io.Reader) (results, error) {

p := slowql.NewParser(db.kind, f)

queries := make(chan string, 16384)
errors := make(chan error, 16384)
var wg sync.WaitGroup

db.logger.Debug("starting workers pool")
for i := 0; i < db.wrks; i++ {
wg.Add(1)
go db.worker(queries, errors, &wg)
}

db.logger.Debug("starting errors collector")
go r.errorsCollector(errors)

start := time.Now()
db.logger.Infof("replay started on %s", time.Now().Format("Mon Jan 2 15:04:05"))
s := newSpinner(34)
Expand All @@ -192,6 +212,7 @@ func (db *database) replay(f io.Reader) (results, error) {
for {
q := p.GetNext()
if q == (slowql.Query{}) {
s.Stop()
break
}
db.logger.Tracef("query: %s", q.Query)
Expand All @@ -200,14 +221,8 @@ func (db *database) replay(f io.Reader) (results, error) {
s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)

if !db.dryRun {
rows, err := db.drv.Query(q.Query)
if err != nil {
r.errors++
db.logger.Debugf("failed to execute query:\n%s\nerror: %s", q.Query, err)
}
if rows != nil {
rows.Close()
}
// Feed the workers with the query
queries <- q.Query
}

// We need a reference time
Expand All @@ -229,8 +244,13 @@ func (db *database) replay(f io.Reader) (results, error) {
previousDate = now
}
}
close(queries)
db.logger.Debug("closed queries channel")

wg.Wait()
close(errors)
db.logger.Debug("closed errors channel")

s.Stop()
r.duration = time.Since(start)
db.logger.Infof("replay ended on %s", time.Now().Format("Mon Jan 2 15:04:05"))
return r, nil
Expand Down Expand Up @@ -259,3 +279,37 @@ func newTable() *tablewriter.Table {
table.SetHeader([]string{"DB", "dry run", "Queries", "Errors", "Duration"})
return table
}

func (db database) worker(queries chan string, errors chan error, wg *sync.WaitGroup) {
for {
select {
case q, ok := <-queries:
if !ok {
db.logger.Trace("channel closed, worker exiting")
wg.Done()
return
}
rows, err := db.drv.Query(q)
if err != nil {
errors <- err
db.logger.Debugf("failed to execute query:\n%s\nerror: %s", q, err)
}
if rows != nil {
rows.Close()

}
}
}
}

func (r *results) errorsCollector(errors chan error) {
for {
select {
case _, ok := <-errors:
if !ok {
return
}
r.errors++
}
}
}

0 comments on commit 86979d1

Please sign in to comment.