之前闲来无事,打算尝试写一个简单的LSM存储引擎的模型。简单实现了一下SkipList和BloomFilter,BitSet之后,因为一些工作上紧急的需求以及下班后只想躺平摸鱼之类的原因,暂时搁置了下来。如今想起已经有些忘记了,于是姑且记录一下。

什么是SkipList

SkipList,跳表,是一种有序的数据结构,可以作为平衡树的一种替代。本质上是一种利用稀疏索引加速链表查询的一组数据+索引的结构。

平衡树一般指BST和红黑树等数据结构,这种数据结构解决了排序树的不平衡问题,但带来了旋转,变色等问题,在并发场景下锁的粒度很大,会一定程度上影响性能,并且实现比较复杂。相对而言,SkipList避免了这些问题。

实现

跳表一般基于有序链表实现。首先是链表的排序问题,对于链表的来说,排序的问题其实等价于怎么找到新增节点的在有序链表中插入位置

对于数组而言,只需要利用二分法查找到对应的位置,然后插入,并移动之后的元素,主要的开销在于拓展内存以及移动元素。

链表没法这么处理。链表的优势在于插入后无需移动后续元素,但无法跳跃查询,主要开销在于定位插入位置。

结合两者实际上就是跳表的基本思想:底层数据用有序链表维护,方便数据插入;在底层数据节点之上构建多层不同的稀疏索引(比如从上往下不断变密集),加速节点的查询,快速定位。

image-20200804113631837

比如底层索引6节点,第一层索引在中点处[ (0 + 6) / 2]添加索引,第一层索引将底层分成两个分区,第二层索引再在两个分区的中点添加索引,以此类推。

索引节点+数据节点就是跳表的核心,但这又有了另一个问题:怎么样便利的维护索引节点?

显然,将每层的分区的中点作为索引节点是不合适的,因为节点的增减是一种常见需求,每次数据节点的增减都会导致索引节点的变化,带来不少额外的开销。我们需要一种与数据节点数量无关的、确定索引节点位置的方法。

基本的思路就是使用随机化。在每次增加节点时确定是否需要此节点上建立索引节点。

比如对于一个最高$X$层的跳表,设底层$N$个数据节点。每次添加节点时,以$1/(2^n)$判断是否要在前$n$层添加索引节点(只有本层有索引节点才能往上层添加索引节点)。

对于第$K$层,将有$N / 2^k$个节点,与取分区中点作为索引节点等价。

数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type SkipListNode struct {
        data *codec.Entry
        nextPtrs []*SkipListNode
}

type SkipList struct {
        header, tail *SkipListNode
        level        int
        size         int
        rwmtx        *sync.RWMutex
        maxSize      int
}

操作

值得关注的只有查询和插入节点两个操作。

查询

最核心的步骤是由上至下,利用每层的稀疏索引二分查找定位到需要的节点,或者位置。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// findPreNode find the node before node.key
func (sl *SkipList) findPreNode(key []byte) (*SkipListNode, bool) {
        // from top to bottom
        h := sl.header
        for i := sl.level - 1; i >= 0; i-- {
                for h.nextPtrs[i] != nil && bytes.Compare(h.nextPtrs[i].data.Key, key) != 1 {
                        if bytes.Equal(h.nextPtrs[i].data.Key, key) {
                                return h, true
                        }
                        h = h.nextPtrs[i]
                }
        }

        return nil, false
}

插入

首先要定位到插入的位置,插入,再随机化创建索引。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (sl *SkipList) randomLevel() int {
        ans := 1
        rand.Seed(time.Now().Unix())
        for rand.Intn(2) == 0 && ans <= defaultMaxLevel {
                ans++
        }
        return ans
}


func (sl *SkipList) Insert(data *codec.Entry) *SkipListNode {
        sl.rwmtx.Lock()
        defer sl.rwmtx.Unlock()

        h := sl.header
        updateNode := make([]*SkipListNode, defaultMaxLevel) // stores the node before newNode
        // search form the top level
        for i := sl.level - 1; i >= 0; i-- {
                // loop: 1. current nextPtrs is not empty && data is small than inserted one, curData < insertedData
                for h.nextPtrs[i] != nil && h.nextPtrs[i].data.Less(data) {
                        h = h.nextPtrs[i]
                }
                updateNode[i] = h
        }
        // choose level to insert
        lvl := sl.randomLevel()
        if lvl > sl.level {
                // Insert into higher level, we need to create header -> tail
                for i := sl.level; i < lvl; i++ {
                        updateNode[i] = sl.header
                }
                sl.level = lvl
        }
        // insert after updatedNote
        n := NewSkipListNode(lvl, data)
        for i := 0; i < lvl; i++ {
                n.nextPtrs[i] = updateNode[i].nextPtrs[i]
                updateNode[i].nextPtrs[i] = n
        }
        sl.size++
        return n
}

参考

跳跃列表(Skip List)与其在Redis中的实现详

Redis: zskiplist

wikipedia

spongecaptain’s blog