Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix commit status index problem #17061

Merged
merged 9 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 91 additions & 16 deletions models/commit_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,82 @@ type CommitStatus struct {

func init() {
db.RegisterModel(new(CommitStatus))
db.RegisterModel(new(CommitStatusIndex))
}

// upsertCommitStatusIndex the function will not return until it acquires the lock or receives an error.
func upsertCommitStatusIndex(e db.Engine, repoID int64, sha string) (err error) {
// An atomic UPSERT operation (INSERT/UPDATE) is the only operation
// that ensures that the key is actually locked.
switch {
case setting.Database.UseSQLite3 || setting.Database.UsePostgreSQL:
_, err = e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
"VALUES (?,?,1) ON CONFLICT (repo_id,sha) DO UPDATE SET max_index = `commit_status_index`.max_index+1",
repoID, sha)
case setting.Database.UseMySQL:
_, err = e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
"VALUES (?,?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
repoID, sha)
case setting.Database.UseMSSQL:
// https://weblogs.sqlteam.com/dang/2009/01/31/upsert-race-condition-with-merge/
_, err = e.Exec("MERGE `commit_status_index` WITH (HOLDLOCK) as target "+
"USING (SELECT ? AS repo_id, ? AS sha) AS src "+
"ON src.repo_id = target.repo_id AND src.sha = target.sha "+
"WHEN MATCHED THEN UPDATE SET target.max_index = target.max_index+1 "+
"WHEN NOT MATCHED THEN INSERT (repo_id, sha, max_index) "+
"VALUES (src.repo_id, src.sha, 1);",
repoID, sha)
default:
return fmt.Errorf("database type not supported")
}
return
}

// GetNextCommitStatusIndex retried 3 times to generate a resource index
func GetNextCommitStatusIndex(repoID int64, sha string) (int64, error) {
for i := 0; i < db.MaxDupIndexAttempts; i++ {
idx, err := getNextCommitStatusIndex(repoID, sha)
if err == db.ErrResouceOutdated {
continue
}
if err != nil {
return 0, err
}
return idx, nil
}
return 0, db.ErrGetResourceIndexFailed
}

// getNextCommitStatusIndex return the next index
func getNextCommitStatusIndex(repoID int64, sha string) (int64, error) {
ctx, commiter, err := db.TxContext()
if err != nil {
return 0, err
}
defer commiter.Close()

var preIdx int64
_, err = ctx.Engine().SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ?", repoID, sha).Get(&preIdx)
if err != nil {
return 0, err
}

if err := upsertCommitStatusIndex(ctx.Engine(), repoID, sha); err != nil {
return 0, err
}

var curIdx int64
has, err := ctx.Engine().SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ? AND max_index=?", repoID, sha, preIdx+1).Get(&curIdx)
if err != nil {
return 0, err
}
if !has {
return 0, db.ErrResouceOutdated
}
if err := commiter.Commit(); err != nil {
return 0, err
}
return curIdx, nil
}

func (status *CommitStatus) loadAttributes(e db.Engine) (err error) {
Expand Down Expand Up @@ -142,6 +218,14 @@ func sortCommitStatusesSession(sess *xorm.Session, sortType string) {
}
}

// CommitStatusIndex represents a table for commit status index
type CommitStatusIndex struct {
ID int64
RepoID int64 `xorm:"unique(repo_sha)"`
SHA string `xorm:"unique(repo_sha)"`
MaxIndex int64 `xorm:"index"`
}

// GetLatestCommitStatus returns all statuses with a unique context for a given commit.
func GetLatestCommitStatus(repoID int64, sha string, listOptions ListOptions) ([]*CommitStatus, error) {
return getLatestCommitStatus(db.DefaultContext().Engine(), repoID, sha, listOptions)
Expand Down Expand Up @@ -206,6 +290,12 @@ func NewCommitStatus(opts NewCommitStatusOptions) error {
return fmt.Errorf("NewCommitStatus[%s, %s]: no user specified", repoPath, opts.SHA)
}

// Get the next Status Index
idx, err := GetNextCommitStatusIndex(opts.Repo.ID, opts.SHA)
if err != nil {
return fmt.Errorf("generate commit status index failed: %v", err)
}

ctx, committer, err := db.TxContext()
if err != nil {
return fmt.Errorf("NewCommitStatus[repo_id: %d, user_id: %d, sha: %s]: %v", opts.Repo.ID, opts.Creator.ID, opts.SHA, err)
Expand All @@ -218,22 +308,7 @@ func NewCommitStatus(opts NewCommitStatusOptions) error {
opts.CommitStatus.SHA = opts.SHA
opts.CommitStatus.CreatorID = opts.Creator.ID
opts.CommitStatus.RepoID = opts.Repo.ID

// Get the next Status Index
var nextIndex int64
lastCommitStatus := &CommitStatus{
SHA: opts.SHA,
RepoID: opts.Repo.ID,
}
has, err := ctx.Engine().Desc("index").Limit(1).Get(lastCommitStatus)
if err != nil {
return fmt.Errorf("NewCommitStatus[%s, %s]: %v", repoPath, opts.SHA, err)
}
if has {
log.Debug("NewCommitStatus[%s, %s]: found", repoPath, opts.SHA)
nextIndex = lastCommitStatus.Index
}
opts.CommitStatus.Index = nextIndex + 1
opts.CommitStatus.Index = idx
log.Debug("NewCommitStatus[%s, %s]: %d", repoPath, opts.SHA, opts.CommitStatus.Index)

opts.CommitStatus.ContextHash = hashCommitStatusContext(opts.CommitStatus.Context)
Expand Down
5 changes: 3 additions & 2 deletions models/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ var (
)

const (
maxDupIndexAttempts = 3
// MaxDupIndexAttempts max retry times to create index
MaxDupIndexAttempts = 3
)

// GetNextResourceIndex retried 3 times to generate a resource index
func GetNextResourceIndex(tableName string, groupID int64) (int64, error) {
for i := 0; i < maxDupIndexAttempts; i++ {
for i := 0; i < MaxDupIndexAttempts; i++ {
idx, err := getNextResourceIndex(tableName, groupID)
if err == ErrResouceOutdated {
continue
Expand Down
5 changes: 5 additions & 0 deletions models/fixtures/commit_status_index.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-
id: 1
repo_id: 1
sha: "1234123412341234123412341234123412341234"
max_index: 5
2 changes: 2 additions & 0 deletions models/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ var migrations = []Migration{
NewMigration("Add repo id column for attachment table", addRepoIDForAttachment),
// v194 -> v195
NewMigration("Add Branch Protection Unprotected Files Column", addBranchProtectionUnprotectedFilesColumn),
// v196 -> v197
lunny marked this conversation as resolved.
Show resolved Hide resolved
NewMigration("Add table commit_status_index", addTableCommitStatusIndex),
}

// GetCurrentDBVersion returns the current db version
Expand Down
47 changes: 47 additions & 0 deletions models/migrations/v195.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package migrations

import (
"fmt"

"xorm.io/xorm"
)

func addTableCommitStatusIndex(x *xorm.Engine) error {
// CommitStatusIndex represents a table for commit status index
type CommitStatusIndex struct {
ID int64
RepoID int64 `xorm:"unique(repo_sha)"`
SHA string `xorm:"unique(repo_sha)"`
MaxIndex int64 `xorm:"index"`
}
lunny marked this conversation as resolved.
Show resolved Hide resolved

if err := x.Sync2(new(CommitStatusIndex)); err != nil {
return fmt.Errorf("Sync2: %v", err)
}

sess := x.NewSession()
defer sess.Close()

if err := sess.Begin(); err != nil {
return err
}

// Remove data we're goint to rebuild
if _, err := sess.Table("commit_status_index").Where("1=1").Delete(&CommitStatusIndex{}); err != nil {
return err
}

// Create current data for all repositories with issues and PRs
if _, err := sess.Exec("INSERT INTO commit_status_index (repo_id, sha, max_index) " +
"SELECT max_data.repo_id, max_data.sha, max_data.max_index " +
"FROM ( SELECT commit_status.repo_id AS repo_id, commit_status.sha AS sha, max(commit_status.`index`) AS max_index " +
"FROM commit_status GROUP BY commit_status.repo_id, commit_status.sha) AS max_data"); err != nil {
return err
}

return sess.Commit()
}
62 changes: 62 additions & 0 deletions models/migrations/v195_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2021 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package migrations

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_addTableCommitStatusIndex(t *testing.T) {
// Create the models used in the migration
type CommitStatus struct {
ID int64 `xorm:"pk autoincr"`
Index int64 `xorm:"INDEX UNIQUE(repo_sha_index)"`
RepoID int64 `xorm:"INDEX UNIQUE(repo_sha_index)"`
SHA string `xorm:"VARCHAR(64) NOT NULL INDEX UNIQUE(repo_sha_index)"`
}

// Prepare and load the testing database
x, deferable := prepareTestEnv(t, 0, new(CommitStatus))
if x == nil || t.Failed() {
defer deferable()
return
}
defer deferable()

// Run the migration
if err := addTableCommitStatusIndex(x); err != nil {
assert.NoError(t, err)
return
}

type CommitStatusIndex struct {
ID int64
RepoID int64 `xorm:"unique(repo_sha)"`
SHA string `xorm:"unique(repo_sha)"`
MaxIndex int64 `xorm:"index"`
}

var start = 0
const batchSize = 1000
for {
var indexes = make([]CommitStatusIndex, 0, batchSize)
err := x.Table("commit_status_index").Limit(batchSize, start).Find(&indexes)
assert.NoError(t, err)

for _, idx := range indexes {
var maxIndex int
has, err := x.SQL("SELECT max(`index`) FROM commit_status WHERE repo_id = ? AND sha = ?", idx.RepoID, idx.SHA).Get(&maxIndex)
assert.NoError(t, err)
assert.True(t, has)
assert.EqualValues(t, maxIndex, idx.MaxIndex)
}
if len(indexes) < batchSize {
break
}
start += len(indexes)
}
}
2 changes: 1 addition & 1 deletion models/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (pr *PullRequest) SetMerged() (bool, error) {
func NewPullRequest(repo *Repository, issue *Issue, labelIDs []int64, uuids []string, pr *PullRequest) (err error) {
idx, err := db.GetNextResourceIndex("issue_index", repo.ID)
if err != nil {
return fmt.Errorf("generate issue index failed: %v", err)
return fmt.Errorf("generate pull request index failed: %v", err)
}

issue.Index = idx
Expand Down