作者: songtianyi create@2021-01-28
这篇文章源于一个实际案例,文件的并发读写,比如如何提高日志文件的并发写效率。在这种有性能差的场景里,我们可以使用内存 buffer (队列)来做写入缓冲,先将内容写到内存,再将内存整个刷到磁盘。
那么队列的并发写入,自然需要加锁,问题是如何做到无锁?
CAS 全称 compare and swap, 它是一个原子操作, 伪代码如下:
function cas(p: pointer to int, old: int, new: int) is
if *p ≠ old
return false
*p ← new
return true
大致意思是,当我们要修改一个值当时候(将 old 修改为 new)先检查它的值是否等于 old, 如果相等就赋值为 new, 否则返回 false, 因为不相等说明该值已经被(其它线程)修改,这个时候就不能再贸然修改它。
CAS 是一个原子操作,CPU 只使用一条指令来表达和执行它,因此 CAS 是 lock free 的, 不需要加锁。
基于 CAS 我们可以实现无锁的并发写逻辑。
先看一个 Go 没有加锁的例子, 如下:
package main
import (
"fmt"
"sync"
)
func main() {
i := 0
wg := sync.WaitGroup{}
wg.Add(10)
for j := 0; j < 10; j++ {
go func() {
i++
wg.Done()
}()
}
wg.Wait()
fmt.Println(i)
}
并发修改 i, 多次执行的结果是不一致的,如下:
songtianyi-mb$ go run concurrency.go
10
songtianyi-mb$ go run concurrency.go
9
songtianyi-mb$ go run concurrency.go
10
songtianyi-mb$ go run concurrency.go
10
songtianyi-mb$ go run concurrency.go
10
songtianyi-mb$ go run concurrency.go
10
songtianyi-mb$ go run concurrency.go
9
songtianyi-mb$ go run concurrency.go
10
加上基于 CAS 的 atomic 再试试:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
i := uint32(0)
wg := sync.WaitGroup{}
wg.Add(10)
for j := 0; j < 10; j++ {
go func() {
atomic.AddUint32(&i, 1)
wg.Done()
}()
}
wg.Wait()
fmt.Println(i)
}
执行多次结果是稳定且正确的。
linux kernel 里就有队列的实现 kfifo, 不过使用的是自旋锁(spinlock). 感兴趣的可以读一读源码
自旋锁特点是,当线程试图获得锁但没拿到的时候,线程会忙等(一直循环执行一段 test-and-set 的原子逻辑),不会释放 CPU。所以,使用自旋锁的前提是,等待锁的时间足够短,否则会空耗 CPU,它的优点是不会休眠和唤醒的过程,避免了上下文切换的开销。
先用 C 实现一个简单的队列,
``` C
/* C */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
struct Queue {
char *buf;
int size;
int out;
int in;
};
struct Queue *newQ(int size) {
struct Queue *q = malloc(sizeof(struct Queue));
q->size = size;
q->buf = malloc(sizeof(char) * size);
q->out = q->in = 0;
return q;
}
void enQ(struct Queue *q, char v) {
q->buf[q->in % q->size] = v;
q->in++;
}
char deQ(struct Queue *q) {
char x = q->buf[q->out % q->size];
q->out++;
return x;
}
bool emptyQ(struct Queue *q) {
if (q->out == q->in) {
q->out = q->in = 0;
return true;
}
return false;
}
void freeQ(struct Queue *q) {
free(q->buf);
free(q);
}
int main() {
struct Queue *q = newQ(3);
enQ(q, 'c');
enQ(q, 'b');
enQ(q, 'a');
printf("%c\n", deQ(q));
printf("%c\n", deQ(q));
printf("%c\n", deQ(q));
enQ(q, 'a');
enQ(q, 'b');
enQ(q, 'c');
printf("%c\n", deQ(q));
printf("%c\n", deQ(q));
printf("%c\n", deQ(q));
freeQ(q);
return 0;
}
当并发入队出队的时候必然会有问题,我们尝试使用 CAS 来对修改的操作加锁
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
struct Queue {
char *buf;
int size;
int out;
int in;
};
bool CAS(int *addr, int oldval, int newval) {
if (*addr != oldval) {
return false;
}
*addr = newval;
return true;
}
struct Queue *newQ(int size) {
struct Queue *q = malloc(sizeof(struct Queue));
q->size = size;
q->buf = malloc(sizeof(char) * size);
q->out = q->in = 0;
return q;
}
void enQ(struct Queue *q, char v) {
int i;
do {
i = q->in;
q->buf[i % q->size] = v;
} while (__sync_bool_compare_and_swap(&q->in, i, i + 1) == false);
}
char deQ(struct Queue *q) {
char x;
int i;
do {
x = q->buf[q->out % q->size];
i = q->out;
} while (__sync_bool_compare_and_swap(&q->out, i, i + 1) == false);
return x;
}
bool emptyQ(struct Queue *q) {
if (q->out == q->in) {
q->out = q->in = 0;
return true;
}
return false;
}
void freeQ(struct Queue *q) {
free(q->buf);
free(q);
}
int main() {
struct Queue *q = newQ(3);
enQ(q, 'c');
enQ(q, 'b');
enQ(q, 'a');
printf("%c\n", deQ(q));
printf("%c\n", deQ(q));
printf("%c\n", deQ(q));
enQ(q, 'a');
enQ(q, 'b');
enQ(q, 'c');
printf("%c\n", deQ(q));
printf("%c\n", deQ(q));
printf("%c\n", deQ(q));
freeQ(q);
return 0;
}
上述代码并没有严格测试和论证,只为了说明无锁队列的大致实现思路,感兴趣的可以阅读参考资料。