Go sync.Map



Under the hood, sync.Map stores its data in a map[interface{}]*entry, so both keys and values can be of any type (keys must still be comparable).

package main

import (
    "fmt"
    "sync"
)

type MySyncMap struct {
    sync.Map
}

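// Pointer receiver: a value receiver would copy the embedded sync.Map,
// and a sync.Map must not be copied after first use.
func (m *MySyncMap) Print(k interface{}) {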
    value, ok := m.Load(k)
    fmt.Println(value, ok)
}

func main() {
    var syncMap MySyncMap
    syncMap.Print("Key1")
    syncMap.Store("Key1", "Value1")
    syncMap.Print("Key1")
    syncMap.Store("Key2", "Value2")
    syncMap.Store("Key3", 2)
    syncMap.Print("Key3")
    syncMap.Store(4, 4)
    syncMap.Print(4)
    syncMap.Delete("Key1")
    syncMap.Print("Key1")
}

Output:

<nil> false
Value1 true
2 true
4 true
<nil> false

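There are several ways to build a concurrent hash map:

  1. Protect an ordinary, non-concurrent hash map with a single read-write lock. This was the usual approach before sync.Map existed. Its drawback is that writes block reads.
  2. Lock striping (segmented locks): each read-write lock protects one segment of the key space. Average performance is quite good, but in the worst case a segment with hot writes also slows down every read in that segment.
  3. Resolve collisions by chaining, and use CAS on the linked lists to handle conflicting concurrent updates.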
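To make approaches 1 and 2 concrete, here is a minimal sketch of lock striping. The names ShardedMap and shardCount, the string/int key and value types, and the FNV-1a hash are illustrative assumptions, not part of sync.Map. With shardCount = 1 the design degenerates into approach 1: a single RWMutex around a plain map.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// shardCount is an arbitrary choice for this sketch.
const shardCount = 16

// shard is one segment of the map, guarded by its own RWMutex.
type shard struct {
	mu sync.RWMutex
	m  map[string]int
}

// ShardedMap is a hypothetical lock-striped map.
type ShardedMap struct {
	shards [shardCount]*shard
}

func NewShardedMap() *ShardedMap {
	s := &ShardedMap{}
	for i := range s.shards {
		s.shards[i] = &shard{m: make(map[string]int)}
	}
	return s
}

// shardFor hashes the key with FNV-1a to pick its segment.
func (s *ShardedMap) shardFor(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key))
	return s.shards[h.Sum32()%shardCount]
}

// Load only read-locks the key's own segment.
func (s *ShardedMap) Load(key string) (int, bool) {
	sh := s.shardFor(key)
	sh.mu.RLock()
	defer sh.mu.RUnlock()
	v, ok := sh.m[key]
	return v, ok
}

// Store write-locks only the key's segment; other segments stay readable.
func (s *ShardedMap) Store(key string, v int) {
	sh := s.shardFor(key)
	sh.mu.Lock()
	defer sh.mu.Unlock()
	sh.m[key] = v
}

func main() {
	m := NewShardedMap()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Store(fmt.Sprintf("k%d", i), i)
		}(i)
	}
	wg.Wait()
	fmt.Println(m.Load("k42")) // 42 true
}
```

A hot key still contends on its own shard's lock, which is exactly the weakness described in item 2.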

As "An overview of sync.Map" points out, on machines with many CPU cores, reflect.New, sync.RWMutex, and atomic.AddUint32 all become slow because of cache contention.

Read/write separation

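sync.Map uses two maps, read and dirty. Both map keys to pointers (*entry) that refer to the stored value, so the two maps share the values: an update made through one map is visible through the other.

dirty supports insertion, deletion, and lookup, but every access to it must hold the mutex.

For keys that already exist in read, loads need no lock, and updates and deletions are done lock-free with CAS. New keys, however, cannot be added to read. read therefore acts as a cache in front of dirty, and because the values are shared, existing values can be updated through read.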

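sync.Map counts cache misses, that is, lookups that are not satisfied by read and have to consult dirty. When the miss count reaches the number of entries in dirty (misses >= len(dirty)), dirty is promoted to become the new read and dirty is reset to nil.

For dirty to be promotable to read as-is, every live key in read must also be in dirty. So when dirty is nil and a new key is added, the entries of read are first copied into dirty, and then the new key is written to dirty.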
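The read/dirty mechanics above can be modeled in a few dozen lines. The sketch below is a hypothetical toyMap, not sync.Map's implementation: it takes the mutex on every call and omits deletion and the entry type, so it shows only the shared *int values, the miss counter, the misses >= len(dirty) promotion rule, and the copy of read into dirty when the first new key arrives.

```go
package main

import (
	"fmt"
	"sync"
)

// toyMap is a hypothetical, heavily simplified model of the read/dirty idea.
// Unlike sync.Map it holds mu for every call (no lock-free fast path) and has
// no deletion; it only shows shared values, miss counting, and promotion.
type toyMap struct {
	mu     sync.Mutex
	read   map[string]*int // gains new keys only through promotion
	dirty  map[string]*int // nil, or a superset of read
	misses int
}

func newToyMap() *toyMap {
	return &toyMap{read: map[string]*int{}}
}

func (m *toyMap) Load(key string) (int, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if p, ok := m.read[key]; ok {
		return *p, true // hit in read
	}
	if m.dirty == nil {
		return 0, false // dirty holds nothing that read lacks
	}
	p, ok := m.dirty[key]
	m.misses++ // counted whether or not dirty had the key
	if m.misses >= len(m.dirty) {
		// Promote: dirty becomes read, dirty is dropped, the counter resets.
		m.read, m.dirty, m.misses = m.dirty, nil, 0
	}
	if !ok {
		return 0, false
	}
	return *p, true
}

func (m *toyMap) Store(key string, v int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if p, ok := m.read[key]; ok {
		*p = v // shared pointer, so dirty (if any) sees the new value too
		return
	}
	if p, ok := m.dirty[key]; ok {
		*p = v
		return
	}
	if m.dirty == nil {
		// First new key since the last promotion: copy read into dirty.
		m.dirty = make(map[string]*int, len(m.read)+1)
		for k, p := range m.read {
			m.dirty[k] = p
		}
	}
	m.dirty[key] = &v
}

func main() {
	m := newToyMap()
	m.Store("a", 1)          // dirty = {a}, read = {}
	fmt.Println(m.Load("a")) // 1 true (1 miss >= len(dirty) = 1: promoted)
	m.Store("b", 2)          // read copied into dirty, then b added
	m.Load("b")              // 1 miss < 2: no promotion yet
	m.Load("b")              // 2 misses >= 2: promoted
	fmt.Println(len(m.read), m.dirty == nil) // 2 true
}
```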

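Deletion is lazy. Delete first looks in read; if the key is there, it sets the p field of the corresponding entry to nil as a tombstone marker. An entry marked nil in read is only physically removed later: it is left out when dirty is next rebuilt, and it disappears from read when that dirty is promoted.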

// The zero Map is empty and ready for use. A Map must not be copied after first use.
type Map struct {
    mu Mutex

    // read contains the portion of the map's contents that are safe for
    // concurrent access (with or without mu held).
    //
    // The read field itself is always safe to load, but must only be stored with
    // mu held.
    //
    // Entries stored in read may be updated concurrently without mu, but updating
    // a previously-expunged entry requires that the entry be copied to the dirty
    // map and unexpunged with mu held.
    read atomic.Value // readOnly

    // dirty contains the portion of the map's contents that require mu to be
    // held. To ensure that the dirty map can be promoted to the read map quickly,
    // it also includes all of the non-expunged entries in the read map.
    //
    // Expunged entries are not stored in the dirty map. An expunged entry in the
    // clean map must be unexpunged and added to the dirty map before a new value
    // can be stored to it.
    //
    // If the dirty map is nil, the next write to the map will initialize it by
    // making a shallow copy of the clean map, omitting stale entries.
    dirty map[interface{}]*entry

    // misses counts the number of loads since the read map was last updated that
    // needed to lock mu to determine whether the key was present.
    //
    // Once enough misses have occurred to cover the cost of copying the dirty
    // map, the dirty map will be promoted to the read map (in the unamended
    // state) and the next store to the map will make a new dirty copy.
    misses int
}

// The concrete type stored in the read field.
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
    m       map[interface{}]*entry
    amended bool // true if the dirty map contains some key not in m.
}

// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
var expunged = unsafe.Pointer(new(interface{}))

// An entry is a slot in the map corresponding to a particular key.
type entry struct {
    // p points to the interface{} value stored for the entry.
    //
    // If p == nil, the entry has been deleted and m.dirty == nil.
    //
    // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
    // is missing from m.dirty.
    //
    // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
    // != nil, in m.dirty[key].
    //
    // An entry can be deleted by atomic replacement with nil: when m.dirty is
    // next created, it will atomically replace nil with expunged and leave
    // m.dirty[key] unset.
    //
    // An entry's associated value can be updated by atomic replacement, provided
    // p != expunged. If p == expunged, an entry's associated value can be updated
    // only after first setting m.dirty[key] = e so that lookups using the dirty
    // map find the entry.
    p unsafe.Pointer // *interface{}
}


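mu is the mutex that protects dirty.

misses counts the loads that missed read.

Note that entry.p has two special values: nil and expunged.

nil means the entry was deleted through read but still exists in dirty, so its value can be updated directly. In the special case dirty == nil, read is copied into dirty on the next write of a new key, and at that point the nil entry is changed to expunged instead of being copied.

expunged means the entry has already been dropped from dirty; before its value can be updated, the entry must first be added back to dirty.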
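The transitions between a live value, nil, and expunged can be sketched with a standalone entry type. This is a re-creation for illustration under stated assumptions: it uses atomic.Pointer[any] (Go 1.19+) instead of the unsafe.Pointer in the source above, and tryExpunge and unexpunge are simplified stand-ins for tryExpungeLocked and unexpungeLocked, which sync.Map only calls while holding mu.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// expunged is a sentinel pointer playing the role of sync.Map's expunged.
var expunged = new(any)

// entry re-creates the idea of sync.Map's entry with atomic.Pointer.
// p is nil (deleted), expunged (deleted and absent from dirty), or a value.
type entry struct {
	p atomic.Pointer[any]
}

func newEntry(v any) *entry {
	e := &entry{}
	e.p.Store(&v)
	return e
}

// load reports the value unless the entry is nil or expunged.
func (e *entry) load() (any, bool) {
	p := e.p.Load()
	if p == nil || p == expunged {
		return nil, false
	}
	return *p, true
}

// tryStore swaps in a new value with CAS, giving up if the entry is expunged.
func (e *entry) tryStore(v *any) bool {
	for {
		p := e.p.Load()
		if p == expunged {
			return false
		}
		if e.p.CompareAndSwap(p, v) {
			return true
		}
	}
}

// delete marks a live entry as deleted by swapping its value for nil.
func (e *entry) delete() bool {
	for {
		p := e.p.Load()
		if p == nil || p == expunged {
			return false
		}
		if e.p.CompareAndSwap(p, nil) {
			return true
		}
	}
}

// tryExpunge turns nil into expunged, as happens when dirty is rebuilt.
func (e *entry) tryExpunge() bool {
	p := e.p.Load()
	for p == nil {
		if e.p.CompareAndSwap(nil, expunged) {
			return true
		}
		p = e.p.Load()
	}
	return p == expunged
}

// unexpunge turns expunged back into nil; the caller must re-add e to dirty.
func (e *entry) unexpunge() bool {
	return e.p.CompareAndSwap(expunged, nil)
}

func main() {
	e := newEntry("v1")
	fmt.Println(e.load())       // v1 true
	fmt.Println(e.delete())     // true: value -> nil
	fmt.Println(e.tryExpunge()) // true: nil -> expunged
	v2 := any("v2")
	fmt.Println(e.tryStore(&v2)) // false: expunged blocks the fast path
	fmt.Println(e.unexpunge())   // true: expunged -> nil
	fmt.Println(e.tryStore(&v2)) // true: nil -> v2
	fmt.Println(e.load())        // v2 true
}
```

The key design point is that tryStore refuses to write into an expunged entry: such an entry is missing from dirty, so a value written there would be lost at the next promotion.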

Load

// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        // Avoid reporting a spurious miss if m.dirty got promoted while we were
        // blocked on m.mu. (If further loads of the same key will not miss, it's
        // not worth copying the dirty map for this key.)
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // Regardless of whether the entry was present, record a miss: this key
            // will take the slow path until the dirty map is promoted to the read
            // map.
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

func (e *entry) load() (value interface{}, ok bool) {
    p := atomic.LoadPointer(&e.p)
    if p == nil || p == expunged {
        return nil, false
    }
    return *(*interface{})(p), true
}

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

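Load first looks in read. If the key is not there and read.amended is true (dirty holds keys that read lacks), Load takes the lock, looks in dirty, calls missLocked(), and unlocks. missLocked increments misses; once misses >= len(dirty), it promotes dirty to read and sets dirty to nil.

The code contains a double check: read misses, Load takes the lock, and then checks read again. The reason is that dirty may have been promoted to read in the window between the first check and acquiring the lock. Without the second check, Load would then look in the now-nil dirty and report a key that actually exists as missing.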
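The double check is a general pattern. A minimal sketch on a hypothetical RWMutex-guarded cache (not sync.Map) shows why it matters: between releasing the read lock and acquiring the write lock, another goroutine may already have filled the key, so the second lookup under the write lock keeps compute from running more than once.

```go
package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical read-mostly cache illustrating double-checked lookup.
type cache struct {
	mu       sync.RWMutex
	m        map[string]int
	computes int // how many times compute actually ran (guarded by mu)
}

func (c *cache) Get(key string, compute func(string) int) int {
	// First check, under the cheap shared lock.
	c.mu.RLock()
	v, ok := c.m[key]
	c.mu.RUnlock()
	if ok {
		return v
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	// Second check: another goroutine may have filled the key between
	// RUnlock and Lock. Without it, compute would run again.
	if v, ok := c.m[key]; ok {
		return v
	}
	c.computes++
	v = compute(key)
	c.m[key] = v
	return v
}

func main() {
	c := &cache{m: map[string]int{}}
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Get("answer", func(string) int { return 42 })
		}()
	}
	wg.Wait()
	fmt.Println(c.Get("answer", nil), c.computes) // 42 1
}
```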

Store

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            // The entry was previously expunged, which implies that there is a
            // non-nil dirty map and this entry is not in it.
            m.dirty[key] = e
        }
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value)
    } else {
        if !read.amended {
            // We're adding the first new key to the dirty map.
            // Make sure it is allocated and mark the read-only map as incomplete.
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

// tryStore stores a value if the entry has not been expunged.
//
// If the entry is expunged, tryStore returns false and leaves the entry
// unchanged.
func (e *entry) tryStore(i *interface{}) bool {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

// unexpungeLocked ensures that the entry is not marked as expunged.
//
// If the entry was previously expunged, it must be added to the dirty map
// before m.mu is unlocked.
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

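Store first looks the key up in read. If it is there, Store tries to update the value directly through read's entry (tryStore). If the key is not in read, or the entry turns out to be expunged, Store takes the lock and double checks.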

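After taking the lock, Store handles these cases:

  1. The key is now in read. unexpungeLocked tries to CAS the entry from expunged to nil; if that succeeds (the entry had been expunged), the entry is first added to dirty. Then the value is written. If the entry was not expunged, the value is written directly.
  2. The key is in dirty: the value is updated directly.
  3. The key is in neither map: if read.amended is false (dirty is nil), dirtyLocked() copies read into dirty and read is marked amended; then the new entry is written to dirty. During the copy, entries that are nil in read are changed to expunged and are not copied into dirty.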

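Updates to entries in read go through tryStore, which resolves conflicts with CAS. If, after a failed CAS, tryStore finds that the entry has been set to expunged, it does not write and returns false; Store then continues down the locked path and writes through dirty.

If the double check finds the key in read, one explanation is that, before Store took the lock, another goroutine wrote the key and a Load then promoted dirty to read. dirty may be nil or non-nil at that point, but either way entry.p can be updated directly. If the entry is expunged, it must first be changed to nil and the entry copied into dirty.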
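From the caller's side, all of this is hidden behind Store. The sketch below, built around a hypothetical countKeys helper and using only the public sync.Map API, has many goroutines insert distinct keys (each first insert of a new key goes through the locked dirty path) and then overwrite them (updating the existing entry), and checks the final contents with Range.

```go
package main

import (
	"fmt"
	"sync"
)

// countKeys is a hypothetical helper: n goroutines each store a new key and
// then overwrite it, after which Range tallies the keys and sums the values.
func countKeys(n int) (count, sum int) {
	var m sync.Map
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Store(i, i)    // new key: not in read, so Store takes the lock
			m.Store(i, i*10) // existing key: overwrites the same shared entry
		}(i)
	}
	wg.Wait()
	m.Range(func(_, v any) bool {
		count++
		sum += v.(int)
		return true
	})
	return count, sum
}

func main() {
	fmt.Println(countKeys(100)) // 100 49500
}
```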

Delete

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

Deletion is simple: if the key is in read, Delete sets the entry's p to nil; if the key exists only in dirty, Delete removes it from dirty directly.
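A short usage sketch of the deletion behaviour, using only the public sync.Map API (the helper name pruneDemo is hypothetical). The sync.Map documentation states that Range does not block other methods on the map and that f itself may call any method on it, so deleting from inside Range is allowed; deleting a key that was never stored simply does nothing.

```go
package main

import (
	"fmt"
	"sync"
)

// pruneDemo is a hypothetical example: it stores the squares of 0..9, deletes
// the even keys from inside Range, and reports what is left.
func pruneDemo() (remaining int, has4 bool) {
	var m sync.Map
	for i := 0; i < 10; i++ {
		m.Store(i, i*i)
	}
	// f may call any method on m, including Delete.
	m.Range(func(k, _ any) bool {
		if k.(int)%2 == 0 {
			m.Delete(k)
		}
		return true
	})
	m.Delete(100) // deleting a key that was never stored is a no-op
	m.Range(func(_, _ any) bool {
		remaining++
		return true
	})
	_, has4 = m.Load(4)
	return remaining, has4
}

func main() {
	fmt.Println(pruneDemo()) // 5 false
}
```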
