Skip to content

Commit

Permalink
fix delay with mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
eze-kiel committed Apr 7, 2021
1 parent b84e6e9 commit 7819111
Showing 1 changed file with 109 additions and 91 deletions.
200 changes: 109 additions & 91 deletions cmd/slowql-replayer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ type results struct {
realDuration time.Duration
}

type job struct {
query string
idle time.Time
}

func main() {
var opt options

Expand Down Expand Up @@ -113,7 +118,7 @@ func main() {
r, err = db.replay(f)
} else {
db.logger.Warn("replaying with dry run")
r, err = db.dryRun(f)
// r, err = db.dryRun(f)
}
if err != nil {
db.logger.Fatalf("cannot replay %s: %s", opt.kind, err)
Expand Down Expand Up @@ -208,87 +213,96 @@ func (o options) createDB() (*database, error) {
db.wrks = o.workers
db.logger.Debugf("workers number set to %d", db.wrks)

return &db, nil
}

func (db *database) dryRun(f io.Reader) (results, error) {
var r results

p := slowql.NewParser(db.kind, f)

queries := make(chan string, 16384)
errors := make(chan error, 16384)
var wg sync.WaitGroup

db.logger.Debug("starting workers pool")
var workersCounter int
for i := 0; i < db.wrks; i++ {
wg.Add(1)
workersCounter++
go db.worker(queries, errors, &wg)
}
db.logger.Debugf("created %d workers successfully", workersCounter)
db.logger.Debug("starting errors collector")
go r.errorsCollector(errors)

firstPass := true
var previousDate, now time.Time
var sleeping time.Duration

db.logger.Infof("replay started on %s", time.Now().Format("Mon Jan 2 15:04:05"))
s := newSpinner(34)
s.Start()

start := time.Now()
for {
q := p.GetNext()
if q == (query.Query{}) {
s.Stop()
break
}
db.logger.Tracef("query: %s", q.Query)

r.queries++
s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)

// We need a reference time
if firstPass {
firstPass = false
previousDate = q.Time
continue
}

now = q.Time
sleeping = now.Sub(previousDate)
db.logger.Tracef("next sleeping time: %s", sleeping)
time.Sleep(sleeping)

// For MariaDB, when there is multiple queries in a short amount of
// time, the Time field is not repeated, so we do not have to update
// the previous date.
if now != (time.Time{}) {
previousDate = now
}
}
close(queries)
db.logger.Debug("closed queries channel")
maxOpen := db.wrks * 2
maxIdle := db.wrks / 4

wg.Wait()
close(errors)
db.logger.Debug("closed errors channel")
db.drv.SetMaxOpenConns(maxOpen)
db.drv.SetMaxIdleConns(maxIdle)
db.drv.SetConnMaxLifetime(50 * time.Millisecond)
db.logger.Debugf("db max open conns: %d", maxOpen)
db.logger.Debugf("db max idle conns: %d", maxIdle)

r.duration = time.Since(start)
db.logger.Infof("replay ended on %s", time.Now().Format("Mon Jan 2 15:04:05"))
return r, nil
return &db, nil
}

// func (db *database) dryRun(f io.Reader) (results, error) {
// var r results

// p := slowql.NewParser(db.kind, f)

// queries := make(chan string, 16384)
// errors := make(chan error, 16384)
// var wg sync.WaitGroup

// db.logger.Debug("starting workers pool")
// var workersCounter int
// for i := 0; i < db.wrks; i++ {
// wg.Add(1)
// workersCounter++
// go db.worker(queries, errors, &wg)
// }
// db.logger.Debugf("created %d workers successfully", workersCounter)
// db.logger.Debug("starting errors collector")
// go r.errorsCollector(errors)

// firstPass := true
// var previousDate, now time.Time
// var sleeping time.Duration

// db.logger.Infof("replay started on %s", time.Now().Format("Mon Jan 2 15:04:05"))
// s := newSpinner(34)
// s.Start()

// start := time.Now()
// for {
// q := p.GetNext()
// if q == (query.Query{}) {
// s.Stop()
// break
// }
// db.logger.Tracef("query: %s", q.Query)

// r.queries++
// s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)

// // We need a reference time
// if firstPass {
// firstPass = false
// previousDate = q.Time
// continue
// }

// now = q.Time
// sleeping = now.Sub(previousDate)
// db.logger.Tracef("next sleeping time: %s", sleeping)
// time.Sleep(sleeping)

// // For MariaDB, when there is multiple queries in a short amount of
// // time, the Time field is not repeated, so we do not have to update
// // the previous date.
// if now != (time.Time{}) {
// previousDate = now
// }
// }
// close(queries)
// db.logger.Debug("closed queries channel")

// wg.Wait()
// close(errors)
// db.logger.Debug("closed errors channel")

// r.duration = time.Since(start)
// db.logger.Infof("replay ended on %s", time.Now().Format("Mon Jan 2 15:04:05"))
// return r, nil
// }

// replay replays the queries from a slow query log file to a database
func (db *database) replay(f io.Reader) (results, error) {
var r results

p := slowql.NewParser(db.kind, f)

queries := make(chan string, 16384)
jobs := make(chan job, 65535)
errors := make(chan error, 16384)
var wg sync.WaitGroup

Expand All @@ -297,7 +311,7 @@ func (db *database) replay(f io.Reader) (results, error) {
for i := 0; i < db.wrks; i++ {
wg.Add(1)
workersCounter++
go db.worker(queries, errors, &wg)
go db.worker(jobs, errors, &wg)
}
db.logger.Debugf("created %d workers successfully", workersCounter)
db.logger.Debug("starting errors collector")
Expand All @@ -308,8 +322,9 @@ func (db *database) replay(f io.Reader) (results, error) {
s.Start()

firstPass := true
var previousDate time.Time

// first timestamp that appears in the log file
var reference time.Time
start := time.Now()
for {
q := p.GetNext()
Expand All @@ -322,28 +337,27 @@ func (db *database) replay(f io.Reader) (results, error) {
r.queries++
s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)

queries <- q.Query

// We need a reference time
if firstPass {
firstPass = false
previousDate = q.Time
continue
reference = q.Time
}

now := q.Time
sleeping := now.Sub(previousDate)
db.logger.Tracef("next sleeping time: %s", sleeping)
time.Sleep(sleeping)

var j job
delta := q.Time.Sub(reference)
j.idle = start.Add(delta)
j.query = q.Query
db.logger.Tracef("next sleeping time: %s", j.idle)
// time.Sleep(sleeping)
jobs <- j
// For MariaDB, when there is multiple queries in a short amount of
// time, the Time field is not repeated, so we do not have to update
// the previous date.
if now != (time.Time{}) {
previousDate = now
}
// if now != (time.Time{}) {
// previousDate = now
// }
}
close(queries)
close(jobs)
db.logger.Debug("closed queries channel")

wg.Wait()
Expand Down Expand Up @@ -421,18 +435,22 @@ func newSpinner(t int) *spinner.Spinner {
return spinner.New(spinner.CharSets[t], 100*time.Millisecond)
}

func (db database) worker(queries chan string, errors chan error, wg *sync.WaitGroup) {
func (db database) worker(jobs chan job, errors chan error, wg *sync.WaitGroup) {
defer wg.Done()
for {
q, ok := <-queries
j, ok := <-jobs
if !ok {
db.logger.Trace("channel closed, worker exiting")
return
}
rows, err := db.drv.Query(q)
sleep := time.Until(j.idle)
if sleep > 0 {
time.Sleep(sleep)
}
rows, err := db.drv.Query(j.query)
if err != nil {
errors <- err
db.logger.Debugf("failed to execute query:\n%s\nerror: %s", q, err)
db.logger.Debugf("failed to execute query:\n%s\nerror: %s", j.query, err)
}
if rows != nil {
rows.Close()
Expand Down

0 comments on commit 7819111

Please sign in to comment.