Lab8是对锁相关的知识进行探讨。前置知识是xv6book的第六章以及3.5,8.1~8.3这些对物理内存分配和文件系统Buffer Cache相关内容。
本次实验有两个任务,并且任务间完全独立,可以乱序做。不过核心思想都是减小锁的粒度来减少竞争。
Memory allocator
第一个任务是解决内存块的竞争问题。目前的内存块的结构是利用一个大锁锁住一个 freelist 的方式。按照实验指导书所说,我们需要将 freelist 进行拆分,给每个 cpu 分配一个freelist,来降低竞争。
根据任务提示,我们可以利用以及定义的常量NCPU。
首先更改内存块的结构,拆成NCPU个小块:
// kernel/kalloc.c
struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];
然后修改kinit()
,kfree()
,注意在调用cpuid()
时,要调用push_off()
和pop_off()
来关闭中断。
// kernel/kalloc.c
void
kinit()
{
for(int i = 0; i < NCPU; i++)
initlock(&kmem[i].lock, "kmem");
freerange(end, (void*)PHYSTOP);
}
void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
push_off();
int id = cpuid();
pop_off();
acquire(&kmem[id].lock);
r->next = kmem[id].freelist;
kmem[id].freelist = r;
release(&kmem[id].lock);
}
最后是修改kalloc()
,根据实验要求,我们需要在一个cpu的空间不够的时候从另外一个cpu上偷一块来:
// kernel/kalloc.c
void *
kalloc(void)
{
struct run *r;
push_off();
int id = cpuid();
pop_off();
acquire(&kmem[id].lock);
r = kmem[id].freelist;
if(r)
kmem[id].freelist = r->next;
else{
for(int i = 0; i < NCPU; i++)
{
if(i == id) continue;
acquire(&kmem[i].lock);
r = kmem[i].freelist;
if(r)
kmem[i].freelist = r->next;
release(&kmem[i].lock);
if(r) break;
}
}
release(&kmem[id].lock);
if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}
这样任务一就完成了,难度不算很大。
Buffer cache
任务二的目的是解决 cache 缓存竞争问题。可以参考任务一,设计小粒度的锁。不过缓存区大小是固定的,不能随意更改。而且,题目要求使用 ticks 的方式代替现有的 LRU 机制。
根据任务提示,我们设定固定数量且为质数的桶来减少散列冲突,因为Hint里面提到了13,那么就设定为13个bucket。同时因为要用ticks的方式取代LRU,所以我们还需要获取 trap.c 中的 ticks 变量,并且在buf结构体中添加一个lastuse的字段。
// kernel/buf.h
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
struct buf *prev; // LRU cache list
struct buf *next;
uchar data[BSIZE];
uint lastuse;
};
#define NBUCKET 13
// kernel/bio.c
extern uint ticks;
接下来是有关锁的设计,因为buf的大小是固定的,所以没法像任务一中直接用bcache数组,所以为了防止死锁我们还需要一把全局的大锁。
// kernel/bio.c
struct {
struct spinlock global_lock;
struct buf buf[NBUF];
// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head[NBUCKET];
struct spinlock lock[NBUCKET];
} bcache;
然后修改初始化环节:
// kernel/bio.c
void
binit(void)
{
struct buf *b;
initlock(&bcache.global_lock, "bcache_global");
for (int i = 0; i < NBUCKET; i++)
initlock(&bcache.lock[i], "bcache");
// Create linked list of buffers
for (int i = 0; i < NBUCKET; i++)
{
bcache.head[i].prev = &bcache.head[i];
bcache.head[i].next = &bcache.head[i];
}
// bcache.head.prev = &bcache.head;
// bcache.head.next = &bcache.head;
for(b = bcache.buf; b < bcache.buf+NBUF; b++){
b->next = bcache.head[0].next;
b->prev = &bcache.head[0];
initsleeplock(&b->lock, "buffer");
bcache.head[0].next->prev = b;
bcache.head[0].next = b;
}
}
因为我们用ticks代替了LRU,所以遇到空闲块的时候直接设置它的使用时间即可。以下是对一些相关函数的修改,都没什么难度。
// kernel/bio.c
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
acquire(&bcache.lock[b->blockno % NBUCKET]);
b->refcnt--;
if (b->refcnt == 0) {
// no one is waiting for it.
// b->next->prev = b->prev;
// b->prev->next = b->next;
// b->next = bcache.head.next;
// b->prev = &bcache.head;
// bcache.head.next->prev = b;
// bcache.head.next = b;
b->lastuse = ticks;
}
release(&bcache.lock[b->blockno % NBUCKET]);
}
void
bpin(struct buf *b) {
acquire(&bcache.lock[b->blockno % NBUCKET]);
b->refcnt++;
release(&bcache.lock[b->blockno % NBUCKET]);
}
void
bunpin(struct buf *b) {
acquire(&bcache.lock[b->blockno % NBUCKET]);
b->refcnt--;
release(&bcache.lock[b->blockno % NBUCKET]);
}
最后一步也是最关键的一部是修改bget,这里锁的获取和释放关系比较复杂,而且还涉及从别的cpu偷取,所以代码可能会比较繁杂,但总体的准则是:优先获取全局大锁,然后获取自己的小锁,锁用完要即时释放。
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b, *b2 = 0;
int i = blockno % NBUCKET, minticks = 0;
acquire(&bcache.lock[i]);
// Is the block already cached?
for(b = bcache.head[i].next; b != &bcache.head[i]; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.lock[i]);
acquiresleep(&b->lock);
return b;
}
}
release(&bcache.lock[i]);
// Not cached.
acquire(&bcache.global_lock);
acquire(&bcache.lock[i]);
for(b = bcache.head[i].next; b != &bcache.head[i]; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.lock[i]);
release(&bcache.global_lock);
acquiresleep(&b->lock);
return b;
}
}
// Recycle the least recently used (LRU) unused buffer.
for(b = bcache.head[i].next; b != &bcache.head[i]; b = b->next){
if(b->refcnt == 0) {
if(b2 == 0 || b->lastuse < minticks)
{
b2 = b;
minticks = b->lastuse;
}
}
}
if (b2) {
b2->dev = dev;
b2->blockno = blockno;
b2->refcnt++;
b2->valid = 0;
release(&bcache.lock[i]);
release(&bcache.global_lock);
acquiresleep(&b2->lock);
return b2;
}
//steal other block
for(int j = (i + 1) % NBUCKET; j != i; j = (j + 1) % NBUCKET){
acquire(&bcache.lock[j]);
for(b = bcache.head[j].next; b != &bcache.head[j]; b = b->next){
if(b->refcnt == 0) {
if(b2 == 0 || b->lastuse < minticks)
{
b2 = b;
minticks = b->lastuse;
}
}
}
if (b2) {
b2->dev = dev;
b2->blockno = blockno;
b2->refcnt++;
b2->valid = 0;
b2->next->prev = b2->prev;
b2->prev->next = b2->next;
release(&bcache.lock[j]);
b2->next = bcache.head[i].next;
b2->prev = &bcache.head[i];
bcache.head[i].next->prev = b2;
bcache.head[i].next = b2;
release(&bcache.lock[i]);
release(&bcache.global_lock);
acquiresleep(&b2->lock);
return b2;
}
release(&bcache.lock[j]);
}
release(&bcache.lock[i]);
release(&bcache.global_lock);
panic("bget: no buffers");
}
这样任务二也做完了。
总结
其实这个Lab的难度也不算很大,但是比较考验细心程度,因为锁的获取和释放哪怕一个不慎就有可能造成竞争或者死锁。