原创

07、ZooKeeper ZooKeeperServer

继承层次

 

在之前分析ZooKeeper各个角色初始化的过程中,我们看到每个角色都会创建一个相应的ZooKeeperServer:
ZooKeeperServer:服务器的基础实现,也是下面各类服务器的公共父类
QuorumZooKeeperServer:参与集群(quorum)的服务器的公共父类
LearnerZooKeeperServer:非Leader服务器(Follower和Observer)的公共父类
LeaderZooKeeperServer:Leader节点使用的服务器
FollowerZooKeeperServer:Follower节点使用的服务器
ObserverZooKeeperServer:Observer节点使用的服务器
ReadOnlyZooKeeperServer:以只读模式运行的节点使用的服务器

ZooKeeperServer

/**
 * Contract for a component that can expire sessions and report the id of
 * the server it runs on.
 */
public static interface SessionExpirer {

    /**
     * Expires the given session.
     *
     * @param session the session to expire
     */
    void expire(Session session);

    /**
     * Returns the id of the current server.
     *
     * @return this server's id
     */
    long getServerId();
}

/**
 * Source of runtime statistics about the current server.
 * Interface methods are implicitly public.
 */
public interface Provider {

    /** @return number of outstanding requests */
    long getOutstandingRequests();

    /** @return last processed zxid */
    long getLastProcessedZxid();

    /** @return textual server state */
    String getState();

    /** @return number of alive client connections */
    int getNumAliveConnections();

    /** @return size of the data directory */
    long getDataDirSize();

    /** @return size of the log directory */
    long getLogDirSize();
}

/**
 * Base ZooKeeper server implementation. Implementing {@link SessionExpirer}
 * lets it expire sessions; implementing {@link ServerStats.Provider} lets it
 * report statistics about itself.
 */
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    // Members are elided in this excerpt.
}

由于ZooKeeperServer实现了SessionExpirer和ServerStats.Provider接口,因此它具备使会话过期以及查询当前服务器统计信息的能力

内部类

ChangeRecord

/**
 * A pending change to a single znode path, used to pass information from
 * PrepRequestProcessor to FinalRequestProcessor.
 */
static class ChangeRecord {

    /** Transaction id (zxid) of this change. */
    long zxid;

    /** Path of the znode the record refers to. */
    String path;

    /** Persisted stat. Make sure to create a new object when changing. */
    StatPersisted stat;

    /** Child count recorded for the path. */
    int childCount;

    /** ACL list. Make sure to create a new object when changing. */
    List<ACL> acl;

    ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,
            List<ACL> acl) {
        this.zxid = zxid;
        this.path = path;
        this.stat = stat;
        this.childCount = childCount;
        this.acl = acl;
    }

    /**
     * Returns a copy of this record carrying the given zxid.
     *
     * <p>The stat is copied into a brand-new StatPersisted (left empty when
     * this record has no stat), and the ACLs are copied into a new list (an
     * empty list when this record has none). The copy therefore never shares
     * these containers with this record.
     *
     * @param zxid zxid for the copy
     * @return the copy
     */
    ChangeRecord duplicate(long zxid) {
        StatPersisted statCopy = new StatPersisted();
        if (this.stat != null) {
            DataTree.copyStatPersisted(this.stat, statCopy);
        }
        List<ACL> aclCopy = (acl == null)
                ? new ArrayList<ACL>()
                : new ArrayList<ACL>(acl);
        return new ChangeRecord(zxid, path, statCopy, childCount, aclCopy);
    }
}

ChangeRecord用来在PrepRequestProcessor和FinalRequestProcessor之间传递信息

MissingSessionException

/**
 * Signals that the session a request belongs to is missing. submitRequest
 * catches it and drops the request.
 */
public static class MissingSessionException extends IOException {

    private static final long serialVersionUID = 7467414635467261007L;

    /**
     * @param msg detail message
     */
    public MissingSessionException(String msg) {
        super(msg);
    }
}

MissingSessionException表示请求所属的会话不存在(会话丢失);submitRequest捕获到该异常时会直接丢弃该请求,只记录一条debug日志

State

/** Lifecycle state of the server. */
protected enum State {
    /** Initial value of the state field; startup() has not completed yet. */
    INITIAL,
    /** Set at the end of startup(), once the request processor chain exists. */
    RUNNING,
    SHUTDOWN,
    /** Set by ZooKeeperServerListenerImpl when an important thread exits. */
    ERROR
}

State表示服务器当前所处的状态,取值为INITIAL、RUNNING、SHUTDOWN和ERROR

主要属性

// JMX beans for the server and for its data tree
protected ZooKeeperServerBean jmxServerBean;
protected DataTreeBean jmxDataTreeBean;

// Default tick (heartbeat) time: 3000 (presumably milliseconds)
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;

// Minimum session timeout; -1 indicates unset, use default
protected int minSessionTimeout = -1;
// Maximum session timeout; -1 indicates unset, use default
protected int maxSessionTimeout = -1;

// Session tracker (NOT a transaction tracker)
protected SessionTracker sessionTracker;

// Transaction-log and snapshot storage
private FileTxnSnapLog txnLogFactory = null;

// In-memory ZooKeeper database
private ZKDatabase zkDb;

// Head of the request processor chain
protected RequestProcessor firstProcessor;

// Current lifecycle state; volatile so updates are visible across threads
protected volatile State state = State.INITIAL;

// Requests currently in process (presumably maintained by incInProcess(); confirm)
private final AtomicInteger requestsInProcess = new AtomicInteger(0);

// Outstanding (not yet applied) change records
final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
// this data structure must be accessed under the outstandingChanges lock
// Maps a path to its outstanding change record
final HashMap<String, ChangeRecord> outstandingChangesForPath =
    new HashMap<>();

// Connection factories: plain and secure
protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;

// Server statistics
private final ServerStats serverStats;
// Called back when an important thread hits an error
private final ZooKeeperServerListener listener;

构造方法

默认构造方法

/**
 * Creates a server with default settings, initializing the statistics holder
 * and the default thread-failure listener.
 */
public ZooKeeperServer() {
    this.serverStats = new ServerStats(this);
    this.listener = new ZooKeeperServerListenerImpl(this);
}

// ZooKeeperServerListenerImpl
class ZooKeeperServerListenerImpl implements ZooKeeperServerListener {
   
     
    private static final Logger LOG = LoggerFactory
            .getLogger(ZooKeeperServerListenerImpl.class);

    private final ZooKeeperServer zkServer;

    ZooKeeperServerListenerImpl(ZooKeeperServer zkServer) {
   
     
        this.zkServer = zkServer;
    }

    @Override
    public void notifyStopping(String threadName, int exitCode) {
   
     
        LOG.info("Thread {} exits, error code {}", threadName, exitCode);
        zkServer.setState(State.ERROR);
    }
}

ZooKeeperServerListenerImpl是ZooKeeperServerListener的默认实现:当重要线程退出时会回调它的notifyStopping方法,该方法记录一条日志并将服务器状态设置为ERROR

ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb)

/**
 * Creates a server backed by the given transaction-log/snapshot storage and
 * in-memory database.
 *
 * @param txnLogFactory     transaction-log and snapshot storage (dereferenced
 *                          immediately, so it must not be null)
 * @param tickTime          tick time
 * @param minSessionTimeout minimum session timeout; -1 falls back to 2 * tickTime
 * @param maxSessionTimeout maximum session timeout; -1 falls back to 20 * tickTime
 * @param zkDb              in-memory database
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
            int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {
    serverStats = new ServerStats(this);
    this.txnLogFactory = txnLogFactory;
    // Hand this server's stats object to the log storage
    this.txnLogFactory.setServerStats(this.serverStats);
    this.zkDb = zkDb;
    // tickTime is assigned before the timeout setters, whose -1 fallbacks are
    // expressed in multiples of the tick time
    this.tickTime = tickTime;
    setMinSessionTimeout(minSessionTimeout);
    setMaxSessionTimeout(maxSessionTimeout);
    listener = new ZooKeeperServerListenerImpl(this);
    // Parameterized logging: same message text, no eager string building
    LOG.info("Created server with tickTime {} minSessionTimeout {} maxSessionTimeout {} datadir {} snapdir {}",
            tickTime, getMinSessionTimeout(), getMaxSessionTimeout(),
            txnLogFactory.getDataDir(), txnLogFactory.getSnapDir());
}

ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime)

/**
 * Creates a server with the given tick time, unset (-1) minimum and maximum
 * session timeouts, and a new ZKDatabase over the given storage.
 *
 * @param txnLogFactory transaction-log and snapshot storage
 * @param tickTime      tick time
 * @throws IOException as declared by this constructor
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime)
        throws IOException {
    this(txnLogFactory, tickTime, -1, -1, new ZKDatabase(txnLogFactory));
}

ZooKeeperServer(FileTxnSnapLog txnLogFactory)

/**
 * Creates a server with DEFAULT_TICK_TIME and unset (-1) minimum and maximum
 * session timeouts. The (txnLogFactory, tickTime) constructor builds a new
 * ZKDatabase over the storage, so the result is the same as before.
 *
 * @param txnLogFactory transaction-log and snapshot storage
 * @throws IOException as declared by this constructor
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
    this(txnLogFactory, DEFAULT_TICK_TIME);
}

ZooKeeperServer(File snapDir, File logDir, int tickTime)

/**
 * Creates a server whose storage is a FileTxnSnapLog built from the given
 * snapshot and transaction-log directories.
 *
 * @param snapDir  snapshot directory
 * @param logDir   transaction-log directory
 * @param tickTime tick time
 * @throws IOException as declared by this constructor
 */
public ZooKeeperServer(File snapDir, File logDir, int tickTime)
        throws IOException {
    // NOTE(review): arguments are passed as (snapDir, logDir); confirm this
    // matches the parameter order of FileTxnSnapLog's constructor.
    FileTxnSnapLog storage = new FileTxnSnapLog(snapDir, logDir);
    this(storage, tickTime);
}

重要方法

startup

/**
 * Starts the server. It creates the session tracker if none exists and
 * starts it, builds the request processor chain, and registers JMX. Finally
 * it switches to RUNNING and wakes every thread waiting on this monitor,
 * such as submitRequest waiting for the chain.
 */
public synchronized void startup() {
    boolean trackerMissing = sessionTracker == null;
    if (trackerMissing) {
        createSessionTracker();
    }
    startSessionTracker();

    setupRequestProcessors();

    registerJMX();

    // Only now is the chain in place: publish RUNNING and notify waiters.
    setState(State.RUNNING);
    notifyAll();
}

/**
 * Creates the default session tracker. It is seeded with the database's
 * session-to-timeout map, the tick time, the tracker server id, and this
 * server's listener.
 */
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(
            this,
            zkDb.getSessionWithTimeOuts(),
            tickTime,
            createSessionTrackerServerId,
            getZooKeeperServerListener());
}

setupRequestProcessors

setupRequestProcessors用来设置请求处理链,单机模式下的处理链为:PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor

/**
 * ZooKeeperServer's chain:
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
 * It is built from the tail forward. The sync and prep processors are
 * started right after construction, and firstProcessor is published before
 * the prep processor is started.
 */
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);

    SyncRequestProcessor syncProcessor =
            new SyncRequestProcessor(this, finalProcessor);
    syncProcessor.start();

    PrepRequestProcessor prepProcessor =
            new PrepRequestProcessor(this, syncProcessor);
    firstProcessor = prepProcessor;
    prepProcessor.start();
}

loadData

/**
 * Prepares the in-memory database. It sets the server zxid, kills every
 * session that has no entry in the session-timeout map, and then takes a
 * clean snapshot.
 *
 * @throws IOException          as declared (loading / snapshotting)
 * @throws InterruptedException as declared
 */
public void loadData() throws IOException, InterruptedException {
    /*
     * When a new leader starts executing Leader#lead, it
     * invokes this method. The database, however, has been
     * initialized before running leader election so that
     * the server could pick its zxid for its initial vote.
     * It does it by invoking QuorumPeer#getLastLoggedZxid.
     * Consequently, we don't need to initialize it once more
     * and avoid the penalty of loading it a second time. Not
     * reloading it is particularly important for applications
     * that host a large database.
     *
     * The following if block checks whether the database has
     * been initialized or not. Note that this method is
     * invoked by at least one other method:
     * ZooKeeperServer#startdata.
     *
     * See ZOOKEEPER-1642 for more detail.
     */
    // 1. Set the zxid. A fresh database is restored from snapshot and
    //    transaction-log files. An already-initialized one just supplies its
    //    last processed zxid.
    if (!zkDb.isInitialized()) {
        setZxid(zkDb.loadDataBase());
    } else {
        setZxid(zkDb.getDataTreeLastProcessedZxid());
    }

    // 2. Clean up dead sessions, i.e. sessions without a recorded timeout.
    //    They are collected first and killed afterwards, presumably so the
    //    session collection is not modified while it is being iterated.
    LinkedList<Long> deadSessions = new LinkedList<>();
    for (Long sessionId : zkDb.getSessions()) {
        boolean hasTimeout = zkDb.getSessionWithTimeOuts().get(sessionId) != null;
        if (!hasTimeout) {
            deadSessions.add(sessionId);
        }
    }
    for (long sessionId : deadSessions) {
        // XXX: Is lastProcessedZxid really the best thing to use?
        killSession(sessionId, zkDb.getDataTreeLastProcessedZxid());
    }

    // 3. Make a clean snapshot
    takeSnapshot();
}

/**
 * ZooKeeperServer#killSession. It removes the session from the in-memory
 * database at the given zxid, which per the original notes deletes the
 * ephemeral nodes it created. It then drops the session from the session
 * tracker, if one exists.
 *
 * @param sessionId session to kill
 * @param zxid      zxid used for the database change
 */
protected void killSession(long sessionId, long zxid) {
    zkDb.killSession(sessionId, zxid);
    if (LOG.isTraceEnabled()) {
        String traceMsg = "ZooKeeperServer --- killSession: 0x"
                + Long.toHexString(sessionId);
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, traceMsg);
    }
    if (sessionTracker == null) {
        return;
    }
    sessionTracker.removeSession(sessionId);
}

submitRequest

/**
 * Hands a request to the request processor chain.
 *
 * <p>If the chain is not set up yet, this waits on the server monitor, waking
 * at least once per second, for as long as the state is INITIAL. startup()
 * switches to RUNNING and calls notifyAll() once the chain exists.
 *
 * @param si the request
 * @throws RuntimeException "Not started" if, after waiting, there is still no
 *         processor chain or the server is not RUNNING
 */
public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
                // Don't swallow the interrupt: restore the flag so the
                // calling thread can still observe it.
                Thread.currentThread().interrupt();
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        // Refresh the session's expiry time (presumably the source of
        // MissingSessionException for unknown sessions)
        touch(si.cnxn);
        if (Request.isValid(si.type)) {
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type {}", si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        // The request's session is gone: drop the request
        LOG.debug("Dropping request: {}", e.getMessage());
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

LeaderZooKeeperServer

Leader节点使用LeaderZooKeeperServer作为自己的服务器实现

重要方法

setupRequestProcessors

/**
 * LeaderZooKeeperServer's chain:
 * LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor
 * -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor
 * -> FinalRequestProcessor.
 * It is built from the tail forward so each processor receives its
 * successor. Afterwards the container manager is set up.
 */
protected void setupRequestProcessors() {
    // Tail of the chain
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor =
            new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());

    // NOTE: the follower passes true for this boolean; the leader passes false
    String serverIdText = Long.toString(getServerId());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor, serverIdText,
            false, getZooKeeperServerListener());
    commitProcessor.start();

    ProposalRequestProcessor proposalProcessor =
            new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();

    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();

    // Head of the chain
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}

FollowerZooKeeperServer

Follower节点使用FollowerZooKeeperServer作为自己的服务器实现

主要属性

// Sync requests forwarded to the leader by FollowerRequestProcessor and not
// yet handed to the commit processor by sync()
ConcurrentLinkedQueue<Request> pendingSyncs;

// Transactions logged by logRequest() awaiting commit(); the head is the
// oldest outstanding one
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();

重要方法

setupRequestProcessors

/**
 * FollowerZooKeeperServer's processors.
 * Client requests: FollowerRequestProcessor -> CommitProcessor
 * -> FinalRequestProcessor.
 * Proposals passed in via logRequest(): SyncRequestProcessor
 * -> SendAckRequestProcessor.
 */
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();

    FollowerRequestProcessor followerProcessor =
            new FollowerRequestProcessor(this, commitProcessor);
    firstProcessor = followerProcessor;
    followerProcessor.start();

    SendAckRequestProcessor ackProcessor =
            new SendAckRequestProcessor((Learner) getFollower());
    syncProcessor = new SyncRequestProcessor(this, ackProcessor);
    syncProcessor.start();
}

logRequest

/**
 * Records a transaction: builds a Request from its header and body, queues
 * it in pendingTxns when the low 32 bits of its zxid are non-zero, and hands
 * it to the SyncRequestProcessor, which persists it to the log.
 *
 * @param hdr transaction header (supplies client id, cxid, type, zxid)
 * @param txn transaction body
 */
public void logRequest(TxnHeader hdr, Record txn) {
    Request request = new Request(hdr.getClientId(), hdr.getCxid(),
            hdr.getType(), hdr, txn, hdr.getZxid());
    // Only txns whose low 32 bits (presumably the per-epoch counter) are
    // non-zero are tracked for a later commit()
    boolean counterNonZero = (request.zxid & 0xffffffffL) != 0;
    if (counterNonZero) {
        pendingTxns.add(request);
    }
    syncProcessor.processRequest(request);
}

当Follower接收到Leader端发来的proposal请求时,会调用logRequest来记录事务日志

sync

/**
 * FollowerZooKeeperServer#sync (the original header wrongly said
 * LeaderZooKeeperServer). It takes the oldest pending sync request and passes
 * it to the commit processor. If none is pending, it logs a warning and
 * returns.
 */
public synchronized void sync() {
    // poll() checks and removes in one step. ConcurrentLinkedQueue.size() is
    // not a constant-time operation, and a separate size()/remove() pair is a
    // non-atomic check-then-act on a queue that other threads add to.
    Request r = pendingSyncs.poll();
    if (r == null) {
        LOG.warn("Not expecting a sync.");
        return;
    }
    commitProcessor.commit(r);
}

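当客户端向follower发送sync请求、希望follower与leader的数据保持同步时,FollowerRequestProcessor会把该请求加入pendingSyncs队列并转发给leader(见下面的run方法);之后在收到leader对该请求的响应时,会调用FollowerZooKeeperServer的sync方法,从pendingSyncs中取出请求交给commitProcessor处理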

// Excerpt of FollowerRequestProcessor#run: most of the method is elided, so
// the braces below do not balance. Only the sync case is shown.
public void run() {
   
     
	// ... (omitted)
	switch (request.type) {
   
     
       case OpCode.sync:
       	   // Queue the sync request locally (later taken by
       	   // FollowerZooKeeperServer#sync) and forward it to the leader.
           zks.pendingSyncs.add(request);
           zks.getFollower().request(request);
           break;
}

commit

/**
 * FollowerZooKeeperServer#commit: commits the transaction with the given
 * zxid.
 *
 * <p>If no transaction is pending, it logs a warning and returns. Otherwise
 * the oldest pending transaction must carry exactly {@code zxid}. A mismatch
 * is fatal: it is logged and the process exits with status 12. The matching
 * request is removed from pendingTxns and handed to the commit processor.
 *
 * @param zxid zxid being committed
 */
public void commit(long zxid) {
    if (pendingTxns.isEmpty()) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                + " without seeing txn");
        return;
    }
    // Commits must match the pending transactions in order
    long headZxid = pendingTxns.element().zxid;
    if (headZxid != zxid) {
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                + " but next pending txn 0x"
                + Long.toHexString(headZxid));
        System.exit(12);
    }
    Request head = pendingTxns.remove();
    commitProcessor.commit(head);
}

版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有

正文到此结束
本文目录