deque in Rust

std::collections::VecDeque

Rust's VecDeque is implemented as a ring buffer. Let's first look at its structure:

/// A double-ended queue implemented with a growable ring buffer.
///
/// The "default" usage of this type as a queue is to use [`push_back`] to add to
/// the queue, and [`pop_front`] to remove from the queue. [`extend`] and [`append`]
/// push onto the back in this manner, and iterating over `VecDeque` goes front
/// to back.
///
/// [`push_back`]: #method.push_back
/// [`pop_front`]: #method.pop_front
/// [`extend`]: #method.extend
/// [`append`]: #method.append
#[stable(feature = "rust1", since = "1.0.0")]
pub struct VecDeque<T> {
    // tail and head are pointers into the buffer. Tail always points
    // to the first element that could be read, Head always points
    // to where data should be written.
    // If tail == head the buffer is empty. The length of the ringbuffer
    // is defined as the distance between the two.
    tail: usize,
    head: usize,
    buf: RawVec<T>,
}
  • We will look at RawVec later; for now, think of it as an allocated, contiguous block of uninitialized memory.

#[allow(missing_debug_implementations)]
pub struct RawVec<T, A: Alloc = Global> {
    ptr: Unique<T>,
    cap: usize,
    a: A,
}

Here is a rough diagram of how they are laid out:

buf       tail      head      buf(end)
|         |         |         |

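Writes go to head; tail is the front from which elements are read (honestly, before reading the code I always assumed it was the other way around).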
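
The head/tail convention can be made concrete with a little index arithmetic. The following is a minimal sketch, not the std implementation: the free functions ring_len, ring_is_empty and ring_is_full are hypothetical names, and the capacity is assumed to be a power of two.

```rust
/// Number of readable elements between `tail` and `head`.
/// Assumption: `cap` is a power of two, so masking works as modulo.
fn ring_len(tail: usize, head: usize, cap: usize) -> usize {
    debug_assert!(cap.is_power_of_two());
    head.wrapping_sub(tail) & (cap - 1)
}

/// The ring is empty exactly when both indices coincide.
fn ring_is_empty(tail: usize, head: usize) -> bool {
    tail == head
}

/// One slot always stays unused, so "full" means only one slot is left.
fn ring_is_full(tail: usize, head: usize, cap: usize) -> bool {
    cap - ring_len(tail, head, cap) == 1
}

fn main() {
    // Contiguous contents: slots 2, 3, 4, 5.
    assert_eq!(ring_len(2, 6, 8), 4);
    // Wrapped contents: slots 6, 7, 0, 1.
    assert_eq!(ring_len(6, 2, 8), 4);
    assert!(ring_is_empty(3, 3));
    // head sits directly behind tail: 7 of 8 slots are in use.
    assert!(ring_is_full(3, 2, 8));
}
```

Keeping one slot unused is what lets tail == head mean "empty" without a separate length field.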

new calls with_capacity:

#[stable(feature = "rust1", since = "1.0.0")]
pub fn with_capacity(capacity: usize) -> VecDeque<T> {
    // +1 since the ringbuffer always leaves one space empty
    let cap = cmp::max(capacity + 1, MINIMUM_CAPACITY + 1).next_power_of_two();
    assert!(cap > capacity, "capacity overflow");

    VecDeque {
        tail: 0,
        head: 0,
        buf: RawVec::with_capacity(cap),
    }
}

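The cap computation rounds the size up to a power of two (not merely a multiple of 2). Because head == tail means the buffer is empty, one slot must always stay unused, hence capacity + 1.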
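
To see what the rounding does to a few requested sizes, here is a small sketch of the same computation as a standalone function. The name ring_capacity is hypothetical, and the value of MINIMUM_CAPACITY is an assumption about the std source of that era, not something shown in the excerpt above.

```rust
use std::cmp;

// Assumption: a stand-in for the private std constant; the real value is
// not shown in the excerpt above.
const MINIMUM_CAPACITY: usize = 1;

/// Buffer size actually allocated for a requested element capacity.
fn ring_capacity(requested: usize) -> usize {
    // +1 because one slot is always left empty.
    let cap = cmp::max(requested + 1, MINIMUM_CAPACITY + 1).next_power_of_two();
    assert!(cap > requested, "capacity overflow");
    cap
}

fn main() {
    assert_eq!(ring_capacity(0), 2);
    assert_eq!(ring_capacity(7), 8);
    // Asking for exactly 8 elements needs 9 slots, which rounds up to 16.
    assert_eq!(ring_capacity(8), 16);
    assert_eq!(ring_capacity(10), 16);
}
```

Note the jump at powers of two: requesting 8 elements already costs a 16-slot buffer because of the reserved slot.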

For push_back:

#[stable(feature = "rust1", since = "1.0.0")]
pub fn push_back(&mut self, value: T) {
    self.grow_if_necessary();

    let head = self.head;
    self.head = self.wrap_add(self.head, 1);
    unsafe { self.buffer_write(head, value) }
}

grow_if_necessary is the important part; we will cover it later.

Index Wrap

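Wrapping is straightforward once you think about it: the index is not used as an address directly, but is mapped to a relative position inside the ring buffer.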

/// Returns the index in the underlying buffer for a given logical element
/// index + addend.
#[inline]
fn wrap_add(&self, idx: usize, addend: usize) -> usize {
    wrap_index(idx.wrapping_add(addend), self.cap())
}

wrap_index:

/// Returns the index in the underlying buffer for a given logical element index.
#[inline]
fn wrap_index(index: usize, size: usize) -> usize {
    // size is always a power of 2
    debug_assert!(size.is_power_of_two());
    index & (size - 1)
}

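Here size is actually the cap of the RawVec. We saw earlier that cap is guaranteed to be a power of two, and there is a debug_assert for it here as well. So index & (size - 1) is equivalent to index % size and yields the corresponding index in the buffer. Iterators and indexing both go through this same mechanism.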
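
The equivalence between masking and modulo is easy to check directly. This is a sketch using free functions that take the size as a parameter (in std, wrap_add and wrap_sub are methods that read self.cap()); the signatures here are adapted for illustration.

```rust
/// Maps a logical index into a buffer whose size is a power of two.
fn wrap_index(index: usize, size: usize) -> usize {
    debug_assert!(size.is_power_of_two());
    index & (size - 1)
}

/// Moves `idx` forward by `addend`, wrapping around the end of the buffer.
fn wrap_add(idx: usize, addend: usize, size: usize) -> usize {
    wrap_index(idx.wrapping_add(addend), size)
}

/// Moves `idx` backward by `subtrahend`, wrapping around the start.
fn wrap_sub(idx: usize, subtrahend: usize, size: usize) -> usize {
    wrap_index(idx.wrapping_sub(subtrahend), size)
}

fn main() {
    // For a power-of-two size, masking and modulo agree.
    for i in 0..64 {
        assert_eq!(wrap_index(i, 8), i % 8);
    }
    // Stepping past the last slot lands on slot 0.
    assert_eq!(wrap_add(7, 1, 8), 0);
    // Stepping back from slot 0 lands on the last slot, even though
    // 0 - 1 underflows: the wrapped value still masks to 7.
    assert_eq!(wrap_sub(0, 1, 8), 7);
}
```

The wrap_sub case is why wrapping_sub is paired with a power-of-two mask: usize::MAX has all low bits set, so the mask still gives size - 1.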

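So the push_back we saw earlier:

  • Checks whether it needs to grow and, if so, grows first. The grow method is covered shortly.
  • Takes the old head (the writable slot)
  • Advances head within the ring buffer
  • Writes to the old head

Next, look at push_front, which is very similar: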

#[stable(feature = "rust1", since = "1.0.0")]
pub fn push_front(&mut self, value: T) {
    self.grow_if_necessary();

    self.tail = self.wrap_sub(self.tail, 1);
    let tail = self.tail;
    unsafe {
        self.buffer_write(tail, value);
    }
}

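It needs little explanation. Note that, unlike push_back, the index is moved before the write:

  • Check whether it needs to grow and, if so, grow first. The grow method is covered shortly.
  • Move tail back by one within the ring buffer
  • Take the new tail
  • Write to the new tail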
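
The asymmetry between the two pushes is easier to see on a toy model. Below is a minimal sketch, assuming a fixed power-of-two capacity and a safe Vec<Option<u32>> in place of RawVec; ToyRing and its methods are hypothetical and do not grow.

```rust
/// A toy fixed-size ring; `buf.len()` is assumed to be a power of two.
struct ToyRing {
    tail: usize,
    head: usize,
    buf: Vec<Option<u32>>,
}

impl ToyRing {
    fn new(cap: usize) -> Self {
        assert!(cap.is_power_of_two());
        ToyRing { tail: 0, head: 0, buf: vec![None; cap] }
    }

    fn wrap(&self, i: usize) -> usize {
        i & (self.buf.len() - 1)
    }

    fn is_full(&self) -> bool {
        self.wrap(self.head.wrapping_add(1)) == self.tail
    }

    /// Write at the old head, then advance head.
    fn push_back(&mut self, v: u32) {
        assert!(!self.is_full(), "toy ring does not grow");
        let head = self.head;
        self.head = self.wrap(self.head.wrapping_add(1));
        self.buf[head] = Some(v);
    }

    /// Move tail back first, then write at the new tail.
    fn push_front(&mut self, v: u32) {
        assert!(!self.is_full(), "toy ring does not grow");
        self.tail = self.wrap(self.tail.wrapping_sub(1));
        let tail = self.tail;
        self.buf[tail] = Some(v);
    }
}

fn main() {
    let mut r = ToyRing::new(8);
    r.push_back(1);
    r.push_back(2);
    r.push_front(0);
    // tail wrapped around to the last slot; head advanced twice.
    assert_eq!((r.tail, r.head), (7, 2));
    assert_eq!(r.buf[7], Some(0));
    assert_eq!(r.buf[0], Some(1));
    assert_eq!(r.buf[1], Some(2));
}
```

head always points at a free slot, so push_back writes and then moves; tail always points at a live element, so push_front has to move before it writes.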

Pop

#[stable(feature = "rust1", since = "1.0.0")]
pub fn pop_front(&mut self) -> Option<T> {
    if self.is_empty() {
        None
    } else {
        let tail = self.tail;
        self.tail = self.wrap_add(self.tail, 1);
        unsafe { Some(self.buffer_read(tail)) }
    }
}
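
From the caller's side, pop_front reads the element at tail and returns None once the deque is empty. A short usage sketch with the public std API follows; the helper name demo is made up for this example.

```rust
use std::collections::VecDeque;

/// Pushes at both ends, then pops until the deque is empty.
fn demo() -> Vec<Option<i32>> {
    let mut dq: VecDeque<i32> = VecDeque::new();
    dq.push_back(1);
    dq.push_back(2);
    dq.push_front(0); // logical order is now 0, 1, 2

    vec![
        dq.pop_front(), // takes from the tail side
        dq.pop_back(),  // takes from the head side
        dq.pop_front(),
        dq.pop_front(), // empty: no element to read
    ]
}

fn main() {
    assert_eq!(demo(), vec![Some(0), Some(2), Some(1), None]);
}
```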

grow_if_necessary

// This may panic or abort
#[inline]
fn grow_if_necessary(&mut self) {
    if self.is_full() {
        let old_cap = self.cap();
        self.buf.double();
        unsafe {
            self.handle_capacity_increase(old_cap);
        }
        debug_assert!(!self.is_full());
    }
}

double is a method of RawVec:

#[inline(never)]
#[cold]
pub fn double(&mut self) {
    unsafe {
        let elem_size = mem::size_of::<T>();

        // since we set the capacity to usize::MAX when elem_size is
        // 0, getting to here necessarily means the RawVec is overfull.
        assert!(elem_size != 0, "capacity overflow");

        let (new_cap, uniq) = match self.current_layout() {
            Some(cur) => {
                // Since we guarantee that we never allocate more than
                // isize::MAX bytes, `elem_size * self.cap <= isize::MAX` as
                // a precondition, so this can't overflow. Additionally the
                // alignment will never be too large as to "not be
                // satisfiable", so `Layout::from_size_align` will always
                // return `Some`.
                //
                // tl;dr; we bypass runtime checks due to dynamic assertions
                // in this module, allowing us to use
                // `from_size_align_unchecked`.
                let new_cap = 2 * self.cap;
                let new_size = new_cap * elem_size;
                alloc_guard(new_size).unwrap_or_else(|_| capacity_overflow());
                let ptr_res = self.a.realloc(NonNull::from(self.ptr).cast(),
                                             cur,
                                             new_size);
                match ptr_res {
                    Ok(ptr) => (new_cap, ptr.cast().into()),
                    Err(_) => handle_alloc_error(
                        Layout::from_size_align_unchecked(new_size, cur.align())
                    ),
                }
            }
            None => {
                // skip to 4 because tiny Vec's are dumb; but not if that
                // would cause overflow
                let new_cap = if elem_size > (!0) / 8 { 1 } else { 4 };
                match self.a.alloc_array::<T>(new_cap) {
                    Ok(ptr) => (new_cap, ptr.into()),
                    Err(_) => handle_alloc_error(Layout::array::<T>(new_cap).unwrap()),
                }
            }
        };
        self.ptr = uniq;
        self.cap = new_cap;
    }
}

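The realloc call doubles the buffer while keeping the existing elements at the same offsets, so a wrapped-around ring is no longer laid out correctly for the new capacity. That is why the next step is: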

unsafe {
    self.handle_capacity_increase(old_cap);
}

The implementation is clearly written:

/// Frobs the head and tail sections around to handle the fact that we
/// just reallocated. Unsafe because it trusts old_capacity.
#[inline]
unsafe fn handle_capacity_increase(&mut self, old_capacity: usize) {
    let new_capacity = self.cap();

    // Move the shortest contiguous section of the ring buffer
    //    T             H
    //   [o o o o o o o . ]
    //    T             H
    // A [o o o o o o o . . . . . . . . . ]
    //        H T
    //   [o o . o o o o o ]
    //          T             H
    // B [. . . o o o o o o o . . . . . . ]
    //              H T
    //   [o o o o o . o o ]
    //              H                 T
    // C [o o o o o . . . . . . . . . o o ]

    if self.tail <= self.head {
        // A
        // Nop
    } else if self.head < old_capacity - self.tail {
        // B
        self.copy_nonoverlapping(old_capacity, 0, self.head);
        self.head += old_capacity;
        debug_assert!(self.head > self.tail);
    } else {
        // C
        let new_tail = new_capacity - (old_capacity - self.tail);
        self.copy_nonoverlapping(new_tail, self.tail, old_capacity - self.tail);
        self.tail = new_tail;
        debug_assert!(self.head < self.tail);
    }
    debug_assert!(self.head < self.cap());
    debug_assert!(self.tail < self.cap());
    debug_assert!(self.cap().count_ones() == 1);
}

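This algorithm moves as few elements as possible.

  • A: No adjustment needed. The elements are already contiguous (tail <= head); in the diagram tail sits at the very start, as if only push_back had been used. (Strictly speaking, pushes and pops at the front can lead here too.)
  • B: self.head < old_capacity - self.tail means the wrapped-around section at the start of the buffer, [0, head), is the shorter one, so it is moved to just after the old capacity.
  • C: Otherwise the section [tail, old_capacity) is moved to the end of the new buffer.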
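
The three cases can be replayed on a safe toy buffer. The sketch below is an illustration under assumptions, not the std code: grow, filled and contents are hypothetical helpers, Vec::resize stands in for realloc, and elements are moved one at a time instead of with copy_nonoverlapping.

```rust
/// Doubles `buf` and fixes up the indices. Returns the new (tail, head).
fn grow(buf: &mut Vec<Option<u32>>, tail: usize, head: usize) -> (usize, usize) {
    let old_cap = buf.len();
    buf.resize(old_cap * 2, None); // old contents keep their offsets
    let new_cap = buf.len();
    if tail <= head {
        // A: already contiguous, nothing to move.
        (tail, head)
    } else if head < old_cap - tail {
        // B: move the short section [0, head) to just past the old capacity.
        for i in 0..head {
            let v = buf[i].take();
            buf[old_cap + i] = v;
        }
        (tail, head + old_cap)
    } else {
        // C: move the section [tail, old_cap) to the end of the new buffer.
        let n = old_cap - tail;
        let new_tail = new_cap - n;
        for i in 0..n {
            let v = buf[tail + i].take();
            buf[new_tail + i] = v;
        }
        (new_tail, head)
    }
}

/// Builds a ring of size `cap` holding 0..n, starting at slot `tail`.
fn filled(cap: usize, tail: usize, n: u32) -> (Vec<Option<u32>>, usize) {
    let mut buf = vec![None; cap];
    let mut head = tail;
    for v in 0..n {
        buf[head] = Some(v);
        head = (head + 1) & (cap - 1);
    }
    (buf, head)
}

/// Reads the elements from tail to head in logical order.
fn contents(buf: &[Option<u32>], tail: usize, head: usize) -> Vec<u32> {
    let mut out = Vec::new();
    let mut i = tail;
    while i != head {
        out.push(buf[i].expect("slots between tail and head are filled"));
        i = (i + 1) & (buf.len() - 1);
    }
    out
}

fn main() {
    let expected: Vec<u32> = (0..7).collect();
    // Case B from the diagram: tail = 3, head = 2 in a buffer of 8.
    let (mut buf, head) = filled(8, 3, 7);
    let (t, h) = grow(&mut buf, 3, head);
    assert_eq!((t, h), (3, 10));
    assert_eq!(contents(&buf, t, h), expected);
    // Case C from the diagram: tail = 6, head = 5 in a buffer of 8.
    let (mut buf, head) = filled(8, 6, 7);
    let (t, h) = grow(&mut buf, 6, head);
    assert_eq!((t, h), (14, 5));
    assert_eq!(contents(&buf, t, h), expected);
}
```

In both cases the logical order of the elements is unchanged; only the shorter of the two sections is relocated.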

Drop

#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<#[may_dangle] T> Drop for VecDeque<T> {
    fn drop(&mut self) {
        let (front, back) = self.as_mut_slices();
        unsafe {
            // use drop for [T]
            ptr::drop_in_place(front);
            ptr::drop_in_place(back);
        }
        // RawVec handles deallocation
    }
}

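When the contents wrap around, front is the slice [tail, end) and back is the slice [start, head); when they are contiguous, front is [tail, head) and back is empty.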

#[inline]
#[stable(feature = "deque_extras_15", since = "1.5.0")]
pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
    unsafe {
        let head = self.head;
        let tail = self.tail;
        let buf = self.buffer_as_mut_slice();
        RingSlices::ring_slices(buf, head, tail)
    }
}

ring_slices is implemented as:

/// Returns the two slices that cover the `VecDeque`'s valid range
trait RingSlices: Sized {
    fn slice(self, from: usize, to: usize) -> Self;
    fn split_at(self, i: usize) -> (Self, Self);

    fn ring_slices(buf: Self, head: usize, tail: usize) -> (Self, Self) {
        let contiguous = tail <= head;
        if contiguous {
            let (empty, buf) = buf.split_at(0);
            (buf.slice(tail, head), empty)
        } else {
            let (mid, right) = buf.split_at(tail);
            let (left, _) = mid.split_at(head);
            (right, left)
        }
    }
}

The contiguous check here is the same tail <= head test we saw when growing.
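
The same split can be written directly on a shared slice. This is a simplified sketch: the free function ring_slices below works on &[T] only and is not the std trait, which is generic over shared and mutable slices.

```rust
/// Splits a ring's backing slice into (front, back) in logical order.
fn ring_slices<T>(buf: &[T], head: usize, tail: usize) -> (&[T], &[T]) {
    if tail <= head {
        // Contiguous: everything lives in [tail, head); back is empty.
        (&buf[tail..head], &buf[..0])
    } else {
        // Wrapped: [tail, end) comes first, then [0, head).
        (&buf[tail..], &buf[..head])
    }
}

fn main() {
    // Wrapped layout: slot 2 is the unused gap (99 is a placeholder).
    let wrapped = [5, 6, 99, 0, 1, 2, 3, 4];
    let (front, back) = ring_slices(&wrapped, 2, 3);
    assert_eq!(front, &[0, 1, 2, 3, 4][..]);
    assert_eq!(back, &[5, 6][..]);

    // Contiguous layout: the last slot is the unused gap.
    let flat = [0, 1, 2, 3, 4, 5, 6, 99];
    let (front, back) = ring_slices(&flat, 7, 0);
    assert_eq!(front, &[0, 1, 2, 3, 4, 5, 6][..]);
    assert!(back.is_empty());
}
```

Reading front and then back always yields the elements in queue order, which is what Drop and the iterators rely on.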

Deallocating the memory itself is left to RawVec.
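
One observable consequence is that dropping a VecDeque drops every element exactly once, whichever end it was pushed from. The sketch below checks this with a counter; Noisy and count_drops are hypothetical names made up for this example.

```rust
use std::cell::Cell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Increments a shared counter when dropped.
struct Noisy(Rc<Cell<usize>>);

impl Drop for Noisy {
    fn drop(&mut self) {
        self.0.set(self.0.get() + 1);
    }
}

/// Fills a deque from both ends, drops it, and reports how many
/// element destructors ran.
fn count_drops(front: usize, back: usize) -> usize {
    let drops = Rc::new(Cell::new(0));
    {
        let mut dq = VecDeque::new();
        for _ in 0..back {
            dq.push_back(Noisy(Rc::clone(&drops)));
        }
        for _ in 0..front {
            dq.push_front(Noisy(Rc::clone(&drops)));
        }
    } // dq goes out of scope here
    drops.get()
}

fn main() {
    assert_eq!(count_drops(3, 4), 7);
}
```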

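Performance

  • push_front / push_back: the amortized complexity is the same as pushing onto the back of a Vec, although growing may require copying up to 1.5 times the size.

The following is copied from the official documentation: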

             get(i)          insert(i)        remove(i)       append  split_off(i)
Vec          O(1)            O(n-i)*          O(n-i)          O(m)*   O(n-i)
VecDeque     O(1)            O(min(i, n-i))*  O(min(i, n-i))  O(m)*   O(min(i, n-i))
LinkedList   O(min(i, n-i))  O(min(i, n-i))   O(min(i, n-i))  O(1)    O(min(i, n-i))

Note that where ties occur, Vec is generally going to be faster than VecDeque, and VecDeque is generally going to be faster than