今天帶大家深入剖析一下Redis分布式鎖,徹底搞懂它。
場景
既然要搞懂Redis分布式鎖,那肯定要有一個需要它的場景。
高并發售票問題就是一個經典案例。
搭建環境
@Service
public class TicketServiceImpl implements TicketService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private Logger logger = LoggerFactory.getLogger(TicketServiceImpl.class);
@Override
public String sellTicket() {
String ticketStr = stringRedisTemplate.opsForValue().get("ticket");
int ticket = 0;
if (null != ticketStr) {
ticket = Integer.parseInt(ticketStr);
}
if (ticket > 0) {
int ticketNew = ticket - 1;
stringRedisTemplate.opsForValue().set("ticket", String.valueOf(ticketNew));
logger.info("當前票的庫存為:" + ticketNew);
} else {
logger.info("手速不夠呀,票已經賣光了...");
}
return "搶票成功...";
}
}
分析解決問題
以上代碼沒有做任何的加鎖操作,在高并發情況下,票的超賣情況很嚴重,根本無法正常使用
分析1
既然要加分布式鎖,那么我們可以使用Redis中的setnx
命令來模擬一個鎖。
redis > EXISTS job # job 不存在
(integer) 0
redis > SETNX job "programmer" # job 設置成功
(integer) 1
redis > SETNX job "code-farmer" # 嘗試覆蓋 job ,失敗
(integer) 0
當一個線程進入到當前方法中,使用 setnx
設置一個鍵,如果設置成功,就允許繼續訪問,設置失敗,就不能訪問該方法;
當方法運行完畢時,將這個鍵刪除,下一次再有線程來訪問時,就重新執行該操作。
public String sellTicket() {
String lock="lock";
// 如果成功設置這個值,證明目前該方法并沒有被操作,可以進行賣票操作
Boolean tag = stringRedisTemplate.opsForValue().setIfAbsent(lock, "");
if (!tag) { // 如果設置失敗,證明當前方法正在被執行,不允許再次執行
// 實際開發環境應該使用隊列來完成訪問操作,這里主要探究分布式鎖的問題,所以僅僅模擬了場景
// 這里使用自旋的方式,防止訪問信息丟失
sellTicket();
return "當前訪問人數過多,請稍后訪問...";
}
String ticketStr = stringRedisTemplate.opsForValue().get("ticket");
int ticket = 0;
if (null != ticketStr) {
ticket = Integer.parseInt(ticketStr);
}
if (ticket > 0) {
int ticketNew = ticket - 1;
stringRedisTemplate.opsForValue().set("ticket", String.valueOf(ticketNew));
logger.info("當前票的庫存為:" + ticketNew);
} else {
logger.info("手速不夠呀,票已經賣光了...");
}
stringRedisTemplate.delete(lock);
return "搶票成功...";
}
分析2
上述的代碼在程序正常運行下不會出現票超賣的問題,但是我們需要考慮:
- 如果程序運行中系統出現了異常,導致無法刪除
lock
,就會造成死鎖的問題。也許有人馬上就會想到,使用try{} finally {}
,在finally中進行刪除鎖的操作。- 但是,如果是分布式架構,第一個服務器接收到請求,加了鎖,此時第二個服務器也接收到請求,
setnx
命令失敗,需要執行return操作,根據finally的特性,執行return之前,需要先執行finally里的代碼,于是,第二個服務器把鎖給刪除了,程序中鎖失效了,肯定會出現票超賣等一系列問題。
- 但是,如果是分布式架構,第一個服務器接收到請求,加了鎖,此時第二個服務器也接收到請求,
- 如果程序在運行中直接徹底死了(比如,程序員閑著沒事兒,來了個 kill -9;或者斷電),就算加了finally,finally也不能執行,還是會出現死鎖問題
解決方法:
- 給鎖加一個標識符,只允許自己來操作鎖,其他訪問程序不能操作鎖
- 還要給鎖加一個過期時間,這樣就算程序死了,當時間過期后,還是能夠繼續執行
public String sellTicket() {
String lock="lock"; // 鎖的鍵
String lockId = UUID.randomUUID().toString(); // 鎖的值:唯一標識
try{
// 如果成功設置這個值,證明目前該方法并沒有被操作,可以進行賣票操作
// 添加一個過期時間,暫定為 30秒,這里的操作具有原子性,如果過期時間設置失敗,鍵也會設置失敗
Boolean tag = stringRedisTemplate.opsForValue().setIfAbsent(lock, lockId, 30, TimeUnit.SECONDS);
if (!tag) { // 如果設置失敗,證明當前方法正在被執行,不允許再次執行
// 實際開發環境應該使用隊列來完成訪問操作,這里主要探究分布式鎖的問題,所以僅僅模擬了場景
// 不設置回調的話,訪問信息會丟失
sellTicket();
return "當前訪問人數過多,請稍后訪問...";
}
String ticketStr = stringRedisTemplate.opsForValue().get("ticket");
int ticket = 0;
if (null != ticketStr) {
ticket = Integer.parseInt(ticketStr);
}
if (ticket > 0) {
int ticketNew = ticket - 1;
stringRedisTemplate.opsForValue().set("ticket", String.valueOf(ticketNew));
logger.info("當前票的庫存為:" + ticketNew);
} else {
logger.info("手速不夠呀,票已經賣光了...");
}
} finally {
// 如果redis中的值,和當前的值一致,才允許刪除鎖。
if (lockId.equals(stringRedisTemplate.opsForValue().get(lock))) {
stringRedisTemplate.delete(lock);
}
}
return "搶票成功...";
}
分析3
寫到這里已經可以解決大部分問題了,但是還需要考慮一個問題:
如果程序運行的極慢(硬件處理慢或者進行了GC),導致30秒已經到了,鎖已經失效了,程序還沒有運行完成,這時候,就會有另一個線程總想鉆個空子,導致票的超賣問題。
- 這里我們可以使用 sleep 模擬一下
......
if (ticket > 0) {
try {
// 為了測試方便,過期時間和線程暫停時間都改成了3秒
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int ticketNew = ticket - 1;
stringRedisTemplate.opsForValue().set("ticket", String.valueOf(ticketNew));
......
- 這樣運行就會出現極其嚴重的超賣問題
那么該如何設置這個過期時間呢?繼續加大?這顯然是不合適的,因為無論多么大,總有可能出現問題。
解決方法
我們可以使用 守護線程 ,來保證這個時間永不過期
public String sellTicket() {
String lock="lock"; // 鎖的鍵
String lockId = UUID.randomUUID().toString(); // 鎖的值:唯一標識
MyThread myThread = null; // 鎖的守護線程
try{
// 如果成功設置這個值,證明目前該方法并沒有被操作,可以進行賣票操作
// 添加一個過期時間,暫定為 3 秒,這里的操作具有原子性,如果過期時間設置失敗,鍵也會設置失敗
Boolean tag = stringRedisTemplate.opsForValue().setIfAbsent(lock, lockId, 3, TimeUnit.SECONDS);
if (!tag) { // 如果設置失敗,證明當前方法正在被執行,不允許再次執行
// 實際開發環境應該使用隊列來完成訪問操作,這里主要探究分布式鎖的問題,所以僅僅模擬了場景
// 不設置回調的話,訪問信息會丟失
sellTicket();
return "當前訪問人數過多,請稍后訪問...";
}
// 開啟守護線程, 每隔三分之一的時間,給鎖續命
myThread = new MyThread(lock);
myThread.setDaemon(true);
myThread.start();
String ticketStr = stringRedisTemplate.opsForValue().get("ticket");
int ticket = 0;
if (null != ticketStr) {
ticket = Integer.parseInt(ticketStr);
}
if (ticket > 0) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int ticketNew = ticket - 1;
stringRedisTemplate.opsForValue().set("ticket", String.valueOf(ticketNew));
logger.info("當前票的庫存為:" + ticketNew);
} else {
logger.info("手速不夠呀,票已經賣光了...");
}
} finally {
// 如果redis中的值,和當前的值一致,才允許刪除鎖。
if (lockId.equals(stringRedisTemplate.opsForValue().get(lock))) {
// 程序運行結束,需要關閉守護線程
myThread.stop();
stringRedisTemplate.delete(lock);
logger.info("釋放鎖成功...");
}
}
return "搶票成功...";
}
/** 使用后臺線程進行續命
* 守護線程
* 在主線程下 如果有一個守護線程 這個守護線程的生命周期 跟主線程是同生死的
*/
class MyThread extends Thread{
String lock;
MyThread (String lock) {
this.lock = lock;
}
@Override
public void run() {
while (true) {
try {
// 三分之一的時間
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 假設線程還活著,就要給鎖續命
logger.info("線程續命ing...");
stringRedisTemplate.expire(lock, 3, TimeUnit.SECONDS);
}
}
}
總結
到這里,我們已經基本實現了redis分布式鎖,并且可以在高并發場景下正常運行。
需要注意的是,實現分布式鎖的代碼肯定不是最佳的,重要的是了解分布式鎖的實現原理,以及發現問題并解決問題的思路。
-
代碼
+關注
關注
30文章
4798瀏覽量
68728 -
工具
+關注
關注
4文章
312瀏覽量
27828 -
線程
+關注
關注
0文章
505瀏覽量
19705 -
Redis
+關注
關注
0文章
376瀏覽量
10887
發布評論請先 登錄
相關推薦
評論