SynchronousQueue详解

SynchronousQueue介绍

  【1】 SynchronousQueue是一个没有数据缓冲的BlockingQueue ,生产者线程对其的插入操作put必须等待消费者的移除操作take。

             

  【2】如图所示,SynchronousQueue 最大的不同之处在于, 它的容量为 0 ,所以没有一个地方来暂存元素,导致 每次取数据都要先阻塞,直到有数据被放入 ;同理, 每次放数据的时候也会阻塞,直到有消费者来取

  【3】需要注意的是,SynchronousQueue 的容量不是 1 而是 0, 因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff) 。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

SynchronousQueue的源码分析

  【1】构造函数

 //  默认采用非公平 
 public  SynchronousQueue() {  this ( false  );
}
// 可以选择模式 public SynchronousQueue( boolean fair) {
transferer
= fair ? new TransferQueue<E>() : new TransferStack<E> ();
}

   【2】 核心方法分析

 //  这些方法本质上都是调用属性值transferer的transfer方法 
 public   void  put(E e)  throws  InterruptedException {  if  (e ==  null )  throw   new  NullPointerException();  if  (transferer.transfer(e,  false , 0) ==  null  ) {
Thread.interrupted();
throw new InterruptedException();
}
}
public boolean offer(E e) { if (e == null ) throw new NullPointerException(); return transferer.transfer(e, true , 0) != null ;
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null ) throw new NullPointerException(); if (transferer.transfer(e, true , unit.toNanos(timeout)) != null ) return true ; if (! Thread.interrupted()) return false ; throw new InterruptedException();
}
public E take() throws InterruptedException {
E e
= transferer.transfer( null , false , 0 ); if (e != null ) return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll( long timeout, TimeUnit unit) throws InterruptedException {
E e
= transferer.transfer( null , true , unit.toNanos(timeout)); if (e != null || ! Thread.interrupted()) return e; throw new InterruptedException();
}
public E poll() { return transferer.transfer( null , true , 0 );
}

 

Transferer分析

  【1】Transferer是SynchronousQueue的内部抽象类,双栈和双队列算法共享该类。他只有一个transfer方法,用于转移元素,从生产者转移到消费者;或者消费者调用该方法从生产者取数据。

  【2】Transferer有两个实现类:TransferQueue和TransferStack。

  【3】这两个类的区别就在于是否公平。TransferQueue是公平的,TransferStack非公平。

  【4】源码展示

 //  堆栈和队列共同的接口,负责执行 put or take 
 abstract   static   class  Transferer<E> {  //  e 为空的,会直接返回特殊值,不为空会传递给消费者  //  timed 为 true,说明会有超时时间 
     abstract  E transfer(E e,  boolean  timed,  long  nanos);
}

 

TransferQueue分析

  【1】节点元素

 //  队列节点元素 
 static   final   class  QNode {  //  当前元素的下一个元素 
     volatile  QNode next;  //  当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面 
     volatile  Object item;  //  可以阻塞住的当前线程 
     volatile  Thread waiter;  //  节点类型:true是 put,false是 take 
     final   boolean  isData;         

....
}

  【2】构造方法

 //  队列头结点指针 
 transient   volatile  QNode head;  //  队列尾结点指针 
 transient   volatile  QNode tail;

TransferQueue() {
QNode h
= new QNode( null , false ); // initialize to dummy node. head = h;
tail
= h;
}

  【3】核心方法

@SuppressWarnings("unchecked" )
E transfer(E e,
boolean timed, long nanos) {

QNode s
= null ; // 根据是否传入数据 判断是获取还是存放 boolean isData = (e != null ); for (;;) { // 队列头和尾的临时变量,队列是空的时候,t=h QNode t = tail;
QNode h
= head; // tail 和 head 没有初始化时,无限循环,虽然这种 continue 非常耗cpu,但感觉不会碰到这种情况 // 因为 tail 和 head 在 TransferQueue 初始化的时候,就已经被赋值空节点了 if (t == null || h == null ) // saw uninitialized value continue ; // spin // 首尾节点相同,说明是空队列 // 或者尾节点的操作和当前节点操作一致 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) // 直至拿到尾节点 continue ; if (tn != null ) { // lagging tail advanceTail(t, tn); continue ;
}
// 超时直接返回 null if (timed && nanos <= 0) // can't wait return null ; // 构建新节点 if (s == null )
s
= new QNode(e, isData); // 将新建节点塞入队列 if (!t.casNext( null , s)) // failed to link in continue ;

advanceTail(t, s);
// 阻塞住自己 Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null ;
}
if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null ) // and forget fields s.item = s;
s.waiter
= null ;
}
return (x != null ) ? (E)x : e;

}
// 队列不为空,并且当前操作和队尾不一致,也就是说当前操作是队尾是对应的操作 // 比如说队尾是因为 take 被阻塞的,那么当前操作必然是 put else { // 也就是这行代码体现出队列的公平,每次操作时,从头开始按照顺序进行操作 QNode m = h.next; if (t != tail || m == null || h != head) continue ; // inconsistent read Object x = m.item; if (isData == (x != null ) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue ;
}
// 当前操作放到队头 advanceHead(h, m); // 释放队头阻塞节点 LockSupport.unpark(m.waiter); return (x != null ) ? (E)x : e;
}
}
}

 

TransferStack分析

  【1】节点元素

 //  栈中节点的几种类型:  //  1. 消费者(请求数据的) 
 static   final   int  REQUEST    = 0 ;  //  2. 生产者(提供数据的) 
 static   final   int  DATA       = 1 ;  //  3. 二者正在匹配中 
 static   final   int  FULFILLING = 2 ;  //  栈中的节点 
 static   final   class  SNode {  //  下一个节点 
     volatile  SNode next;  volatile  SNode match;        //  the node matched to this  //  等待着的线程 
     volatile  Thread waiter;    
Object item;
// 模式,也就是节点的类型,是消费者,是生产者,还是正在匹配中 int mode;
...
}

 

  【2】核心方法

 //  TransferStack.transfer()方法 
E transfer(E e,  boolean  timed,  long  nanos) {
SNode s
= null ; // constructed/reused as needed // 根据e是否为null决定是生产者还是消费者 int mode = (e == null ) ? REQUEST : DATA; // 自旋+CAS for (;;) { // 栈顶元素 SNode h = head; // 栈顶没有元素,或者栈顶元素跟当前元素是一个模式的 // 也就是都是生产者节点或者都是消费者节点 if (h == null || h.mode == mode) { // empty or same-mode // 如果有超时而且已到期 if (timed && nanos <= 0) { // can't wait // 如果头节点不为空且是取消状态 if (h != null && h.isCancelled()) // 就把头节点弹出,并进入下一次循环 casHead(h, h.next); // pop cancelled node else // 否则,直接返回null(超时返回null) return null ;
}
else if (casHead(h, s = snode(s, e, h, mode))) { // 入栈成功(因为是模式相同的,所以只能入栈) // 调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到 SNode m = awaitFulfill(s, timed, nanos); // 如果m等于s,说明取消了,那么就把它清除掉,并返回null if (m == s) { // wait was cancelled clean(s); // 被取消了返回null return null ;
}
// 到这里说明匹配到元素了 // 因为从awaitFulfill()里面出来要不被取消了要不就匹配到了 // 如果头节点不为空,并且头节点的下一个节点是s // 就把头节点换成s的下一个节点 // 也就是把h和s都弹出了 // 也就是把栈顶两个元素都弹出了 if ((h = head) != null && h.next == s)
casHead(h, s.next);
// help s's fulfiller // 根据当前节点的模式判断返回m还是s中的值 return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
else if (!isFulfilling(h.mode)) { // try to fulfill // 到这里说明头节点和当前节点模式不一样 // 如果头节点不是正在匹配中 // 如果头节点已经取消了,就把它弹出栈 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING| mode))) { // 头节点没有在匹配中,就让当前节点先入队,再让他们尝试匹配 // 且s成为了新的头节点,它的状态是正在匹配中 for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match // 如果m为null,说明除了s节点外的节点都被其它线程先一步匹配掉了 // 就清空栈并跳出内部循环,到外部循环再重新入栈判断 if (m == null ) { // all waiters are gone casHead(s, null ); // pop fulfill node s = null ; // use new node next time break ; // restart main loop }
SNode mn
= m.next; // 如果m和s尝试匹配成功,就弹出栈顶的两个元素m和s if (m.tryMatch(s)) {
casHead(s, mn);
// pop both s and m // 返回匹配结果 return (E) ((mode == REQUEST) ? m.item : s.item);
}
else // lost match // 尝试匹配失败,说明m已经先一步被其它线程匹配了 // 就协助清除它 s.casNext(m, mn); // help unlink }
}
}
else { // help a fulfiller // 到这里说明当前节点和头节点模式不一样 // 且头节点是正在匹配中 SNode m = h.next; // m is h's match if (m == null ) // waiter is gone // 如果m为null,说明m已经被其它线程先一步匹配了 casHead(h, null ); // pop fulfilling node else {
SNode mn
= m.next; // 协助匹配,如果m和s尝试匹配成功,就弹出栈顶的两个元素m和s if (m.tryMatch(h)) // help match // 将栈顶的两个元素弹出后,再让s重新入栈 casHead(h, mn); // pop both h and m else // lost match // 尝试匹配失败,说明m已经先一步被其它线程匹配了 // 就协助清除它 h.casNext(m, mn); // help unlink }
}
}
}
// 三个参数:需要等待的节点,是否需要超时,超时时间 SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 到期时间 final long deadline = timed ? System.nanoTime() + nanos : 0L ; // 当前线程 Thread w = Thread.currentThread(); // 自旋次数 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0 ); for (;;) { // 当前线程中断了,尝试清除s if (w.isInterrupted())
s.tryCancel();
// 检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s) SNode m = s.match; // 如果匹配到了,直接返回m if (m != null ) return m; // 如果需要超时 if (timed) { // 检查超时时间如果小于0了,尝试清除s nanos = deadline - System.nanoTime(); if (nanos <= 0L ) {
s.tryCancel();
continue ;
}
}
if (spins > 0 ) // 如果还有自旋次数,自旋次数减一,并进入下一次自旋 spins = shouldSpin(s) ? (spins-1) : 0 ; // 后面的elseif都是自旋次数没有了 else if (s.waiter == null ) // 如果s的waiter为null,把当前线程注入进去,并进入下一次自旋 s.waiter = w; // establish waiter so can park next iter else if (! timed) // 如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素 LockSupport.park( this ); else if (nanos > spinForTimeoutThreshold) // 如果允许超时且还有剩余时间,就阻塞相应时间 LockSupport.parkNanos( this , nanos);
}
}
// SNode里面的方向,调用者m是s的下一个节点 // 这时候m节点的线程应该是阻塞状态的 boolean tryMatch(SNode s) { // 如果m还没有匹配者,就把s作为它的匹配者 if (match == null && UNSAFE.compareAndSwapObject( this , matchOffset, null , s)) {
Thread w
= waiter; if (w != null ) { // waiters need at most one unpark waiter = null ; // 唤醒m中的线程,两者匹配完毕 LockSupport.unpark(w);
}
// 匹配到了返回true return true ;
}
// 可能其它线程先一步匹配了m,返回其是否是s return match == s;
}

 

SynchronousQueue总结

  【1】是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。

  【2】数据结构:链表,在其内部类中维护了数据

      先消费(take),后生产(put);

        第一个线程Thread0是消费者访问,此时队列为空,则入队(创建Node结点并赋值)

        第二个线程Thread1也是消费者访问,与队尾模式相同,继续入队

        第三个线程Thread2是生产者,携带了数据e,与队尾模式不同,不进行入队操作。直接将该线程携带的数据e返回给队首的消费者,并唤醒队首线程Thread1(默认非公平策略是栈结构),出队。

      反之,先生产(put)后消费(take),原理一样

  【3】锁:CAS+自旋(无锁)【阻塞: 自旋了一定次数后调用 LockSupport.park()

  【4】存取调用同一个方法:transfer()

      put、offer 为生产者,携带了数据 e,为 Data 模式,设置到 SNode或QNode 属性中。

      take、poll 为消费者,不携帯数据,为 Request 模式,设置到 SNode或QNode属性中。

  【5】过程

      线程访问阻塞队列,先判断队尾节点或者栈顶节点的 Node 与当前入队模式是否相同

      相同则构造节点 Node 入队,并阻塞当前线程,元素 e 和线程赋值给 Node 属性

      不同则将元素 e(不为 null) 返回给取数据线程,队首或栈顶线程被唤醒,出队

  【6】公平模式:TransferQueue,队尾匹配(判断模式),队头出队,先进先出

  【7】非公平模式(默认策略):TransferStack,栈顶匹配,栈顶出栈,后进先出

  【8】应用场景

      SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。

      SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

        

标签: Java

添加新评论