目录

Go语言整体替换式Map的Lock-Free实现

今天来分享一个写业务过程中,一种特殊场景下的 Lock-Free Map 的实现。

来,我们先抛开看了三遍可能都还看不懂的题目,讲讲故事的背景。

背景/需求

假设我们需要一个用户黑名单,map[string]struct{}。其中,key 是用户的 idvalue 是一个空结构体,一个典型的 Set 实现,并且利用了 struct{} 占用空间为 0 的特性。

其实我们完全可以使用 Redis 来实现,但我们为了省钱,不想为了这么一个小功能就再买个 Redis。同时黑名单的用户毕竟很少,所以我们把整个黑名单放到内存,存到 map 里,也是可以接受的。

但黑名单需要动态地去更新,比如从数据库隔几分钟拉一次数据,然后更新黑名单的 map

问题来了,更新 map 并不是原子操作,怎么保证线程安全?

sync.Map

你可能会突然想到,sync.Map 啊!

但它其实并不是我们想要的。抛开性能不谈,每次在更新数据库的时候,使用 sync.Map 还是要去分析两个 mapkey 的差异,然后去更新。完全不是我们需要的整体替换。

sync.RWMutex

然后我们又想到了 sync.RWMutex,在写的时候加写锁,读的时候加读锁,代码如下:

package maps

import "sync"

// ConcurrentMapOnlyReplace is a concurrent map that only supports Get specific key and Replace the entire map.
type ConcurrentMapOnlyReplace[K comparable, V any] interface {
	Get(key K) (value V, exist bool)
	Replace(newMap map[K]V)
	ToMap() map[K]V
}

// MapRWMutex is a thread-safe map, it can be read or replace entire map concurrently.
// Note: it doesn't support edit specific key's value
type MapRWMutex[K comparable, V any] struct {
	m map[K]V

	// We simply update the entire map each time,
	// instead of concurrently modifying the values within the map.
	// So we don't need to use sync.Map
	rw sync.RWMutex
}

func NewMapRWMutex[K comparable, V any]() *MapRWMutex[K, V] {
	return &MapRWMutex[K, V]{
		m: make(map[K]V),
	}
}

// Get returns the value and exist status for the given key.
func (m *MapRWMutex[K, V]) Get(key K) (value V, exist bool) {
	m.rw.RLock()
	defer m.rw.RUnlock()
	value, exist = m.m[key]
	return
}

// ToMap returns a copy of the data as a regular map.
func (m *MapRWMutex[K, V]) ToMap() map[K]V {
	m.rw.RLock()
	defer m.rw.RUnlock()
	newMap := make(map[K]V, len(m.m))
	for k, v := range m.m {
		newMap[k] = v
	}
	return newMap
}

// Replace replaces the entire map with a new map.
func (m *MapRWMutex[K, V]) Replace(newMap map[K]V) {
	m.rw.Lock()
	defer m.rw.Unlock()
	m.m = newMap
	return
}

var _ ConcurrentMapOnlyReplace[int, int] = (*MapRWMutex[int, int])(nil)
  1. 这里用到了泛型,需要 Go 1.18+。能够支持任意类型的 keyvalue
  2. 抽象出了一个 ConcurrentMapOnlyReplace 接口,方便后续扩展不同的实现。

使用方式大概是这样:

package oauth

import (
	"time"

	"xxx/maps"
	"xxx/models"
	"xxx/utils"
)

var ubr *UserBanReader

func init() {
	// hungry singleton
	ubr = NewUserBanReader()
}

type userId = string

type UserBanReader struct {
	bans maps.ConcurrentMapOnlyReplace[userId, struct{}]
	// db
}

func GetUserBanReader() *UserBanReader {
	return ubr
}

func NewUserBanReader() *UserBanReader {
	umr := &UserBanReader{
		bans: maps.NewMapRWMutex[userId, struct{}](),
	}
	umr.syncBanList()

	go utils.NewSimpleCronJob(1*time.Minute, umr.syncBanList)
	return umr
}

func (ubr *UserBanReader) syncBanList() {
	users := ubr.readBannedUsersFromDB()
	newBans := make(map[userId]struct{}, len(users))
	for _, user := range users {
		newBans[user.ID.Hex()] = struct{}{}
	}

	// update the entire map
	ubr.bans.Replace(newBans)

	return
}

func (ubr *UserBanReader) readBannedUsersFromDB() []models.User {
	// todo: read banned users from db
	// just user id is enough, no need to read the entire user info
	return nil
}

func (ubr *UserBanReader) IsBanned(userId userId) bool {
	_, banned := ubr.bans.Get(userId)
	return banned
}

可以看到,在读取数据库的过程中,是不用加锁的,只有在读取数据库结束,替换 map 的那一刻,才需要加锁。所以总体上,系统「卡顿」的时间几乎为零,没有毛刺。

其中,NewSimpleCronJob 是我写的一个简单的 cron job 实现,比 cronjob 库用起来更简单高效,详见附录。

Lock-Free Map 的实现

虽然我们已经用了 RWMutexMutex 好很多了。但我们的场景其实,读»写,而在每次读的时候,都需要有一次锁的开销,能否做到 Lock-Free 呢?

当然可以!我们可以用 atomic.Value 来实现。庆幸的是,atomic.Pointer 也支持泛型,我们可以实现一个满血版的 Lock-Free Map!(完全替换 map 场景)。

package maps

import "sync/atomic"

type MapLockFree[K comparable, V any] struct {
	m *atomic.Pointer[map[K]V]
}

func NewMapLockFree[K comparable, V any]() *MapLockFree[K, V] {
	mapLockFree := atomic.Pointer[map[K]V]{}
	return &MapLockFree[K, V]{
		m: &mapLockFree,
	}
}

// Get returns the value and exist status for the given key.
func (m *MapLockFree[K, V]) Get(key K) (value V, exist bool) {
	mm := m.m.Load()
	if mm == nil {
		return
	}

	value, exist = (*mm)[key]
	return
}

// ToMap returns a copy of the data as a regular map.
func (m *MapLockFree[K, V]) ToMap() map[K]V {
	mm := m.m.Load()
	if mm == nil {
		return nil
	}

	newMap := make(map[K]V, len(*mm))
	for k, v := range *mm {
		newMap[k] = v
	}
	return newMap
}

// Replace replaces the entire map with a new map.
func (m *MapLockFree[K, V]) Replace(newMap map[K]V) {
	m.m.Store(&newMap)
	return
}

var _ ConcurrentMapOnlyReplace[int, int] = (*MapLockFree[int, int])(nil)

测试!!!

虽然目测,Lock-Free Map 的性能会更好,但是写代码不能只靠感觉,真男人,就来 Benchmark 一下!

(其实还要先测一下两个 map 的正确性,相对简单,这里就不贴代码了)

我们再来测一下,不检验正确性下,两个 map 的性能:

func BenchmarkMapRWMutex_GetAndReplace(b *testing.B) {
	// Create a new MapRWMutex and populate it with initial data
	m := NewMapRWMutex[string, string]()
	benchmarkMapGetAndReplace(b, m)
}

func BenchmarkMapLockFree_GetAndReplace(b *testing.B) {
	// Create a new MapLockFree and populate it with initial data
	m := NewMapLockFree[string, string]()
	benchmarkMapGetAndReplace(b, m)
}

func benchmarkMapGetAndReplace(b *testing.B, m ConcurrentMapOnlyReplace[string, string]) {
	const (
		mapLength    = 10000
		testBatchNum = 10
	)

	keysBatch := make([][]string, testBatchNum)
	mapBatch := make([]map[string]string, testBatchNum)

	for i := 0; i < testBatchNum; i++ {
		keys, maps := generateKeyValues(mapLength)
		keysBatch[i] = keys
		mapBatch[i] = maps
	}
	b.ResetTimer()

	// Run concurrent Get and Replace operations
	b.RunParallel(func(pb *testing.PB) {
		i := 0
		m.Replace(mapBatch[0])
		for pb.Next() {
			batchCur := i % testBatchNum
			keys := keysBatch[batchCur]
			// Perform Get operation
			for _, key := range keys {
				_, _ = m.Get(key)
			}

			// Perform Replace operation
			m.Replace(mapBatch[batchCur])
			i++
		}
	})
}

// todo: use file to store the data to make sure the data is the same in the two benchmark
func generateKeyValues(length int) ([]string, map[string]string) {
	keys := make([]string, length)
	maps := make(map[string]string, length)
	for i := 0; i < length; i++ {
		key := uuid.New().String()
		value := uuid.New().String()
		keys[i] = key
		maps[key] = value
	}
	return keys, maps
}

可以看到:

  1. 抽象出了共用的接口后,测试两个不同的 Map 实现,就非常简单了
  2. 写了一个生成随机数据的函数,keyvalue 都是 uuid。这里其实最好把生成的数据存到文件里,这样测试更精准。不过我们只需要测数量级,粗糙点也没关系。
  3. 我们每次生成包含 10000 个 keymap,然后并发地去读取所有的 key,模拟极端的读取场景。
  4. 使用了 ResetTimer,规避了生成数据的时间。提前准备了 10 组测试数据,然后在 RunParallel 中,通过取余来循环使用。(没有使用随机数也是为了防止随机数生成的时间影响测试结果)
开始测试!
这里我懒得用命令行了,直接用 GoLandRun 按钮,很香。

结果如下:

  1. RW Mutex
BenchmarkMapRWMutex_GetAndReplace-8   	    1318	    918932 ns/op

在 1 秒内,可以完成 1318 次操作,每次操作耗时 918932 纳秒。

  1. Lock-Free
BenchmarkMapLockFree_GetAndReplace-8   	   15111	     75722 ns/op

在 1 秒内,可以完成 15111 次操作,每次操作耗时 75722 纳秒。

总体上,Lock-Free 的性能要好10倍以上!(如果数据量和测试方法不一样,可能会有差异,但差了一个数量级是肯定的)

碎碎念
其实这个 Benchmark 还是挺难写的,自己也算是第一次正式写 Benchmark 测试。结合 AI 去生成一些代码,效率是真的高。

附录 - SimpleCronJob

package utils

import (
	"context"
	"time"
)

// SimpleCronJob will run a job every duration
type SimpleCronJob struct {
	Job      []func()
	duration time.Duration
}

func NewSimpleCronJob(duration time.Duration, jobs ...func()) *SimpleCronJob {
	if duration <= 0 {
		duration = 5 * time.Minute
	}
	return &SimpleCronJob{
		Job:      jobs,
		duration: duration,
	}
}

func (scj *SimpleCronJob) Run(ctx context.Context) {
	if scj.Job == nil {
		return
	}
	ticker := time.NewTicker(scj.duration)
	for {
		select {
		case <-ticker.C:
			for _, job := range scj.Job {
				job()
			}
		case <-ctx.Done():
			ticker.Stop()
			return
		}
	}
}