跳到主要内容

07、RocketMQ 源码 - Producer发送消息的总体流程

  • 单向发送: 把消息发向Broker服务器, 不管Broker是否接收, 只管发, 不管结果。
  • 同步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。
  • 异步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。异步所以发送消息后, 不用等待, 等到Broker服务器的响应调用回调。

DefaultMQProducer提供了很多send方法的重载:

 
 

1.send源码入口

1.1 同步消息

 

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
   
     
    //调用defaultMQProducerImpl#send发送消息
    return this.defaultMQProducerImpl.send(msg);
}

defaultMQProducerImpl#send发送消息。
 

调用另一个send(), 超时时间为3s。

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
   
     
    //调用另一个send方法,设置超时时间参数,默认3000ms
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

 

/**
 * DefaultMQProducerImpl的方法
 *
 * @param msg     消息
 * @param timeout 超时时间,毫秒值
 */
public SendResult send(Message msg,
                       long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
   
     
    //调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为null
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

该方法内部又调用另一个sendDefaultImpl方法, 设置消息发送方式为SYNC, 为同步, 设置回调函数为null。

1.2 单向消息

单向消息使用sendOneway发送。

 

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException
 {
   
     
    //根据namespace设置topic
    msg.setTopic(withNamespace(msg.getTopic()));
    //调用defaultMQProducerImpl#sendOneway发送消息
    this.defaultMQProducerImpl.sendOneway(msg);
}

defaultMQProducerImpl#sendOneway。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
   
     
    try {
   
     
        //调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000ms
        this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
    } catch (MQBrokerException e) {
   
     
        throw new MQClientException("unknown exception", e);
    }
}

最终调用sendDefaultImpl方法, 发送模式为ONEWAY, 设置回调函数为null, 超时时间为3s。

1.3 异步消息

异步消息使用带有callback函数的send方法发送。

 

public void send(Message msg, SendCallback sendCallback) throws MQClientException,
 RemotingException, InterruptedException {
   
     
    //根据namespace设置topic
    msg.setTopic(withNamespace(msg.getTopic()));
    //调用defaultMQProducerImpl#send发送消息,带有sendCallback参数
    this.defaultMQProducerImpl.send(msg, sendCallback);
}

该方法内部调用defaultMQProducerImpl#send方法发送消息, 带sendCallback参数。

public void send(Message msg, SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException {
   
     
    //该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
    send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}

public void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException {
   
     
    //调用起始时间
    final long beginStartTime = System.currentTimeMillis();
    //获取异步发送执行器线程池
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
   
     
        /*
         * 使用线程池异步的执行sendDefaultImpl方法,即异步发送消息
         */
        executor.submit(new Runnable() {
   
     
            @Override
            public void run() {
   
     
                /*
                 * 发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法
                 */
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
   
     
                    try {
   
     
                        //调用sendDefaultImpl方法执行发送操作
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                    } catch (Exception e) {
   
     
                        //抛出异常,执行回调函数onException方法
                        sendCallback.onException(e);
                    }
                } else {
   
     
                    //超时,执行回调函数onException方法
                    sendCallback.onException(
                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }

        });
    } catch (RejectedExecutionException e) {
   
     
        throw new MQClientException("executor rejected ", e);
    }

}

方法内部会获取获取异步发送执行器线程池, 使用线程池异步的执行sendDefaultImpl方法, 即异步发送。

发送之前计算超时时间, 如果超时则执行回调函数onException()。

2.sendDefaultImpl发送消息实现

该方法位于DefaultMQProducerImpl中, 无论是同步, 异步, 还是单向, 最后调用的都是sendDefaultImpl方法。

1、 makeSureStateOK方法,确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常;
2、 检查消息的合法性;
3、 调用tryToFindTopicPublishInfo方法,尝试查找消息的一个topic路由,进行发送消息;
4、 计算循环发送消息的总次数timesTotal,默认情况下,同步为3,允许重试2次,其他模式为1,即不允许重试实际上,异步发送消息可以最多重试2次,不是这里实现的;
5、 调用selectOneMessageQueue方法,选择一个队列MessageQueue,支持失败故障转移;
6、 调用sendKernelImpl方法发送消息,同步、异步、单向发送消息都是这个方法实现的;
7、 调用updateFaultItem方法,更新本地错误表缓存数据,用于延迟时间的故障转移的功能;
8、 根据发送模式的不同,如果是异步或者单向发送则直接返回,如果是同步的话,如果开启了retryAnotherBrokerWhenNotStoreOK,那么如果返回值不返回SEND_OK状态,则重新执行;
9、 此过程中,如果抛出RemotingException、MQClientException、以及MQBrokerException异常,那么会重试,如果抛出InterruptedException,或者超时则不会重试;

 
 

/**
 * DefaultMQProducerImpl的方法
 *
 * @param msg               方法
 * @param communicationMode 通信模式
 * @param sendCallback      回调方法
 * @param timeout           超时时间
 */
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
   
     
    /*
     * 1 确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常
     */
    this.makeSureStateOK();
    /*
     * 2 校验消息的合法性
     */
    Validators.checkMessage(msg, this.defaultMQProducer);
    //生成本次调用id
    final long invokeID = random.nextLong();
    //开始时间戳
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    //结束时间戳
    long endTimestamp = beginTimestampFirst;
    /*
     * 3 尝试查找消息的一个topic路由,用以发送消息
     */
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    //找到有效的topic信息
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
   
     
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        /*
         * 4 计算发送消息的总次数
         * 同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改
         */
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        //记录每一次重试时候发送消息目标Broker名字的数组
        String[] brokersSent = new String[timesTotal];
        /*
         * 在循环中,发送消息,包含消息重试的逻辑,总次数默认不超过3
         */
        for (; times < timesTotal; times++) {
   
     
            //上次使用过的broker,可以为空,表示第一次选择
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            /*
             * 5 选择一个消息队列MessageQueue
             */
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
   
     
                mq = mqSelected;
                //设置brokerName
                brokersSent[times] = mq.getBrokerName();
                try {
   
     
                    //调用的开始时间
                    beginTimestampPrev = System.currentTimeMillis();
                    //如果还有可调用次数,那么
                    if (times > 0) {
   
     
                        //在重新发送期间用名称空间重置topic
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    //现在调用的开始时间 减去 开始时间,判断时候在调用发起之前就超时了
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    //如果已经超时了,那么直接结束循环,不再发送
                    //即超时的时候,即使还剩下重试次数,也不会再继续重试
                    if (timeout < costTime) {
   
     
                        callTimeout = true;
                        break;
                    }
                    /*
                     * 6 异步、同步、单向发送消息
                     */
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    //方法调用结束时间戳
                    endTimestamp = System.currentTimeMillis();
                    /*
                     * 7 更新本地错误表缓存数据,用于延迟时间的故障转移的功能
                     */
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    /*
                     * 8 根据发送模式执行不同的处理
                     */
                    switch (communicationMode) {
   
     
                        //异步和单向模式直接返回null
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            //同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果不是返回SEND_OK状态,则仍然会执行重试发送
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
   
     
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
   
     
                                    continue;
                                }
                            }
                            //如果发送成功,则返回
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
   
     
                    //RemotingException异常,会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
   
     
                    //MQClientException异常,会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
   
     
                    //MQBrokerException异常
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    //如果返回的状态码属于一下几种,则支持重试:
                    //ResponseCode.TOPIC_NOT_EXIST,
                    //ResponseCode.SERVICE_NOT_AVAILABLE,
                    //ResponseCode.SYSTEM_ERROR,
                    //ResponseCode.NO_PERMISSION,
                    //ResponseCode.NO_BUYER_ID,
                    //ResponseCode.NOT_IN_CURRENT_UNIT

                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
   
     
                        continue;
                    } else {
   
     
                        //其他状态码不支持重试,如果有结果则返回,否则直接抛出异常
                        if (sendResult != null) {
   
     
                            return sendResult;
                        }

                        throw e;
                    }
                } catch (InterruptedException e) {
   
     
                    //InterruptedException异常,不会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
   
     
                break;
            }
        }
        /*
         * 抛出异常的操作
         */
        if (sendResult != null) {
   
     
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
   
     
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
   
     
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
   
     
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
   
     
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
   
     
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

2.1 makeSureStateOK确定生产者服务状态

确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。

 

/**
 * DefaultMQProducerImpl的方法
 */
private void makeSureStateOK() throws MQClientException {
   
     
    //服务状态不是RUNNING,那么抛出MQClientException异常。
    if (this.serviceState != ServiceState.RUNNING) {
   
     
        throw new MQClientException("The producer service state not OK, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
    }
}

2.2 checkMessage校验消息的合法性

1、 如果msg消息为null,抛出异常;
2、 校验topic,如果topic为空,或者长度大于127字符,或者topic中有非法字符则抛出异常,如果当前topic是不为允许使用的系统topic,抛出异常;
3、 校验消息体,如果消息体为null,或者为空数组,或者消息字节数组长度大于4M,抛出异常;
 

/**
 * Validators的方法
 */
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
   
     
    //如果消息为null,抛出异常
    if (null == msg) {
   
     
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    /*
     * 校验topic
     */
    //如果topic为空,或者长度大于127个字符,或者topic的字符串不符合 "^[%|a-zA-Z0-9_-]+$"模式,即包含非法字符,那么抛出异常
    Validators.checkTopic(msg.getTopic());
    //如果当前topic是不为允许使用的系统topic SCHEDULE_TOPIC_XXXX,那么抛出异常
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    //如果消息体为null,那么抛出异常
    if (null == msg.getBody()) {
   
     
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    //如果消息体为空数组,那么抛出异常
    if (0 == msg.getBody().length) {
   
     
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    //如果消息 字节数组长度大于4,194,304,即消息的大小大于4M,那么抛出异常
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
   
     
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

public static void checkTopic(String topic) throws MQClientException {
   
     
    //如果topic为空,那么抛出异常
    if (UtilAll.isBlank(topic)) {
   
     
        throw new MQClientException("The specified topic is blank", null);
    }
    //如果topic长度大于127个字符,那么抛出异常
    if (topic.length() > TOPIC_MAX_LENGTH) {
   
     
        throw new MQClientException(
                String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
    }
    //如果topic字符串包含非法字符,那么抛出异常
    if (isTopicOrGroupIllegal(topic)) {
   
     
        throw new MQClientException(String.format(
                "The specified topic[%s] contains illegal characters, allowing only %s", topic,
                "^[%|a-zA-Z0-9_-]+$"), null);
    }
}

2.3 tryToFindTopicPublishInfo查找topic的发布信息

该方法用于查找指定topic的发布信息TopicPublishInfo。

1、 首先在本地缓存topicPublishInfoTable获取;
2、 updateTopicRouteInfoFromNameServer()获取,从nameServer同步此topic的路由配置信息;
3、 从nameServer同步topic数据;

 

/**
 * DefaultMQProducerImpl的方法
 * <p>
 * 查找指定topic的推送信息
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
   
     
    //尝试直接从producer的topicPublishInfoTable中获取topic信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    //如果没有获取到有效信息,
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
   
     
        //那么立即创建一个TopicPublishInfo
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //立即从nameServer同步此topic的路由配置信息,并且更新本地缓存
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        //再次获取topicPublishInfo
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    //如果找到的路由信息是可用的,直接返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
   
     
        return topicPublishInfo;
    } else {
   
     
        //再次从nameServer同步topic的数据,不过这次使用默认的topic “TBW102”去找路由配置信息作为本topic参数信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

  • 首先在本地缓存topicPublishInfoTable获取, 如果没有获取到, 调用updateTopicRouteInfoFromNameServer方法从nameServer同步此topic的路由配置信息, 并更新缓存。如果还是没有获取到有效数据, 再次从nameServer同步topic数据, 不过这次默认的topic是 “TBW102”去找路由配置信息作为本topic参数信息。

TopicPublishInfo: topic的发布信息

 

/**
 * 是否是顺序消息
 */
private boolean orderTopic = false;
/**
 * 是否包含路由信息
 */
private boolean haveTopicRouterInfo = false;
/**
 * topic的消息队列集合
 */
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
/**
 * 当前线程线程的消息队列的下标,循环选择消息队列使用+1
 */
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
/**
 * topic路由信息,包括topic的队列信息queueDatas,topic的broker信息brokerDatas,顺序topic配置orderTopicConf,消费过滤信息filterServerTable等属性
 */
private TopicRouteData topicRouteData;

2.4 计算发送次数timesTotal

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + 
this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。是MQClientAPIImpl#sendMessage方法中。

2.5 selectOneMessageQueue选择消息队列

方法内部调用mqFaultStrategy#selectOneMessageQueue方法:

 

/**
 * DefaultMQProducerImpl的方法
 *
 * 选择一个消息队列
 * @param tpInfo topic信息
 * @param lastBrokerName 上次使用过的broker
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
   
     
    //调用mqFaultStrategy#selectOneMessageQueue方法
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

mqFaultStrategy#selectOneMessageQueue方法支持故障转移机制:

1、 首先判断是否开启了发送延迟故障转移机制,即判断sendLatencyFaultEnable属性是否为true,默认为false不开启;

1、 如果开启的话,首先仍然是遍历消息队列,按照轮询的方式选取一个消息队列,当消息队列可用时,选择消息队列的工作就结束,否则循环选择其他队列如果该mq的broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中,或者当前时间戳大于该broker下一次开始可用的时间戳,则表示无障碍;
2、 如果没有选出无故障mq,那么从LatencyFaultTolerance维护的不是最好的broker集合faultItemTable中随机选择一个broker,随后判断如果写队列数大于0,那么选择该Broker然后遍历消息队列,采用取模的方式获取一个队列,重置其brokerName和queueId,进行消息发送;
3、 如果上面的步骤抛出了异常,那么遍历消息队列,采用取模的方式获取一个队列;
2、 如果没有开启延迟故障转移机制,那么遍历消息队列,采用取模轮询的方式获取一个brokerName与lastBrokerName不相等的队列,即不会再次选择上次发送失败的broker如果没有找到一个不同broker的mq,那么退回到轮询的方式;

selectOneMessageQueue方法选择mq的时候的故障转移机制, 目的是为了保证每次消息尽快的发送, 是一种高可用手段。

1、 延迟时间的故障转移,消息队列选择时候,可用过滤mq认为不可用的broker,以免不断为宕机的broker发送消息,选取一个延迟比较短的broker,实现消息发送高可用;
2、 没有开启延迟时间的故障转移的时候,在轮询选择mq的时候,不会选择上一次发送失败的broker,实现消息发送高可用;

/**
 * MQFaultStrategy的方法
 * <p>
 * 选择一个消息队列,支持故障延迟转移
 *
 * @param tpInfo         topic信息
 * @param lastBrokerName 上次使用过的broker
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
   
     
    /*
     * 判断是否开启了发送延迟故障转移机制,默认false不打开
     * 如果开启了该机制,那么每次选取topic下对应的queue时,会基于之前执行的耗时,在有存在符合条件的broker的前提下,优选选取一个延迟较短的broker,否则再考虑随机选取。
     */
    if (this.sendLatencyFaultEnable) {
   
     
        try {
   
     
            //当前线程线程的消息队列的下标,循环选择消息队列使用+1
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            //遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
   
     
                //取模
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                //获取该消息队列
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //如果当前消息队列是可用的,即无故障,那么直接返回该mq
                //如果该broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中,或者当前时间已经大于该broker下一次开始可用的时间点,表示无故障
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            //没有选出无故障的mq,那么一个不是最好的broker集合中随机选择一个
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            //如果写队列数大于0,那么选择该broker
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
   
     
                //遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
   
     
                    //重置其brokerName,queueId,进行消息发送
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
   
     
                //如果写队列数小于0,那么移除该broker
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
   
     
            log.error("Error occurred when selecting message queue", e);
        }
        //如果上面的步骤抛出了异常,那么遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
        return tpInfo.selectOneMessageQueue();
    }
    //如果没有发送延迟故障转移机制,那么那么遍历消息队列,即采用取模轮询的方式
    //获取一个brokerName与lastBrokerName不相等的队列,即不会再次选择上次发送失败的broker
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

selectOneMessageQueue方法有两个重载方法, 一个无参的, 一个有参的。

无参的表示选择mq无限制:

/**
 * TopicPublishInfo的方法
 * <p>
 * 轮询的选择一个mq
 */
public MessageQueue selectOneMessageQueue() {
   
     
    //获取下一个index
    int index = this.sendWhichQueue.incrementAndGet();
    //取模计算索引
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    //获取该索引的mq
    return this.messageQueueList.get(pos);
}

有参数, 其参数是上一次发送失败的brokerName, 表示不会选择上一次失败的brokerName的mq。如果最后没有选择出来, 那么走轮询的逻辑。

/**
 * TopicPublishInfo的方法
 *
 * @param lastBrokerName 上一次发送失败的brokerName
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
   
     
    //如果lastBrokerName为null,即第一次发送,那么轮询选择一个
    if (lastBrokerName == null) {
   
     
        return selectOneMessageQueue();
    } else {
   
     
        for (int i = 0; i < this.messageQueueList.size(); i++) {
   
     
            //轮询选择一个mq
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //如果mq的brokerName不等于lastBrokerName,就返回,否则选择下一个
            if (!mq.getBrokerName().equals(lastBrokerName)) {
   
     
                return mq;
            }
        }
        //没有选出来,那么轮询选择一个
        return selectOneMessageQueue();
    }
}

2.6 sendKernelImpl发送消息

选择了队列后, 调用sendKernelImpl发送消息

1、 首先调用findBrokerAddressInPublish方法从brokerAddrTable中查找Masterbroker地址如果找不到,tryToFindTopicPublishInfo从nameServer远程拉取配置,并更新本地缓存,再次尝试获取Masterbroker地址;
2、 调用brokerVIPChannel判断是否开启vip通道,如果开启了,那么将brokerAddr的port–2,vip通道的端口为普通端口-2;
3、 如果不是批量消息,那么设置唯一的uniqId;
4、 如果不是批量消息,且消息体大于4K,那么压缩消息;
5、 如果存在CheckForbiddenHook,则执行checkForbidden钩子方法如果存在SendMessageHook,则执行sendMessageBefore钩子方法;
6、 设置请求头信息SendMessageRequestHeader,请求头包含各种基本属性,producerGroup,topic,queueId,并且将消息重试次数和最大重试次数存入请求头中;
7. 根据不同的发送模式发送消息, 如果是异步, 需要先克隆并还原消息, 最终异步, 同步, 单向都是调用MQClientAPIImpl#sendMessage方法发送消息的。 8. 如果MQClientAPIImpl#sendMessage方法正常发送或者抛出RemotingException、MQBrokerException、InterruptedException异常, 判断如果存在SendMessageHook, 执行sendMessageAfter钩子方法。 9、 finally中对消息进行恢复;

 

 

/**
 * DefaultMQProducerImpl的方法
 * 发送消息
 *
 * @param msg               消息
 * @param mq                mq
 * @param communicationMode 发送模式
 * @param sendCallback      发送回调
 * @param topicPublishInfo  topic信息
 * @param timeout           超时时间
 * @return 发送结果
 */
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
   
     
    //开始时间
    long beginStartTime = System.currentTimeMillis();
    /*
     * 1 根据brokerName从brokerAddrTable中查找broker地址
     */
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    //如果本地找不到 broker 的地址
    if (null == brokerAddr) {
   
     
        /*
         * 2 从nameServer远程拉取配置,并更新本地缓存
         * 该方法此前就学习过了
         */
        tryToFindTopicPublishInfo(mq.getTopic());
        //再次获取地址
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
   
     
        /*
         * 3 vip通道判断
         */
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
   
     
            //for MessageBatch,ID has been set in the generating process
            /*
             * 4 如果不是批量消息,那么尝试生成唯一uniqId,即UNIQ_KEY属性。MessageBatch批量消息在生成时就已经设置uniqId
             * uniqId也被称为客户端生成的msgId,从逻辑上代表唯一一条消息
             */
            if (!(msg instanceof MessageBatch)) {
   
     
                MessageClientIDSetter.setUniqID(msg);
            }
            /*
             * 设置nameSpace为实例Id
             */
            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
   
     
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }
            //消息标识符
            int sysFlag = 0;
            //消息压缩标识
            boolean msgBodyCompressed = false;
            /*
             * 5 尝试压缩消息
             */
            if (this.tryToCompressMessage(msg)) {
   
     
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }
            //事务消息标志,prepare消息
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
   
     
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            /*
             * 6 如果存在CheckForbiddenHook,则执行checkForbidden方法
             * 为什么叫禁止钩子呢,可能是想要使用者将不可发送消息的检查放在这个钩子函数里面吧(猜测)
             */
            if (hasCheckForbiddenHook()) {
   
     
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            /*
             * 7 如果存在SendMessageHook,则执行sendMessageBefore方法
             */
            if (this.hasSendMessageHook()) {
   
     
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
   
     
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
   
     
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            /*
             * 8 设置请求头信息
             */
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            //针对重试消息的处理
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
   
     
                //获取消息重新消费次数属性值
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
   
     
                    //将重新消费次数设置到请求头中,并且清除该属性
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                //获取消息的最大重试次数属性值
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
   
     
                    //将最大重新消费次数设置到请求头中,并且清除该属性
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            /*
             * 9 根据不同的发送模式,发送消息
             */
            SendResult sendResult = null;
            switch (communicationMode) {
   
     
                /*
                 * 异步发送模式
                 */
                case ASYNC:
                    /*
                     * 首先克隆并还原消息
                     *
                     * 该方法的finally中已经有还原消息的代码了,为什么在异步发送消息之前,还要先还原消息呢?
                     *
                     * 因为异步发送时 finally 重新赋值的时机并不确定,有很大概率是在第一次发送结束前就完成了 finally 中的赋值,
                     * 因此在内部重试前 msg.body 大概率已经被重新赋值过,而 onExceptionImpl 中的重试逻辑 MQClientAPIImpl.sendMessageAsync 不会再对数据进行压缩,
                     * 简言之,在异步发送的情况下,如果调用 onExceptionImpl 内部的重试,有很大概率发送的是无压缩的数据
                     */
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    //如果开启了消息压缩
                    if (msgBodyCompressed) {
   
     
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        //克隆一个message
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        //恢复原来的消息体
                        msg.setBody(prevBody);
                    }
                    //如果topic整合了namespace
                    if (topicWithNamespace) {
   
     
                        if (!messageCloned) {
   
     
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        //还原topic
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }
                    /*
                     * 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常
                     */
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
   
     
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    /*
                     * 10 发送异步消息
                     */
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                    break;
                /*
                 * 单向、同步发送模式
                 */
                case ONEWAY:
                case SYNC:
                    /*
                     * 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常
                     */
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
   
     
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    /*
                     * 10 发送单向、同步消息
                     */
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                    break;
                default:
                    assert false;
                    break;
            }
            /*
             * 9 如果存在SendMessageHook,则执行sendMessageAfter方法
             */
            if (this.hasSendMessageHook()) {
   
     
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            //返回执行结果
            return sendResult;

            //如果抛出了异常,如果存在SendMessageHook,则执行sendMessageAfter方法
        } catch (RemotingException e) {
   
     
            if (this.hasSendMessageHook()) {
   
     
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
   
     
            if (this.hasSendMessageHook()) {
   
     
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
   
     
            if (this.hasSendMessageHook()) {
   
     
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
   
     
            /*
             * 对消息进行恢复
             * 1、因为客户端可能还需要查看原始的消息内容,如果是压缩消息,则无法查看
             * 2、另外如果第一次压缩后消息还是大于4k,如果不恢复消息,那么客户端使用该message重新发送的时候,还会进行一次消息压缩
             */
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

2.6.1 findBrokerAddressInPublish查找broker地址#

根据brokerName向brokerAddrTable查找数据, 因为生产者只会向Master发送数据, 所以返回的是Master地址。

 

/**
 * MQClientInstance的方法
 */
public String findBrokerAddressInPublish(final String brokerName) {
   
     
    //查询brokerAddrTable缓存的数据
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    //返回Mater节点的地址
    if (map != null && !map.isEmpty()) {
   
     
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}

2.6.2 brokerVIPChannel判断vip通道#

调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。

 

/**
 * MixAll的方法
 */
public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
   
     
    //如果开启了vip通道
    if (isChange) {
   
     
        int split = brokerAddr.lastIndexOf(":");
        String ip = brokerAddr.substring(0, split);
        String port = brokerAddr.substring(split + 1);
        //重新拼接brokerAddr,其中port - 2
        String brokerAddrNew = ip + ":" + (Integer.parseInt(port) - 2);
        return brokerAddrNew;
    } else {
   
     
        //如果没有开启vip通道,那么返回原地址
        return brokerAddr;
    }
}

消费者拉取消息只能请求普通通道, 但是生产者可以选择vip和普通通道。

2.6.3 setUniqID生成uniqId#

设置到UNIQ_KEY属性中, 批量消息在生成时就已经设置uniqId。

uniqId表示客户端生成的唯一一条消息。

 

/**
 * MessageClientIDSetter的方法
 */
public static void setUniqID(final Message msg) {
   
     
    //如果这条消息不存在"UNIQ_KEY"属性,那么创建uniqId并且存入"UNIQ_KEY"属性中
    if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
   
     
        msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
    }
}

2.6.4 tryToCompressMessage压缩消息#

消息压缩, 压缩比为5, 压缩之后, 设置压缩标志, 批量消息不支持压缩, 消息压缩有利于网络传输数据。

 

/**
 * DefaultMQProducerImpl的方法
 */
private boolean tryToCompressMessage(final Message msg) {
   
     
    //如果是批量消息,那么不进行压缩
    if (msg instanceof MessageBatch) {
   
     
        //batch dose not support compressing right now
        return false;
    }
    byte[] body = msg.getBody();
    if (body != null) {
   
     
        //如果消息长度大于4K
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
   
     
            try {
   
     
                //进行压缩,使用的JDK自带的压缩类
                byte[] data = UtilAll.compress(body, zipCompressLevel);
                if (data != null) {
   
     
                    //重新设置到body中
                    msg.setBody(data);
                    return true;
                }
            } catch (IOException e) {
   
     
                log.error("tryToCompressMessage exception", e);
                log.warn(msg.toString());
            }
        }
    }

    return false;
}

2.7 updateFaultItem更新故障表

发送消息后, 无论正常与否, 调用updateFaultItem更新故障表, 更新本地错误表缓存数据, 用于延迟时间的故障转移功能。

故障转移功能在此前的selectOneMessageQueue方法中被使用到, 用于查找一个可用的消息队列, updateFaultItem方法判断是否开启了故障转移功能, 会更新LatencyFaultTolerance维护的faultItemTable集合属性中的异常broker数据。

 

/**
 * DefaultMQProducerImpl的方法
 * @param brokerName brokerName
 * @param currentLatency 当前延迟
 * @param isolation 是否使用默认隔离
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
   
     
    //调用MQFaultStrategy#updateFaultItem方法
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

MQFaultStrategy#updateFaultItem方法。其根据本次发送消息的延迟时间currentLatency, 计算出broker的隔离时间duration, 即是broker的下一个可用时间点。用于更新故障记录表。

/**
 * MQFaultStrategy的方法
 *
 * @param brokerName     brokerName
 * @param currentLatency 当前延迟
 * @param isolation      是否使用默认隔离时间
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
   
     
    //如果开启了故障转移,即sendLatencyFaultEnable为true,默认false
    if (this.sendLatencyFaultEnable) {
   
     
        //根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration
        //如果isolation为true,则使用默认隔离时间30000,即30s
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        //更新故障记录表
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

2.7.1 computeNotAvailableDuration计算隔离时间#

根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration, 据此即可以计算出该broker的下一个可用时间点。

 

//延迟等级
private long[] latencyMax = {
   
     50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//不可用时间等级
private long[] notAvailableDuration = {
   
     0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

/**
 * MQFaultStrategy的方法
 *
 * @param currentLatency 当前延迟
 * @return 故障延迟的时间
 */
private long computeNotAvailableDuration(final long currentLatency) {
   
     
    //倒叙遍历latencyMax
    for (int i = latencyMax.length - 1; i >= 0; i--) {
   
     
        //选择broker延迟时间对应的broker不可用时间,默认30000对应的故障延迟的时间为600000,即10分钟
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}

latencyMax为延迟等级, notAvailableDuration为隔离时间。之间的关系:

 

2.7.2 updateFaultItem更新故障表#

该方法更新LatencyFaultToleranceImpl维护的faultItemTable集合属性中的异常broker的故障信息, 将会设置发送消息的延迟时间currentLatency属性, 以及下一个可用时间点LatencyFaultToleranceImpl属性。

下次可用时间LatencyFaultToleranceImpl属性= 现在的时间 + 隔离的时间, 在selectOneMessageQueue方法选取消息队列的时候, 如果开启了故障转移, 那么会查找下一个可用时间点小于当前时间点的broker的队列来发送消息。

 

/**
 * LatencyFaultToleranceImpl的方法
 *
 * @param name                 brokerName
 * @param currentLatency       当前延迟
 * @param notAvailableDuration 隔离时间(不可用时间)
 */
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
   
     
    //获取该broker此前的故障记录数据
    FaultItem old = this.faultItemTable.get(name);
    //如果此前没有数据,那么设置一个新对象肌凝乳
    if (null == old) {
   
     
        final FaultItem faultItem = new FaultItem(name);
        //设置当前延迟
        faultItem.setCurrentLatency(currentLatency);
        //设置下一次可用时间点
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        //已有故障记录,更新
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
   
     
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
   
     
        //已有故障记录,更新
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

3.总结

1、 生产者消息重试:RocketMQ的消费者消息重试和生产者消息重投;
2、 生产者故障转移:通过sendLatencyFaultEnable属性配置是否开启,目的是为了保证每次发送消息成功,是一种高可用手段;

1、 延迟时间的故障转移,需要sendLatencyFaultEnable为true,消息队列选择时候,可用过滤mq认为不可用的broker,以免不断为宕机的broker发送消息,选取一个延迟比较短的broker,实现消息发送高可用;
2、 没有开启延迟时间的故障转移的时候,在轮询选择mq的时候,不会选择上一次发送失败的broker,实现消息发送高可用;
3、 Vip通道:VIP通道用于隔离读写操作,消费者拉取消息只能请求普通通道,生产者可用选择vip或者普通通道;
4、 故障转移表:RocketMQ的Producer生产者故障转移依赖于故障转移表实现,是一个HasmMap消息发送结束之后,会根据本次发送消息的延迟时间currentLatency,计算该broker的隔离时间duration,即为broker的下一次可用时间点然后更新故障记录表故障转移表的key为brokerName,value为未来该broker可用时间;