原创

09、ZooKeeper 客户端通信

继承关系

 

ServerCnxn代表服务端到客户端的一条连接

ServerCnxn实现了Stats和Watcher,所有具有查看ServerCnxn统计数据以及监听一些事件并进行处理的能力

属性

// 连接创建时间
protected final Date established = new Date();

// packet接收数量
protected final AtomicLong packetsReceived = new AtomicLong();
// packet发送数量
protected final AtomicLong packetsSent = new AtomicLong();

// 最小延迟
protected long minLatency;
// 最大延迟
protected long maxLatency;
// 上一次操作
protected String lastOp;
// 上一次的cxid
protected long lastCxid;
// 上一次zxid
protected long lastZxid;
// 上一次响应时间
protected long lastResponseTime;
// 上一次延迟
protected long lastLatency;
// 数量
protected long count;
// 总的延迟
protected long totalLatency;

内部类

CloseRequestException

请求关闭异常

protected static class CloseRequestException extends IOException {
   
     
  private static final long serialVersionUID = -7854505709816442681L;

  public CloseRequestException(String msg) {
   
     
    super(msg);
  }
}

EndOfStreamException

流结束异常

protected static class EndOfStreamException extends IOException {
   
     
  private static final long serialVersionUID = -8255690282104294178L;

  public EndOfStreamException(String msg) {
   
     
    super(msg);
  }

  public String toString() {
   
     
    return "EndOfStreamException: " + getMessage();
  }
}

重要方法

抽象方法

// 获取会话超时时间
abstract int getSessionTimeout();

// 关闭
abstract void close();

// 通知客户端session已关闭
abstract void sendCloseSession();

// 处理事件
public abstract void process(WatchedEvent event);

// 获取sessionId
public abstract long getSessionId();

// 设置sessionId
abstract void setSessionId(long sessionId);

// 输出缓冲
abstract void sendBuffer(ByteBuffer closeConn);

// 允许接收
abstract void enableRecv();

// 禁用接收
abstract void disableRecv();

// 设置会话的过期时间
abstract void setSessionTimeout(int sessionTimeout);

// 获取统计信息
protected abstract ServerStats serverStats();

具体方法

sendReponse

发送响应结果

public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
   
     
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  // Make space for length
  BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
  try {
   
     
    // 写入4个0
    baos.write(fourBytes);
    // 写入header
    bos.writeRecord(h, "header");
    // 写入record
    if (r != null) {
   
     
      bos.writeRecord(r, tag);
    }
    baos.close();
  } catch (IOException e) {
   
     
    LOG.error("Error serializing response");
  }
  // 获取序列化后的响应
  byte b[] = baos.toByteArray();
  // 使用响应信息的大小更新系统状态
  serverStats().updateClientResponseSize(b.length - 4);
  // 将序列化后的响应结果以及长度放入buffer中
  ByteBuffer bb = ByteBuffer.wrap(b);
  bb.putInt(b.length - 4).rewind();
  // 发送
  sendBuffer(bb);
}

获取验证信息

public List<Id> getAuthInfo() {
   
     
  return Collections.unmodifiableList(authInfo);
}

添加验证信息

public void addAuthInfo(Id id) {
   
     
  // 不存在才添加
  if (authInfo.contains(id) == false) {
   
     
    authInfo.add(id);
  }
}

移除验证信息

public boolean removeAuthInfo(Id id) {
   
     
  return authInfo.remove(id);
}

增加接收到的packet数量

protected void packetReceived() {
   
     
  // 自增packetsReceived
  incrPacketsReceived();
  // 更新服务器统计值
  ServerStats serverStats = serverStats();
  if (serverStats != null) {
   
     
    serverStats().incrementPacketsReceived();
  }
}

增加发送packet数量

protected void packetSent() {
   
     
  // 自增packetsSent
  incrPacketsSent();
  // 更新服务器统计值
  ServerStats serverStats = serverStats();
  if (serverStats != null) {
   
     
    serverStats.incrementPacketsSent();
  }
}

使用响应更新服务器状态

protected synchronized void updateStatsForResponse(long cxid, long zxid,
            String op, long start, long end)
{
   
     
  // don't overwrite with "special" xids - we're interested
  // in the clients last real operation
  if (cxid >= 0) {
   
     
    lastCxid = cxid;
  }
  lastZxid = zxid;
  lastOp = op;
  lastResponseTime = end;
  long elapsed = end - start;
  lastLatency = elapsed;
  if (elapsed < minLatency) {
   
     
    minLatency = elapsed;
  }
  if (elapsed > maxLatency) {
   
     
    maxLatency = elapsed;
  }
  count++;
  totalLatency += elapsed;
}

NIOServerCnxn

NIOServerCnxn是ServeCnxn的实现类,使用NIO来完成网络通信

也是每个客户端对应一个NIOServerCnxn

属性

// 工厂
private final NIOServerCnxnFactory factory;

// 连接对应的channel
private final SocketChannel sock;

// selector线程
private final SelectorThread selectorThread;

// 关注的事件
private final SelectionKey sk;

// 是否初始化
private boolean initialized;

// 四个字节的缓冲区
private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);

private ByteBuffer incomingBuffer = lenBuffer;

// 缓冲队列 
private final Queue<ByteBuffer> outgoingBuffers =
  new LinkedBlockingQueue<ByteBuffer>();

// 会话过期时间
private int sessionTimeout;

// 服务器对象
private final ZooKeeperServer zkServer;

/**
     * The number of requests that have been submitted but not yet responded to.
     */
// 提交了但是未收到响应的请求个数
private final AtomicInteger outstandingRequests = new AtomicInteger(0);

/**
     * This is the id that uniquely identifies the session of a client. Once
     * this session is no longer active, the ephemeral nodes will go away.
     */
// 会话id
private long sessionId;

private final int outstandingLimit;

内部类

SendBufferWriter

主要的功能是对响应结果进行分块

private class SendBufferWriter extends Writer {
   
     
  private StringBuffer sb = new StringBuffer();

  // 两种情况会返回true
  // 1. 强制发送force设置为true
  // 2. 待发送内容大于2048
  private void checkFlush(boolean force) {
   
     
    if ((force && sb.length() > 0) || sb.length() > 2048) {
   
     
      sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes()));
      // clear our internal buffer
      sb.setLength(0);
    }
  }

  @Override
  public void close() throws IOException {
   
     
    if (sb == null) return;
    checkFlush(true);
    sb = null; // clear out the ref to ensure no reuse
  }

  @Override
  public void flush() throws IOException {
   
     
    checkFlush(true);
  }

  @Override
  public void write(char[] cbuf, int off, int len) throws IOException {
   
     
    sb.append(cbuf, off, len);
    checkFlush(false);
  }
}

重要方法

构造方法

public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
                         SelectionKey sk, NIOServerCnxnFactory factory,
                         SelectorThread selectorThread) throws IOException {
   
     
  this.zkServer = zk;
  this.sock = sock;
  this.sk = sk;
  this.factory = factory;
  this.selectorThread = selectorThread;
  if (this.factory.login != null) {
   
     
    this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
  }
  if (zk != null) {
   
     
    outstandingLimit = zk.getGlobalOutstandingLimit();
  } else {
   
     
    outstandingLimit = 1;
  }
  sock.socket().setTcpNoDelay(true);
  /* set socket linger to false, so that socket close does not block */
  sock.socket().setSoLinger(false, -1);
  InetAddress addr = ((InetSocketAddress) sock.socket()
                      .getRemoteSocketAddress()).getAddress();
  authInfo.add(new Id("ip", addr.getHostAddress()));
  this.sessionTimeout = factory.sessionlessCnxnTimeout;
}

sendBuffer

public void sendBuffer(ByteBuffer bb) {
   
     
  if (LOG.isTraceEnabled()) {
   
     
    LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
              + " is valid: " + sk.isValid());
  }
  // 将当前需要发送的ByteBuffer添加到队列中,等待异步发送
  outgoingBuffers.add(bb);
  // 
  requestInterestOpsUpdate();
}

sendBuffer

// 同步发送
void sendBufferSync(ByteBuffer bb) {
   
     
  try {
   
     
    if (bb != ServerCnxnFactory.closeConn) {
   
     
      if (sock.isOpen()) {
   
     
        // 配置为阻塞
        sock.configureBlocking(true);
        // 阻塞写出
        sock.write(bb);
      }
      // 更新统计值
      packetSent();
    }
  } catch (IOException ie) {
   
     
    LOG.error("Error sending data synchronously ", ie);
  }
}

doIO

处理io

读总共分为如下几步:

1、 使用lenBuffer读取四个字节;
2、 判断四个字节是指令还是长度;
3、 如果是四个字节的指令,那么会执行指令;
4、 如果是长度,那么会为incomingBuffer分配指定大小的空间,然后读取接收剩余内容,解析成请求并处理;

void doIO(SelectionKey k) throws InterruptedException {
   
     
  try {
   
     
    if (isSocketOpen() == false) {
   
     
      LOG.warn("trying to do i/o on a null socket for session:0x"
               + Long.toHexString(sessionId));

      return;
    }
    // 当前可读
    if (k.isReadable()) {
   
     
      // 将输入读取到incomingBuffer中
      int rc = sock.read(incomingBuffer);
      if (rc < 0) {
   
     
        throw new EndOfStreamException(
          "Unable to read additional data from client sessionid 0x"
          + Long.toHexString(sessionId)
          + ", likely client has closed socket");
      }
      // 接收输入的缓冲区已满
      if (incomingBuffer.remaining() == 0) {
   
     
        // 当前incomingBuffer中存放的是四个字母的command还是请求内容
        boolean isPayload;
        // 每次一个完整的读请求处理后,都会清空lenBuffer,并赋值给incomingBuffer
        // incomingBuffer标志一个完整读请求的开始
        if (incomingBuffer == lenBuffer) {
   
      // start of next request
          // 可读
          incomingBuffer.flip();
          // 从lenBuffer中读取整数
          // 如果代表的是长度,那么会为incomingBuffer分配指定长度空间,此时isPayLoad为true
          // 如果代表的是指令,那么此时isPayLoad为false,并且会使用CommandExecutor来执行指令
          isPayload = readLength(k);
          // 清空incomingBuffer
          incomingBuffer.clear();
        } else {
   
     
          // continuation
          isPayload = true;
        }
        
        
       	// 读取真实请求内容
        if (isPayload) {
   
      // not the case for 4letterword
          readPayload();
        }
        else {
   
     
          // four letter words take care
          // need not do anything else
          // 为四个字母的指令
          return;
        }
      }
    }
}

private boolean readLength(SelectionKey k) throws IOException {
   
     
  // Read the length, now get the buffer
  // 从lenBuffer中读取一个整数
  int len = lenBuffer.getInt();
  // 判断是否是四字母的指令
  if (!initialized && checkFourLetterWord(sk, len)) {
   
     
    return false;
  }
  if (len < 0 || len > BinaryInputArchive.maxBuffer) {
   
     
    throw new IOException("Len error " + len);
  }
  if (!isZKServerRunning()) {
   
     
    throw new IOException("ZooKeeperServer not running");
  }
  // len不是四个字母的指令,而是请求内容的长度
  // 分配空间
  incomingBuffer = ByteBuffer.allocate(len);
  return true;
}

private void readPayload() throws IOException, InterruptedException {
   
     
  // 如果incomingBuffer还有空间,代表还未读取完毕
  if (incomingBuffer.remaining() != 0) {
   
      // have we read length bytes?
    // 继续读取
    int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
    if (rc < 0) {
   
     
      throw new EndOfStreamException(
        "Unable to read additional data from client sessionid 0x"
        + Long.toHexString(sessionId)
        + ", likely client has closed socket");
    }
  }

  // 如果incomingBuffer没有空间,代表读取完毕
  if (incomingBuffer.remaining() == 0) {
   
      // have we read length bytes?
    // 更新统计值
    packetReceived();
    // 可读
    incomingBuffer.flip();
    // 当前仍未初始化
    if (!initialized) {
   
     
      // 处理连接请求
      readConnectRequest();
    } else {
   
     
      // 从inputBuffer中读取请求,并处理请求
      readRequest();
    }
    // 清空lenBuffer,并将incomingBuffer重新设置为lenBuffer,标志此次读取操作完成
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
  }
}

if (k.isWritable()) {
   
     
  handleWrite(k);

  if (!initialized && !getReadInterest() && !getWriteInterest()) {
   
     
    throw new CloseRequestException("responded to info probe");
  }
}

void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
   
     
  if (outgoingBuffers.isEmpty()) {
   
     
    return;
  }

  // 分配直接缓冲
  ByteBuffer directBuffer = NIOServerCnxnFactory.getDirectBuffer();
  if (directBuffer == null) {
   
     
    // 直接缓冲为null,不使用直接缓冲
    ByteBuffer[] bufferList = new ByteBuffer[outgoingBuffers.size()];
    // Use gathered write call. This updates the positions of the
    // byte buffers to reflect the bytes that were written out.
    // 将outgoingBuffer中的多个write call产生的响应写出
    sock.write(outgoingBuffers.toArray(bufferList));

    // Remove the buffers that we have sent
    // 将已经发送出去的buffer从outgoingBuffers中移除
    ByteBuffer bb;
    while ((bb = outgoingBuffers.peek()) != null) {
   
     
      if (bb == ServerCnxnFactory.closeConn) {
   
     
        throw new CloseRequestException("close requested");
      }
      if (bb.remaining() > 0) {
   
     
        break;
      }
      packetSent();
      outgoingBuffers.remove();
    }
  } else {
   
     
    // 直接缓冲不为null,清空直接缓冲
    directBuffer.clear();

    for (ByteBuffer b : outgoingBuffers) {
   
     
      // 响应的大小大于直接缓冲的大小
      // 缩小响应缓冲大小
      if (directBuffer.remaining() < b.remaining()) {
   
     
        b = (ByteBuffer) b.slice().limit(
          directBuffer.remaining());
      }
      // 记录b当前位置
      int p = b.position();
      // 将b添加到直接缓冲,此时会修改b的position
      directBuffer.put(b);
      // 将b的position修改回去
      b.position(p);
      if (directBuffer.remaining() == 0) {
   
     
        break;
      }
    }
    
    // 使直接缓冲可读
    directBuffer.flip();

    // 记录发送量
    int sent = sock.write(directBuffer);

    ByteBuffer bb;

    // Remove the buffers that we have sent
    // 通过send将outgoingBuffer中已经发送的buffer移除
    while ((bb = outgoingBuffers.peek()) != null) {
   
     
      if (bb == ServerCnxnFactory.closeConn) {
   
     
        throw new CloseRequestException("close requested");
      }
      if (sent < bb.remaining()) {
   
     
        // 当前buffer只发送了部分,更新位置
        bb.position(bb.position() + sent);
        break;
      }
      packetSent();
      /* We've sent the whole buffer, so drop the buffer */
      sent -= bb.remaining();
      outgoingBuffers.remove();
    }
  }
}

版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有

正文到此结束
本文目录