Go语言整体替换式Map的Lock-Free实现
今天来分享一个写业务过程中,一种特殊场景下的 Lock-Free Map 的实现。
来,我们先抛开看了三遍可能都还看不懂的题目,讲讲故事的背景。
背景/需求
假设我们需要一个用户黑名单,map[string]struct{}
。其中,key
是用户的 id
,value
是一个空结构体,一个典型的 Set 实现,并且利用了 struct{}
占用空间为 0 的特性。
其实我们完全可以使用 Redis 来实现,但我们为了省钱,不想为了这么一个小功能就再买个 Redis。同时黑名单的用户毕竟很少,所以我们把整个黑名单放到内存,存到 map
里,也是可以接受的。
但黑名单需要动态地去更新,比如从数据库隔几分钟拉一次数据,然后更新黑名单的 map
。
问题来了,更新 map
并不是原子操作,怎么保证线程安全?
sync.Map
你可能会突然想到,sync.Map
啊!
但它其实并不是我们想要的。抛开性能不谈,每次在更新数据库的时候,使用 sync.Map
还是要去分析两个 map
的 key
的差异,然后去更新。完全不是我们需要的整体替换。
sync.RWMutex
然后我们又想到了 sync.RWMutex
,在写的时候加写锁,读的时候加读锁,代码如下:
package maps
import "sync"
// ConcurrentMapOnlyReplace is a concurrent map that only supports Get specific key and Replace the entire map.
type ConcurrentMapOnlyReplace[K comparable, V any] interface {
Get(key K) (value V, exist bool)
Replace(newMap map[K]V)
ToMap() map[K]V
}
// MapRWMutex is a thread-safe map, it can be read or replace entire map concurrently.
// Note: it doesn't support edit specific key's value
type MapRWMutex[K comparable, V any] struct {
m map[K]V
// We simply update the entire map each time,
// instead of concurrently modifying the values within the map.
// So we don't need to use sync.Map
rw sync.RWMutex
}
func NewMapRWMutex[K comparable, V any]() *MapRWMutex[K, V] {
return &MapRWMutex[K, V]{
m: make(map[K]V),
}
}
// Get returns the value and exist status for the given key.
func (m *MapRWMutex[K, V]) Get(key K) (value V, exist bool) {
m.rw.RLock()
defer m.rw.RUnlock()
value, exist = m.m[key]
return
}
// ToMap returns a copy of the data as a regular map.
func (m *MapRWMutex[K, V]) ToMap() map[K]V {
m.rw.RLock()
defer m.rw.RUnlock()
newMap := make(map[K]V, len(m.m))
for k, v := range m.m {
newMap[k] = v
}
return newMap
}
// Replace replaces the entire map with a new map.
func (m *MapRWMutex[K, V]) Replace(newMap map[K]V) {
m.rw.Lock()
defer m.rw.Unlock()
m.m = newMap
return
}
var _ ConcurrentMapOnlyReplace[int, int] = (*MapRWMutex[int, int])(nil)
- 这里用到了泛型,需要 Go 1.18+。能够支持任意类型的
key
和value
。 - 抽象出了一个
ConcurrentMapOnlyReplace
接口,方便后续扩展不同的实现。
使用方式大概是这样:
package oauth
import (
"time"
"xxx/maps"
"xxx/models"
"xxx/utils"
)
var ubr *UserBanReader
func init() {
// hungry singleton
ubr = NewUserBanReader()
}
type userId = string
type UserBanReader struct {
bans maps.ConcurrentMapOnlyReplace[userId, struct{}]
// db
}
func GetUserBanReader() *UserBanReader {
return ubr
}
func NewUserBanReader() *UserBanReader {
umr := &UserBanReader{
bans: maps.NewMapRWMutex[userId, struct{}](),
}
umr.syncBanList()
go utils.NewSimpleCronJob(1*time.Minute, umr.syncBanList)
return umr
}
func (ubr *UserBanReader) syncBanList() {
users := ubr.readBannedUsersFromDB()
newBans := make(map[userId]struct{}, len(users))
for _, user := range users {
newBans[user.ID.Hex()] = struct{}{}
}
// update the entire map
ubr.bans.Replace(newBans)
return
}
func (ubr *UserBanReader) readBannedUsersFromDB() []models.User {
// todo: read banned users from db
// just user id is enough, no need to read the entire user info
return nil
}
func (ubr *UserBanReader) IsBanned(userId userId) bool {
_, banned := ubr.bans.Get(userId)
return banned
}
可以看到,在读取数据库的过程中,是不用加锁的,只有在读取数据库结束,替换 map
的那一刻,才需要加锁。所以总体上,系统「卡顿」的时间几乎为零,没有毛刺。
其中,
NewSimpleCronJob
是我写的一个简单的cron job
实现,比cronjob
库用起来更简单高效,详见附录。
Lock-Free Map 的实现
虽然我们已经用了 RWMutex
比 Mutex
好很多了。但我们的场景其实,读»写,而在每次读的时候,都需要有一次锁的开销,能否做到 Lock-Free 呢?
当然可以!我们可以用 atomic.Value
来实现。庆幸的是,atomic.Pointer
也支持泛型,我们可以实现一个满血版的 Lock-Free Map!(完全替换 map
场景)。
package maps
import "sync/atomic"
type MapLockFree[K comparable, V any] struct {
m *atomic.Pointer[map[K]V]
}
func NewMapLockFree[K comparable, V any]() *MapLockFree[K, V] {
mapLockFree := atomic.Pointer[map[K]V]{}
return &MapLockFree[K, V]{
m: &mapLockFree,
}
}
// Get returns the value and exist status for the given key.
func (m *MapLockFree[K, V]) Get(key K) (value V, exist bool) {
mm := m.m.Load()
if mm == nil {
return
}
value, exist = (*mm)[key]
return
}
// ToMap returns a copy of the data as a regular map.
func (m *MapLockFree[K, V]) ToMap() map[K]V {
mm := m.m.Load()
if mm == nil {
return nil
}
newMap := make(map[K]V, len(*mm))
for k, v := range *mm {
newMap[k] = v
}
return newMap
}
// Replace replaces the entire map with a new map.
func (m *MapLockFree[K, V]) Replace(newMap map[K]V) {
m.m.Store(&newMap)
return
}
var _ ConcurrentMapOnlyReplace[int, int] = (*MapLockFree[int, int])(nil)
测试!!!
虽然目测,Lock-Free Map 的性能会更好,但是写代码不能只靠感觉,真男人,就来 Benchmark 一下!
(其实还要先测一下两个 map
的正确性,相对简单,这里就不贴代码了)
我们再来测一下,不检验正确性下,两个 map
的性能:
func BenchmarkMapRWMutex_GetAndReplace(b *testing.B) {
// Create a new MapRWMutex and populate it with initial data
m := NewMapRWMutex[string, string]()
benchmarkMapGetAndReplace(b, m)
}
func BenchmarkMapLockFree_GetAndReplace(b *testing.B) {
// Create a new MapLockFree and populate it with initial data
m := NewMapLockFree[string, string]()
benchmarkMapGetAndReplace(b, m)
}
func benchmarkMapGetAndReplace(b *testing.B, m ConcurrentMapOnlyReplace[string, string]) {
const (
mapLength = 10000
testBatchNum = 10
)
keysBatch := make([][]string, testBatchNum)
mapBatch := make([]map[string]string, testBatchNum)
for i := 0; i < testBatchNum; i++ {
keys, maps := generateKeyValues(mapLength)
keysBatch[i] = keys
mapBatch[i] = maps
}
b.ResetTimer()
// Run concurrent Get and Replace operations
b.RunParallel(func(pb *testing.PB) {
i := 0
m.Replace(mapBatch[0])
for pb.Next() {
batchCur := i % testBatchNum
keys := keysBatch[batchCur]
// Perform Get operation
for _, key := range keys {
_, _ = m.Get(key)
}
// Perform Replace operation
m.Replace(mapBatch[batchCur])
i++
}
})
}
// todo: use file to store the data to make sure the data is the same in the two benchmark
func generateKeyValues(length int) ([]string, map[string]string) {
keys := make([]string, length)
maps := make(map[string]string, length)
for i := 0; i < length; i++ {
key := uuid.New().String()
value := uuid.New().String()
keys[i] = key
maps[key] = value
}
return keys, maps
}
可以看到:
- 抽象出了共用的接口后,测试两个不同的 Map 实现,就非常简单了
- 写了一个生成随机数据的函数,
key
和value
都是uuid
。这里其实最好把生成的数据存到文件里,这样测试更精准。不过我们只需要测数量级,粗糙点也没关系。 - 我们每次生成包含 10000 个
key
的map
,然后并发地去读取所有的key
,模拟极端的读取场景。 - 使用了
ResetTimer
,规避了生成数据的时间。提前准备了 10 组测试数据,然后在RunParallel
中,通过取余来循环使用。(没有使用随机数也是为了防止随机数生成的时间影响测试结果)
GoLand
的 Run
按钮,很香。结果如下:
- RW Mutex
BenchmarkMapRWMutex_GetAndReplace-8 1318 918932 ns/op
在 1 秒内,可以完成 1318 次操作,每次操作耗时 918932 纳秒。
- Lock-Free
BenchmarkMapLockFree_GetAndReplace-8 15111 75722 ns/op
在 1 秒内,可以完成 15111 次操作,每次操作耗时 75722 纳秒。
总体上,Lock-Free 的性能要好10倍以上!(如果数据量和测试方法不一样,可能会有差异,但差了一个数量级是肯定的)
附录 - SimpleCronJob
package utils
import (
"context"
"time"
)
// SimpleCronJob will run a job every duration
type SimpleCronJob struct {
Job []func()
duration time.Duration
}
func NewSimpleCronJob(duration time.Duration, jobs ...func()) *SimpleCronJob {
if duration <= 0 {
duration = 5 * time.Minute
}
return &SimpleCronJob{
Job: jobs,
duration: duration,
}
}
func (scj *SimpleCronJob) Run(ctx context.Context) {
if scj.Job == nil {
return
}
ticker := time.NewTicker(scj.duration)
for {
select {
case <-ticker.C:
for _, job := range scj.Job {
job()
}
case <-ctx.Done():
ticker.Stop()
return
}
}
}