Implementing Your Own Spinlock
Published: 2019-06-18


Most parallel programs rely on locks at the lowest level for synchronization. Simply put, a lock is a small set of primitives that guarantee mutually exclusive access to a shared resource, thereby keeping the data consistent. Without locks, multiple threads may touch the same resource at the same time, and unless a carefully designed (and usually very complex) lock-free algorithm guarantees correctness, the consequences are often severe. Lock-free algorithms are hard to get right, so in practice locks are what keep most programs consistent.

If updating a data structure is a slow operation, a blocking mutex is a good choice: when a process or thread blocks on the lock, the operating system takes back control and schedules something else to run while the blocked task sleeps. Handing over control implies a context switch, which is an expensive and time-consuming operation, so when the expected wait for a lock is short, a more efficient mechanism should be used instead.


Spinlocks

A spinlock is a commonly used mutual-exclusion synchronization primitive: a thread trying to enter the critical section busy-waits, repeatedly checking the lock's state, and attempts to acquire it as soon as it is free. Unlike other locks, a spinlock simply "spins", polling until the lock is released. Spinlocks are very fast, so a lock/unlock pair costs very little. They are not a cure-all, however: when contention would force a thread to wait for a long time, a spinlock is a poor choice.

Let us now implement our own spinlock. First we need some atomic primitives; fortunately GCC already provides a set of builtins:

#define atomic_xadd(P, V)       __sync_fetch_and_add((P), (V))
#define cmpxchg(P, O, N)        __sync_val_compare_and_swap((P), (O), (N))
#define atomic_inc(P)           __sync_add_and_fetch((P), 1)
#define atomic_dec(P)           __sync_add_and_fetch((P), -1)
#define atomic_add(P, V)        __sync_add_and_fetch((P), (V))
#define atomic_set_bit(P, V)    __sync_or_and_fetch((P), 1<<(V))
#define atomic_clear_bit(P, V)  __sync_and_and_fetch((P), ~(1<<(V)))
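
Before building locks out of these macros, it may help to see what each builtin returns. Below is a minimal, hypothetical sketch (the counter and flags variables are only for illustration and are not part of the original article):

#include <stdio.h>

static unsigned long counter = 0;
static unsigned long flags = 0;

int main(void)
{
    unsigned long old = atomic_xadd(&counter, 1);  /* fetch-then-add: old == 0, counter == 1 */
    unsigned long now = atomic_inc(&counter);      /* add-then-fetch: now == 2 */

    atomic_set_bit(&flags, 3);                     /* flags == 0x8 */

    /* cmpxchg returns the previous value, so this swap succeeds and clears flags */
    if (cmpxchg(&flags, 0x8UL, 0UL) == 0x8UL)
        printf("old=%lu now=%lu flags=%lu\n", old, now, flags);

    return 0;
}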

We also need to implement a few other atomic operations ourselves (x86, GCC inline assembly):

/* Compiler read-write barrier */
#define barrier() asm volatile("": : :"memory")

/* Pause instruction to prevent excess processor bus usage */
#define cpu_relax() asm volatile("pause\n": : :"memory")

/* Atomic exchange (of various sizes) */
static inline void *xchg_64(void *ptr, void *x)
{
    __asm__ __volatile__("xchgq %0,%1"
                :"=r" ((unsigned long long) x)
                :"m" (*(volatile long long *)ptr), "0" ((unsigned long long) x)
                :"memory");

    return x;
}

static inline unsigned xchg_32(void *ptr, unsigned x)
{
    __asm__ __volatile__("xchgl %0,%1"
                :"=r" ((unsigned) x)
                :"m" (*(volatile unsigned *)ptr), "0" (x)
                :"memory");

    return x;
}

static inline unsigned short xchg_16(void *ptr, unsigned short x)
{
    __asm__ __volatile__("xchgw %0,%1"
                :"=r" ((unsigned short) x)
                :"m" (*(volatile unsigned short *)ptr), "0" (x)
                :"memory");

    return x;
}

/* Test and set a bit */
static inline char atomic_bitsetandtest(void *ptr, int x)
{
    char out;

    __asm__ __volatile__("lock; bts %2,%1\n"
                "sbb %0,%0\n"
                :"=r" (out), "=m" (*(volatile long long *)ptr)
                :"Ir" (x)
                :"memory");

    return out;
}
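
The inline assembly above is x86-specific and relies on GCC's legacy __sync interface. As an aside (not from the original article), roughly equivalent operations can be sketched with the newer __atomic builtins on recent GCC/Clang; the memory orderings chosen here are assumptions, not a statement about what the original assembly guarantees:

#include <stdint.h>

/* Sketch of portable near-equivalents using the __atomic builtins. */
static inline unsigned xchg_32_portable(unsigned *ptr, unsigned x)
{
    /* Atomically store x into *ptr and return the previous value. */
    return __atomic_exchange_n(ptr, x, __ATOMIC_ACQ_REL);
}

static inline void *xchg_64_portable(void **ptr, void *x)
{
    return __atomic_exchange_n(ptr, x, __ATOMIC_ACQ_REL);
}

static inline void cpu_relax_portable(void)
{
#if defined(__x86_64__) || defined(__i386__)
    __builtin_ia32_pause();   /* the PAUSE instruction, as in cpu_relax() above */
#endif
}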

A spinlock can be built from the exchange primitive:

#define EBUSY 1
typedef unsigned spinlock;

static void spin_lock(spinlock *lock)
{
    while (1) {
        if (!xchg_32(lock, EBUSY)) return;

        while (*lock) cpu_relax();
    }
}

static void spin_unlock(spinlock *lock)
{
    barrier();
    *lock = 0;
}

static int spin_trylock(spinlock *lock)
{
    return xchg_32(lock, EBUSY);
}
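
As a quick (hypothetical) sanity check, the lock can be exercised with two POSIX threads incrementing a shared counter; compile with something like gcc -O2 -pthread. This harness is only illustrative and not part of the original article:

#include <pthread.h>
#include <stdio.h>

static spinlock the_lock = 0;
static long counter = 0;

static void *worker(void *arg)
{
    (void) arg;

    for (int i = 0; i < 1000000; i++) {
        spin_lock(&the_lock);
        counter++;
        spin_unlock(&the_lock);
    }

    return NULL;
}

int main(void)
{
    pthread_t t1, t2;

    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    printf("counter = %ld\n", counter);   /* expect 2000000 */
    return 0;
}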

The spinlock above works, but it has a problem: when the lock is released, every waiting thread tries to grab it at once. This contention increases the load on the processor bus and hurts performance. Next we will implement a spinlock that knows exactly which waiter gets the lock next, which greatly reduces the bus traffic.

The next spinlock is the MCS lock, which keeps the waiting acquirers in a linked-list queue:

typedef struct mcs_lock_t mcs_lock_t;
struct mcs_lock_t {
    mcs_lock_t *next;
    int spin;
};
typedef struct mcs_lock_t *mcs_lock;

static void lock_mcs(mcs_lock *m, mcs_lock_t *me)
{
    mcs_lock_t *tail;

    me->next = NULL;
    me->spin = 0;

    tail = xchg_64(m, me);

    /* No one there? */
    if (!tail) return;

    /* Someone there, need to link in */
    tail->next = me;

    /* Make sure we do the above setting of next. */
    barrier();

    /* Spin on my spin variable */
    while (!me->spin) cpu_relax();

    return;
}

static void unlock_mcs(mcs_lock *m, mcs_lock_t *me)
{
    /* No successor yet? */
    if (!me->next) {
        /* Try to atomically unlock */
        if (cmpxchg(m, me, NULL) == me) return;

        /* Wait for successor to appear */
        while (!me->next) cpu_relax();
    }

    /* Unlock next one */
    me->next->spin = 1;
}

static int trylock_mcs(mcs_lock *m, mcs_lock_t *me)
{
    mcs_lock_t *tail;

    me->next = NULL;
    me->spin = 0;

    /* Try to lock (me is already a pointer to our queue node) */
    tail = cmpxchg(m, NULL, me);

    /* No one was there - can quickly return */
    if (!tail) return 0;

    return EBUSY;
}
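
Usage differs from the simple spinlock: each acquirer must supply its own queue node and hand the same node back on release. A hypothetical call site:

/* Hypothetical MCS call site: the node must remain valid (e.g. on this
 * stack frame) until unlock_mcs() has returned. */
static mcs_lock global_mcs = NULL;

static void mcs_critical_section(void)
{
    mcs_lock_t me;   /* this thread's queue node for this acquisition */

    lock_mcs(&global_mcs, &me);
    /* ... critical section ... */
    unlock_mcs(&global_mcs, &me);
}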

The MCS lock has a drawback of its own: besides the address of the lock, its API requires the caller to pass in an extra queue-node structure. The next algorithm, the K42 lock, removes that requirement:

typedef struct k42lock k42lock;
struct k42lock {
    k42lock *next;
    k42lock *tail;
};

static void k42_lock(k42lock *l)
{
    k42lock me;
    k42lock *pred, *succ;
    me.next = NULL;

    barrier();

    pred = xchg_64(&l->tail, &me);
    if (pred) {
        me.tail = (void *) 1;

        barrier();
        pred->next = &me;
        barrier();

        while (me.tail) cpu_relax();
    }

    succ = me.next;

    if (!succ) {
        barrier();
        l->next = NULL;

        if (cmpxchg(&l->tail, &me, &l->next) != &me) {
            while (!me.next) cpu_relax();

            l->next = me.next;
        }
    } else {
        l->next = succ;
    }
}

static void k42_unlock(k42lock *l)
{
    k42lock *succ = l->next;

    barrier();

    if (!succ) {
        if (cmpxchg(&l->tail, &l->next, NULL) == (void *) &l->next) return;

        while (!l->next) cpu_relax();
        succ = l->next;
    }

    succ->tail = NULL;
}

static int k42_trylock(k42lock *l)
{
    if (!cmpxchg(&l->tail, NULL, &l->next)) return 0;

    return EBUSY;
}
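
The point of the K42 design is that the queue node is a local variable inside k42_lock() itself, so the caller only ever passes the lock address. A hypothetical call site:

/* Hypothetical K42 call site: no per-caller queue node is needed. */
static k42lock k42_global = { NULL, NULL };

static void k42_critical_section(void)
{
    k42_lock(&k42_global);
    /* ... critical section ... */
    k42_unlock(&k42_global);
}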

Both the K42 and MCS locks may need to walk the list of waiters to find the thread that should get the lock next, and that scan can be slow, so we refine the design once more:

typedef struct listlock_t listlock_t;
struct listlock_t {
    listlock_t *next;
    int spin;
};
typedef struct listlock_t *listlock;

#define LLOCK_FLAG (void *)1

static void listlock_lock(listlock *l)
{
    listlock_t me;
    listlock_t *tail;

    /* Fast path - no users */
    if (!cmpxchg(l, NULL, LLOCK_FLAG)) return;

    me.next = LLOCK_FLAG;
    me.spin = 0;

    /* Convert into a wait list */
    tail = xchg_64(l, &me);

    if (tail) {
        /* Add myself to the list of waiters */
        if (tail == LLOCK_FLAG) tail = NULL;
        me.next = tail;

        /* Wait for being able to go */
        while (!me.spin) cpu_relax();

        return;
    }

    /* Try to convert to an exclusive lock */
    if (cmpxchg(l, &me, LLOCK_FLAG) == &me) return;

    /* Failed - there is now a wait list */
    tail = *l;

    /* Scan to find who is after me */
    while (1) {
        /* Wait for them to enter their next link */
        while (tail->next == LLOCK_FLAG) cpu_relax();

        if (tail->next == &me) {
            /* Fix their next pointer */
            tail->next = NULL;

            return;
        }

        tail = tail->next;
    }
}

static void listlock_unlock(listlock *l)
{
    listlock_t *tail;
    listlock_t *tp;

    while (1) {
        tail = *l;

        barrier();

        /* Fast path */
        if (tail == LLOCK_FLAG) {
            if (cmpxchg(l, LLOCK_FLAG, NULL) == LLOCK_FLAG) return;

            continue;
        }

        tp = NULL;

        /* Wait for partially added waiter */
        while (tail->next == LLOCK_FLAG) cpu_relax();

        /* There is a wait list */
        if (tail->next) break;

        /* Try to convert to a single-waiter lock */
        if (cmpxchg(l, tail, LLOCK_FLAG) == tail) {
            /* Unlock */
            tail->spin = 1;

            return;
        }

        cpu_relax();
    }

    /* A long list */
    tp = tail;
    tail = tail->next;

    /* Scan wait list */
    while (1) {
        /* Wait for partially added waiter */
        while (tail->next == LLOCK_FLAG) cpu_relax();

        if (!tail->next) break;

        tp = tail;
        tail = tail->next;
    }

    tp->next = NULL;

    barrier();

    /* Unlock */
    tail->spin = 1;
}

static int listlock_trylock(listlock *l)
{
    /* Simple part of a spin-lock */
    if (!cmpxchg(l, NULL, LLOCK_FLAG)) return 0;

    /* Failure! */
    return EBUSY;
}

We can improve further still, by nesting a bit-lock inside the spinlock itself:

#include <stdint.h>   /* for uintptr_t */

typedef struct bitlistlock_t bitlistlock_t;
struct bitlistlock_t {
    bitlistlock_t *next;
    int spin;
};
typedef bitlistlock_t *bitlistlock;

#define BLL_USED ((bitlistlock_t *) -2LL)

static void bitlistlock_lock(bitlistlock *l)
{
    bitlistlock_t me;
    bitlistlock_t *tail;

    /* Grab control of list */
    while (atomic_bitsetandtest(l, 0)) cpu_relax();

    /* Remove locked bit */
    tail = (bitlistlock_t *) ((uintptr_t) *l & ~1LL);

    /* Fast path, no waiters */
    if (!tail) {
        /* Set to be a flag value */
        *l = BLL_USED;
        return;
    }

    if (tail == BLL_USED) tail = NULL;
    me.next = tail;
    me.spin = 0;

    barrier();

    /* Unlock, and add myself to the wait list */
    *l = &me;

    /* Wait for the go-ahead */
    while (!me.spin) cpu_relax();
}

static void bitlistlock_unlock(bitlistlock *l)
{
    bitlistlock_t *tail;
    bitlistlock_t *tp;

    /* Fast path - no wait list */
    if (cmpxchg(l, BLL_USED, NULL) == BLL_USED) return;

    /* Grab control of list */
    while (atomic_bitsetandtest(l, 0)) cpu_relax();

    tp = *l;

    barrier();

    /* Get end of list */
    tail = (bitlistlock_t *) ((uintptr_t) tp & ~1LL);

    /* Actually no users? */
    if (tail == BLL_USED) {
        barrier();
        *l = NULL;
        return;
    }

    /* Only one entry on wait list? */
    if (!tail->next) {
        barrier();

        /* Unlock bitlock */
        *l = BLL_USED;

        barrier();

        /* Unlock lock */
        tail->spin = 1;

        return;
    }

    barrier();

    /* Unlock bitlock */
    *l = tail;

    barrier();

    /* Scan wait list for start */
    do {
        tp = tail;
        tail = tail->next;
    } while (tail->next);

    tp->next = NULL;

    barrier();

    /* Unlock */
    tail->spin = 1;
}

static int bitlistlock_trylock(bitlistlock *l)
{
    if (!*l && (cmpxchg(l, NULL, BLL_USED) == NULL)) return 0;

    return EBUSY;
}

And we can refine it yet again:

/* Bit-lock for editing the wait block */
#define SLOCK_LOCK      1
#define SLOCK_LOCK_BIT  0

/* Has an active user */
#define SLOCK_USED      2

#define SLOCK_BITS      3

typedef struct slock slock;
struct slock {
    uintptr_t p;
};

typedef struct slock_wb slock_wb;
struct slock_wb {
    /*
     * last points to the last wait block in the chain.
     * The value is only valid when read from the first wait block.
     */
    slock_wb *last;

    /* next points to the next wait block in the chain. */
    slock_wb *next;

    /* Wake up? */
    int wake;
};

/* Wait for control of wait block */
static slock_wb *slockwb(slock *s)
{
    uintptr_t p;

    /* Spin on the wait block bit lock */
    while (atomic_bitsetandtest(&s->p, SLOCK_LOCK_BIT)) {
        cpu_relax();
    }

    p = s->p;

    if (p <= SLOCK_BITS) {
        /* Oops, looks like the wait block was removed. */
        atomic_dec(&s->p);
        return NULL;
    }

    return (slock_wb *)(p - SLOCK_LOCK);
}

static void slock_lock(slock *s)
{
    slock_wb swblock;

    /* Fastpath - no other readers or writers */
    if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return;

    /* Initialize wait block */
    swblock.next = NULL;
    swblock.last = &swblock;
    swblock.wake = 0;

    while (1) {
        uintptr_t p = s->p;

        cpu_relax();

        /* Fastpath - no other readers or writers */
        if (!p) {
            if (cmpxchg(&s->p, 0, SLOCK_USED) == 0) return;
            continue;
        }

        if (p > SLOCK_BITS) {
            slock_wb *first_wb, *last;

            first_wb = slockwb(s);
            if (!first_wb) continue;

            last = first_wb->last;
            last->next = &swblock;
            first_wb->last = &swblock;

            /* Unlock */
            barrier();
            s->p &= ~SLOCK_LOCK;

            break;
        }

        /* Try to add the first wait block */
        if (cmpxchg(&s->p, p, (uintptr_t)&swblock) == p) break;
    }

    /* Wait to acquire exclusive lock */
    while (!swblock.wake) cpu_relax();
}

static void slock_unlock(slock *s)
{
    slock_wb *next;
    slock_wb *wb;
    uintptr_t np;

    while (1) {
        uintptr_t p = s->p;

        /* This is the fast path, we can simply clear the SLOCK_USED bit. */
        if (p == SLOCK_USED) {
            if (cmpxchg(&s->p, SLOCK_USED, 0) == SLOCK_USED) return;
            continue;
        }

        /* There's a wait block, we need to wake the next pending user */
        wb = slockwb(s);
        if (wb) break;

        cpu_relax();
    }

    next = wb->next;
    if (next) {
        /*
         * There are more blocks chained, we need to update the pointers
         * in the next wait block and update the wait block pointer.
         */
        np = (uintptr_t) next;

        next->last = wb->last;
    } else {
        /* Convert the lock to a simple lock. */
        np = SLOCK_USED;
    }

    barrier();

    /* Also unlocks lock bit */
    s->p = np;

    barrier();

    /* Notify the next waiter */
    wb->wake = 1;

    /* We released the lock */
}

static int slock_trylock(slock *s)
{
    /* No other readers or writers? */
    if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return 0;

    return EBUSY;
}

Here is yet another approach, the stack-lock algorithm:

typedef struct stlock_t stlock_t;
struct stlock_t {
    stlock_t *next;
};
typedef struct stlock_t *stlock;

static __attribute__((noinline)) void stlock_lock(stlock *l)
{
    stlock_t *me = NULL;

    barrier();
    me = xchg_64(l, &me);

    /* Wait until we get the lock */
    while (me) cpu_relax();
}

#define MAX_STACK_SIZE (1<<12)

static __attribute__((noinline)) int on_stack(void *p)
{
    int x;

    uintptr_t u = (uintptr_t) &x;

    return ((u - (uintptr_t)p + MAX_STACK_SIZE) < MAX_STACK_SIZE * 2);
}

static __attribute__((noinline)) void stlock_unlock(stlock *l)
{
    stlock_t *tail = *l;
    barrier();

    /* Fast case */
    if (on_stack(tail)) {
        /* Try to remove the wait list */
        if (cmpxchg(l, tail, NULL) == tail) return;

        tail = *l;
    }

    /* Scan wait list */
    while (1) {
        /* Wait for partially added waiter */
        while (!tail->next) cpu_relax();

        if (on_stack(tail->next)) break;

        tail = tail->next;
    }

    barrier();

    /* Unlock */
    tail->next = NULL;
}

static int stlock_trylock(stlock *l)
{
    stlock_t me;

    if (!cmpxchg(l, NULL, &me)) return 0;

    return EBUSY;
}
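
The queue nodes here are the waiters' own stack variables, and on_stack() guesses which node belongs to the unlocking thread purely from its distance to the current stack frame. A hypothetical call site, with the hidden assumption spelled out:

/* Hypothetical stack-lock call site. Hidden assumption: different threads'
 * stacks must lie further apart than roughly 2 * MAX_STACK_SIZE bytes,
 * or on_stack() may mistake a foreign waiter's node for our own. */
static stlock st_global = NULL;

static void st_critical_section(void)
{
    stlock_lock(&st_global);
    /* ... critical section ... */
    stlock_unlock(&st_global);
}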

Refined, it becomes:

typedef struct plock_t plock_t;
struct plock_t {
    plock_t *next;
};

typedef struct plock plock;
struct plock {
    plock_t *next;
    plock_t *prev;
    plock_t *last;
};

static void plock_lock(plock *l)
{
    plock_t *me = NULL;
    plock_t *prev;

    barrier();
    me = xchg_64(l, &me);

    prev = NULL;

    /* Wait until we get the lock */
    while (me) {
        /* Scan wait list for my previous */
        if (l->next != (plock_t *) &me) {
            plock_t *t = l->next;

            while (me) {
                if (t->next == (plock_t *) &me) {
                    prev = t;

                    while (me) cpu_relax();

                    goto done;
                }

                if (t->next) t = t->next;
                cpu_relax();
            }
        }
        cpu_relax();
    }

done:
    l->prev = prev;
    l->last = (plock_t *) &me;
}

static void plock_unlock(plock *l)
{
    plock_t *tail;

    /* Do I know my previous? */
    if (l->prev) {
        /* Unlock */
        l->prev->next = NULL;
        return;
    }

    tail = l->next;
    barrier();

    /* Fast case */
    if (tail == l->last) {
        /* Try to remove the wait list */
        if (cmpxchg(&l->next, tail, NULL) == tail) return;

        tail = l->next;
    }

    /* Scan wait list */
    while (1) {
        /* Wait for partially added waiter */
        while (!tail->next) cpu_relax();

        if (tail->next == l->last) break;

        tail = tail->next;
    }

    barrier();

    /* Unlock */
    tail->next = NULL;
}

static int plock_trylock(plock *l)
{
    plock_t me;

    if (!cmpxchg(&l->next, NULL, &me)) {
        l->last = &me;
        return 0;
    }

    return EBUSY;
}

Finally, here is the ticket lock algorithm. In fact, the Linux kernel adopted this very algorithm for its spinlocks, although for efficiency the kernel's version is written in assembly:

typedef union ticketlock ticketlock;

union ticketlock {
    unsigned u;
    struct {
        unsigned short ticket;
        unsigned short users;
    } s;
};

static void ticket_lock(ticketlock *t)
{
    unsigned short me = atomic_xadd(&t->s.users, 1);

    while (t->s.ticket != me) cpu_relax();
}

static void ticket_unlock(ticketlock *t)
{
    barrier();
    t->s.ticket++;
}

static int ticket_trylock(ticketlock *t)
{
    unsigned short me = t->s.users;
    unsigned short menew = me + 1;
    unsigned cmp = ((unsigned) me << 16) + me;
    unsigned cmpnew = ((unsigned) menew << 16) + me;

    if (cmpxchg(&t->u, cmp, cmpnew) == cmp) return 0;

    return EBUSY;
}

static int ticket_lockable(ticketlock *t)
{
    ticketlock u = *t;

    barrier();

    return (u.s.ticket == u.s.users);
}
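
The ticket lock behaves like a take-a-number dispenser: users is the next number to hand out and ticket is the number currently being served, so waiters are served strictly in arrival order. A small single-threaded trace of the state transitions (illustration only, not from the original article):

#include <assert.h>

int main(void)
{
    ticketlock t = { 0 };

    assert(ticket_lockable(&t));          /* ticket == users == 0 */

    ticket_lock(&t);                      /* we draw ticket 0; users becomes 1 */
    assert(!ticket_lockable(&t));
    assert(ticket_trylock(&t) == EBUSY);  /* a second acquirer would have to wait */

    ticket_unlock(&t);                    /* ticket becomes 1 and matches users again */
    assert(ticket_lockable(&t));

    return 0;
}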

That wraps up this tour of the different spinlock implementations. Got it all? :)

(End of article)

Reposted from: https://www.cnblogs.com/haippy/archive/2011/12/17/2290773.html
