Skip to content

Commit

Permalink
Added prefix match for consistent hash
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhansen committed Sep 4, 2018
1 parent 24b0969 commit c9e7e8a
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 20 deletions.
62 changes: 54 additions & 8 deletions consistenthash/consistenthash.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,32 @@ package consistenthash

import (
"hash/crc32"
"math/bits"
"sort"
"strconv"
)

type Hash func(data []byte) uint32

type Map struct {
hash Hash
replicas int
keys []int // Sorted
hashMap map[int]string
hash Hash
replicas int
prefixTableExpansion int

keys []int // Sorted
hashMap map[int]string

prefixBits uint32
prefixShift uint32
prefixTable []string
}

func New(replicas int, fn Hash) *Map {
func New(replicas int, tableExpansion int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
prefixTableExpansion: tableExpansion,
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
Expand All @@ -59,6 +67,37 @@ func (m *Map) Add(keys ...string) {
}
}
sort.Ints(m.keys)

// Find minimum number of bits to hold |keys| * prefixTableExpansion
m.prefixBits = uint32(bits.Len32(uint32(len(m.keys) * m.prefixTableExpansion)))
m.prefixShift = 32 - m.prefixBits

prefixTableSize := 1 << m.prefixBits
m.prefixTable = make([]string, prefixTableSize)

previousKeyPrefix := -1 // Effectively -Inf
currentKeyIdx := 0
currentKeyPrefix := m.keys[currentKeyIdx] >> m.prefixShift

for i := range m.prefixTable {
if previousKeyPrefix < i && currentKeyPrefix > i {
// All keys with this prefix will map to a single value
m.prefixTable[i] = m.hashMap[m.keys[currentKeyIdx]]
} else {
// Several keys might have the same prefix. Walk
// over them until it changes
previousKeyPrefix = currentKeyPrefix
for currentKeyPrefix == previousKeyPrefix {
currentKeyIdx++
if currentKeyIdx < len(m.keys) {
currentKeyPrefix = m.keys[currentKeyIdx] >> m.prefixShift
} else {
currentKeyIdx = 0
currentKeyPrefix = prefixTableSize + 1 // Effectively +Inf
}
}
}
}
}

// Gets the closest item in the hash to the provided key.
Expand All @@ -69,6 +108,13 @@ func (m *Map) Get(key string) string {

hash := int(m.hash([]byte(key)))

// Look for the hash prefix in the prefix table
prefixSlot := hash >> m.prefixShift
tableResult := m.prefixTable[prefixSlot]
if len(tableResult) > 0 {
return tableResult
}

// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })

Expand Down
26 changes: 16 additions & 10 deletions consistenthash/consistenthash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestHashing(t *testing.T) {

// Override the hash function to return easier to reason about values. Assumes
// the keys can be converted to an integer.
hash := New(3, func(key []byte) uint32 {
hash := New(3, 6, func(key []byte) uint32 {
i, err := strconv.Atoi(string(key))
if err != nil {
panic(err)
Expand Down Expand Up @@ -66,8 +66,8 @@ func TestHashing(t *testing.T) {
}

func TestConsistency(t *testing.T) {
hash1 := New(1, nil)
hash2 := New(1, nil)
hash1 := New(1, 6, nil)
hash2 := New(1, 6, nil)

hash1.Add("Bill", "Bob", "Bonny")
hash2.Add("Bob", "Bonny", "Bill")
Expand All @@ -86,25 +86,31 @@ func TestConsistency(t *testing.T) {

}

func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) }
func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) }
func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) }
func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) }
func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8, 6) }
func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32, 6) }
func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128, 6) }
func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512, 6) }

func benchmarkGet(b *testing.B, shards int) {
func benchmarkGet(b *testing.B, shards int, expansion int) {

hash := New(50, nil)
hash := New(50, expansion, nil)

var buckets []string
for i := 0; i < shards; i++ {
buckets = append(buckets, fmt.Sprintf("shard-%d", i))
}

testStringCount := shards
var testStrings []string
for i := 0; i < testStringCount; i++ {
testStrings = append(testStrings, fmt.Sprintf("%d", i))
}

hash.Add(buckets...)

b.ResetTimer()

for i := 0; i < b.N; i++ {
hash.Get(buckets[i&(shards-1)])
hash.Get(testStrings[i&(testStringCount-1)])
}
}
6 changes: 4 additions & 2 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const defaultBasePath = "/_groupcache/"

const defaultReplicas = 50

const defaultHashExpansion = 6

// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
// Context optionally specifies a context for the server to use when it
Expand Down Expand Up @@ -106,7 +108,7 @@ func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
if p.opts.Replicas == 0 {
p.opts.Replicas = defaultReplicas
}
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
p.peers = consistenthash.New(p.opts.Replicas, defaultHashExpansion, p.opts.HashFn)

RegisterPeerPicker(func() PeerPicker { return p })
return p
Expand All @@ -118,7 +120,7 @@ func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
p.peers = consistenthash.New(p.opts.Replicas, defaultHashExpansion, p.opts.HashFn)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
Expand Down

0 comments on commit c9e7e8a

Please sign in to comment.