请选择 进入手机版 | 继续访问电脑版

12360技术网 - 专业IT技术发表平台

 立即注册  找回密码
查看: 2222|回复: 2

BlockingQueue

[复制链接]

17

主题

23

帖子

187

积分

注册会员

Rank: 2

积分
187
发表于 2020-1-26 16:20:14 | 显示全部楼层 |阅读模式
文章目录


BlockingQueue简介

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
LinkedBlockingQueue 源码分析

LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。
LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。
底层数据结构

LinkedBlockingQueue内部是使用链表实现一个队列的,但是却有别于一般的队列,在于该队列至少有一个节点,头节点不含有元素。结构图如下

原理

LinkedBlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。
  1.     //用一个原子类型来表示当前队列中的元素个数    private final AtomicInteger count = new AtomicInteger();    //用于出队的锁    private final ReentrantLock takeLock = new ReentrantLock();    //当队列为空时,保存执行出队的线程    private final Condition notEmpty = takeLock.newCondition();    //用于入队的锁    private final ReentrantLock putLock = new ReentrantLock();    //当队列满时,保存执行入队的线程    private final Condition notFull = putLock.newCondition();
复制代码
不指定容量则是使用整形最大数作为容量
  1.     public LinkedBlockingQueue() {        this(Integer.MAX_VALUE);    }
复制代码
put()
  1.     public void put(E e) throws InterruptedException {        //null元素不能插入队列        if (e == null) throw new NullPointerException();        int c = -1;        //封装成节点        Node node = new Node(e);        //获取入队锁        final ReentrantLock putLock = this.putLock;        //获取队列元素数量        final AtomicInteger count = this.count;        //响应中断的获取锁        putLock.lockInterruptibly();        try {            //入队前已经是最大容量            while (count.get() == capacity) {                //那么将当前线程加入到等待队列中                notFull.await();            }            //入队            enqueue(node);            //c:入队前队列元素总数            c = count.getAndIncrement();            if (c + 1 < capacity)                //如果还可以插入元素,那么释放等待的入队线程                notFull.signal();        } finally {            putLock.unlock();        }        //通知出队线程队列非空        if (c == 0)            signalNotEmpty();    }
复制代码
入队:
  1.     private void enqueue(Node node) {        last = last.next = node;    }
复制代码
总结:

  • LinkedBlockingQueue不允许元素为null。
  • 同一时刻,只能有一个线程执行入队操作,因为putLock在将元素插入到队列尾部时加锁了
  • 如果队列满了,那么将会调用notFull的await()方法将该线程加入到Condition等待队列中。await()方法就会释放线程占有的锁,这将导致之前由于被锁阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为由于队列仍旧是满的,也被加入到条件队列中。
  • 一旦一个出队线程取走了一个元素,并通知了入队等待队列中可以释放线程了,那么第一个加入到Condition队列中的将会被释放,那么该线程将会重新获得put锁,继而执行enqueue()方法,将节点插入到队列的尾部
  • 然后得到插入一个节点之前的元素个数,如果队列中还有空间可以插入,那么就通知notFull条件的等待队列中的线程。
  • 通知出队线程队列为空了,因为插入一个元素之前的个数为0,而插入一个之后队列中的元素就从无变成了有,就可以通知因队列为空而阻塞的出队线程了。
take()
take()方法用于得到队头的元素,在队列为空时会阻塞,知道队列中有元素可取
  1.     public E take() throws InterruptedException {        E x;        int c = -1;        //获取队列元素数量        final AtomicInteger count = this.count;        //获取出队锁        final ReentrantLock takeLock = this.takeLock;        //可中断的获取锁        takeLock.lockInterruptibly();        try {            //如果队列没有元素,则将当前线程加入非空等待队列中            while (count.get() == 0) {                notEmpty.await();            }            //出队            x = dequeue();            //出队前队列元素数量            c = count.getAndDecrement();            //还可以出队则通知非空等待队列,唤醒后继线程出队            if (c > 1)                notEmpty.signal();        } finally {            //解锁            takeLock.unlock();        }        //如果队列中的元素从满到非满,通知put线程               if (c == capacity)            signalNotFull();        return x;    }
复制代码
remove()
remove()方法用于删除队列中一个元素,如果队列中不含有该元素,那么返回false;有的话则删除并返回true。入队和出队都是只获取一个锁,而remove()方法需要同时获得两把锁,其实现如下:
  1.     public boolean remove(Object o) {        if (o == null) return false;        //获取入队锁和出队锁        fullyLock();        try {            //从头结点开始遍历查找,找到则取消链接,返回true            for (Node trail = head, p = trail.next;                 p != null;                 trail = p, p = p.next) {                if (o.equals(p.item)) {                    unlink(p, trail);                    return true;                }            }            return false;        } finally {            fullyUnlock();        }    }
复制代码
LinkedBlockingQueue总结

LinkedBlockingQueue:一个单向链表+两把锁+两个条件
两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。
在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多
采用了链表,最大容量为整数最大值,可看做容量无限
ArrayBlockingQueue源码分析

ArrayBlockingQueue底层是使用一个数组实现队列的,并且在构造ArrayBlockingQueue时需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。
原理
  1.     //对象数组    final Object[] items;    //就一把锁    final ReentrantLock lock;    //非空等待队列    private final Condition notEmpty;    //非满等待队列    private final Condition notFull;
复制代码
put(E e)
  1.     public void put(E e) throws InterruptedException {        //插入元素不能为空        checkNotNull(e);        //获取锁        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            //如果队列已满,则将当前线程加入到非满等待队列,直到非满            while (count == items.length)                notFull.await();            //入队                enqueue(e);        } finally {            lock.unlock();        }    }
复制代码
入队:
  1.     private void enqueue(E x) {        //在数组末尾加入元素        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        notEmpty.signal();    }
复制代码
put方法总结:

  • ArrayBlockingQueue不允许元素为null
  • ArrayBlockingQueue在队列已满时将会调用notFull的await()方法释放锁并处于阻塞状态
  • 一旦ArrayBlockingQueue不为满的状态,就将元素入队
take()
  1.     public E take() throws InterruptedException {        //获取锁        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            //如果队列为空,则将当前线程加入到等待队列中            while (count == 0)                notEmpty.await();            //出队                return dequeue();        } finally {            lock.unlock();        }    }
复制代码
ArrayBlockingQueue总结:

ArrayBlockingQueue的并发阻塞是通过ReentrantLock和Condition来实现的,ArrayBlockingQueue内部只有一把锁,意味着同一时刻只有一个线程能进行入队或者出队的操作。
一个对象数组+一把锁+两个条件,入队与出队都用同一把锁
在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高
采用了数组,必须指定大小,即容量有限
                                                                                                                                       
                                                    
  • 点赞                        
  • 收藏                        
  • 分享                                                                                                                        
  •                                                         
                                      
    • 文章举报                           
                                                
                                                                        
                                            
                                                        Kevin_cai09                                                                发布了256 篇原创文章 · 获赞 136 · 访问量 3万+                                                                                            私信                                                            关注
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x




上一篇:Redis客户端(RedisDesktopManager)连接Linux系统的Redis服务(需要配置Re
下一篇:单例--Meyers' Singleton
回复

使用道具 举报

0

主题

11

帖子

241

积分

中级会员

Rank: 3Rank: 3

积分
241
发表于 2020-1-28 17:01:07 | 显示全部楼层
楼主,大恩不言谢了![www.12360.co]
回复

使用道具 举报

0

主题

11

帖子

241

积分

中级会员

Rank: 3Rank: 3

积分
241
发表于 4 天前 | 显示全部楼层
楼主发贴辛苦了,谢谢楼主分享![www.12360.co]
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

12360技术网

GMT+8, 2020-2-18 05:42 , Processed in 0.091187 second(s), 41 queries .

本网站内容收集于互联网,Www.12360.Co不承担任何由于内容的合法性及健康性所引起的争议和法律责任。 欢迎大家对网站内容侵犯版权等不合法和不健康行为进行监督和举报。

© 2019-2020 Www.12360.Co

快速回复 返回顶部 返回列表