今天我們來聊一聊以數(shù)組為數(shù)據(jù)結(jié)構(gòu)的阻塞隊列 ArrayBlockingQueue,它實現(xiàn)了 BlockingQueue 接口,繼承了抽象類 AbstractQueue。
BlockingQueue 提供了三個元素入隊的方法。
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
三個元素出隊的方法。
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
boolean remove(Object o);
一起來看看,ArrayBlockingQueue 是如何實現(xiàn)的吧。
初識
首先看一下 ArrayBlockingQueue 的主要屬性和構(gòu)造函數(shù)。
屬性
//存放元素
final Object[] items;
//取元素的索引
int takeIndex;
//存元素的索引
int putIndex;
//元素的數(shù)量
int count;
//控制并發(fā)的鎖
final ReentrantLock lock;
//非空條件信號量
private final Condition notEmpty;
//非滿條件信號量
private final Condition notFull;
transient Itrs itrs = null;
從以上屬性可以看出:
- 以數(shù)組的方式存放元素。
- 用 putIndex 和 takeIndex 控制元素入隊和出隊的索引。
- 用重入鎖控制并發(fā)、保證線程的安全。
構(gòu)造函數(shù)
ArrayBlockingQueue 有三個構(gòu)造函數(shù),其中 public ArrayBlockingQueue(int capacity, boolean fair, Collection c)
構(gòu)造函數(shù)并不常用,暫且不提。看其中兩個構(gòu)造函數(shù)。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//構(gòu)造數(shù)組
this.items = new Object[capacity];
//默認(rèn)以非公平鎖初始化 ReentrantLock
lock = new ReentrantLock(fair);
//創(chuàng)建兩個條件信號量
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
可以看出 ArrayBlockingQueue 必須再創(chuàng)建時傳入數(shù)組的大小。
元素入隊
ArrayBlockingQueue 有 add()、offer()、put()、offer(E e, long timeout, TimeUnit unit) 方法用來元素的入隊。
add
//ArrayBlockingQueue.add()
public boolean add(E e) {
//調(diào)用父類的 AbstractQueue.add() 方法
return super.add(e);
}
//AbstractQueue.add()
public boolean add(E e) {
//調(diào)用 ArrayBlockingQueue.offer(),成功則返回 true,否則拋出異常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//ArrayBlockingQueue.offer()
public boolean offer(E e) {
//非空檢查
checkNotNull(e);
//加鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
//數(shù)組滿了,返回 false
if (count == items.length)
return false;
else {
//添加元素
enqueue(e);
return true;
}
} finally {
//解鎖
lock.unlock();
}
}
//ArrayBlockingQueue.enqueue()
private void enqueue(E x) {
final Object[] items = this.items;
//直接放到 putIndex 的位置
items[putIndex] = x;
//如果索引滿了,putIndex 就從 0 開始,為什么呢?
if (++putIndex == items.length)
putIndex = 0;
//數(shù)量加一
count++;
//數(shù)組里面有數(shù)據(jù)了,對 notEmpty 條件隊列進(jìn)行通知
notEmpty.signal();
}
上面留下了一個坑,索引等于數(shù)組的長度的時候,索引就從 0 開始了。其實很簡單,這個數(shù)組是不是先入先出的,0 索引的數(shù)組先入隊,也是先出隊的。這時候 0 索引的位置就空了,所以 putIndex 到達(dá)數(shù)組長度的時候就可以從 0 開始。這里可以看出,ArrayBlockingQueue 是絕對不可以修改數(shù)組長度的,一旦初始化后長度就不能再改變了。
put
//ArrayBlockingQueue.put()
public void put(E e) throws InterruptedException {
//非空檢查
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//數(shù)組滿了,線程加入 notFull 隊列中等待被喚醒
while (count == items.length)
notFull.await();
//添加元素
enqueue(e);
} finally {
//解鎖
lock.unlock();
}
}
offer
ArrayBlockingQueue 中有兩個 offer() 方法,offer(E e) 和 offer(E e, long timeout, TimeUnit unit),add() 方法調(diào)用的就是 offer(E e) 方法。
//ArrayBlockingQueue.offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//非空檢查
checkNotNull(e);
//將時間轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//當(dāng)數(shù)組滿了
while (count == items.length) {
//時間到了,元素還沒有入隊,則返回 false
if (nanos <= 0)
return false;
//線程加入 notFull 隊列中,等待被喚醒,到達(dá) nanos 時間返回剩余的 nanos 時間
nanos = notFull.awaitNanos(nanos);
}
//元素入隊
enqueue(e);
return true;
} finally {
//解鎖
lock.unlock();
}
}
以上就是所有的元素入隊的方法,可以得出一些結(jié)論:
- add() 元素滿了,就拋出異常。
- offer() 元素滿了,返回 false。
- put() 元素滿了,線程阻塞等待被入隊。
- offer(E e, long timeout, TimeUnit unit) 加入超時時間,如果時間到了元素還是沒有被入隊,則返回 false
移除元素
ArrayBlockingQueue 提供了 poll()、take()、poll(long timeout, TimeUnit unit)、remove() 方法用于元素的出隊。
poll
ArrayBlockingQueue 中有兩個 poll() 方法,poll() 和 poll(long timeout, TimeUnit unit)。
//ArrayBlockingQueue.poll()
public E poll() {
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
//沒有元素返回 null,否則元素出隊
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//ArrayBlockingQueue.dequeue()
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//獲取 takeIndex 上的元素
E x = (E) items[takeIndex];
//設(shè)置 takeIndex 索引上的元素為 null
items[takeIndex] = null;
//當(dāng) takeIndex 長度是數(shù)組長度,takeIndex 索引從 0 開始
if (++takeIndex == items.length)
takeIndex = 0;
//元素數(shù)量 -1
count--;
if (itrs != null)
//更新迭代器
itrs.elementDequeued();
//喚醒 notFull 的等待隊列,其中等待的第一個線程可以添加元素了
notFull.signal();
return x;
}
//ArrayBlockingQueue.poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
////將時間轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//數(shù)組為空,超時還沒有元素出隊,則返回 null
while (count == 0) {
if (nanos <= 0)
return null;
//線程加入 notEmpty 中,等待被喚醒,到達(dá) nanos 時間返回剩余的 nanos 時間
nanos = notEmpty.awaitNanos(nanos);
}
//元素出隊
return dequeue();
} finally {
lock.unlock();
}
}
take
//ArrayBlockingQueue.take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//無元素
while (count == 0)
//將線程加入 notEmpty 的等待隊列中,等待被入隊的元素喚醒
notEmpty.await();
//元素出隊
return dequeue();
} finally {
//解鎖
lock.unlock();
}
}
remove
//ArrayBlockingQueue.remove()
public boolean remove(Object o) {
//非空檢查
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
if (count > 0) {
//入隊元素的索引
final int putIndex = this.putIndex;
//出隊元素的索引
int i = takeIndex;
do {
//找到元素
if (o.equals(items[i])) {
removeAt(i);
return true;
}
//i 等于數(shù)組長度的時候,從 0 開始
if (++i == items.length)
i = 0;
// i == putIndex 說明已經(jīng)遍歷了一遍
} while (i != putIndex);
}
return false;
} finally {
//解鎖
lock.unlock();
}
}
//ArrayBlockingQueue.removeAt()
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//需要出隊的 removeIndex 正好是 takeIndex
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//更新迭代器
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
// 循環(huán)移動元素,將 next 元素向前移動 1 個
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
//設(shè)置 i 索引的位置為空,putIndex 索引為 i
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 喚醒 notFull 隊列中等待的線程,通知可以元素入隊了
notFull.signal();
}
以上就是所有的元素出隊的方法,可以得出一些結(jié)論:
- poll() 元素出隊為空,則返回空
- take() 元素出隊為空的時候,會阻塞線程
- remove() 元素出隊的時候可能會移動數(shù)組
- poll(long timeout, TimeUnit unit) 加入超時時間,如果時間到了還是沒有元素需要出隊,則返回 null
總結(jié)
ArrayBlockingQueue 可以被用在生產(chǎn)者和消費者模型中。
- ArrayBlockingQueue,不能被擴(kuò)容,初始化被指定容量。
- 利用 putIndex 和 takeIndex 循環(huán)利用數(shù)組。
- 利用了 ReentrantLock 和 兩個 Condition 保證了線程的安全。
-
接口
+關(guān)注
關(guān)注
33文章
8687瀏覽量
151671 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4345瀏覽量
62867 -
數(shù)據(jù)結(jié)構(gòu)
+關(guān)注
關(guān)注
3文章
573瀏覽量
40190 -
數(shù)組
+關(guān)注
關(guān)注
1文章
417瀏覽量
26000
發(fā)布評論請先 登錄
相關(guān)推薦
評論