美文网首页
八、RocketMQ实践方案

八、RocketMQ实践方案

作者: 恨别A鸟惊心 | 来源:发表于2019-03-31 15:16 被阅读0次

    1、Broker 的最佳实践

    Broker Role

    Broker Role有ASYNC_MASTER,SYNC_MASTER或SLAVE。如果您无法容忍消息丢失,我们建议您部署SYNC_MASTER并为其附加SLAVE。如果您容忍一少部分消息丢失,但希望Broker始终可用,则可以使用SLAVE部署ASYNC_MASTER。如果你只是想让它变得简单,你可能只需要一个没有SLAVE的ASYNC_MASTER。

    FlushDiskType 刷新磁盘的类型

    建议使用ASYNC_FLUSH,因为SYNC_FLUSH价格昂贵且会导致性能损失过大。如果您需要可靠性,我们建议您使用带有SLAVE的SYNC_MASTER。


    2、NameServer的最佳实践

    在Apache RocketMQ中,NameServer旨在协调分布式系统的每个组件,协调主要通过管理主题路由信息来实现。

    管理由两部分组成:

    • broker定期更新每个NameServer中保存的元数据。
    • NameServer为客户端提供服务,包括生产者,消费者和命令行客户端以及最新的路由信息​​。

    因此,在启动broker和client之前,我们需要告诉他们NameServer地址来访问 NameServer。在Apache RocketMQ中,这可以通过四种方式完成。

    Programmatic Way

    对于代理,我们可以namesrvAddr=name-server-ip1:port;name-server-ip2:port在代理配置文件中指定。

    对于生产者和消费者,我们可以向他们提供NameServer地址列表,如下所示:

    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
    
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    consumer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
    
    

    如果从shell使用admin命令行,也可以这样指定:

    sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION
    
    

    一个简单的例子是: sh mqadmin -n localhost:9876 clusterList 假设在NameServer节点上查询集群信息。

    如果您已将管理工具集成到自己的仪表板中,则可以:

    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt("please_rename_unique_group_name");
    defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
    
    

    Java Options

    NameServer地址列表也可以通过rocketmq.namesrv.addr在启动之前指定续集java选项来提供给您的应用程序 。

    Environment Variable

    您可以导出NAMESRV_ADDR环境变量。如果设置,broker和client将检查并使用其值。

    HTTP Endpoint

    如果您未使用前面提到的方法指定名称服务器地址列表,Apache RocketMQ将访问以下HTTP端点,每两分钟获取并更新名称服务器地址列表,初始延迟为10秒。

    默认情况下,结束点是:

    http://jmenv.tbsite.net:8080/rocketmq/nsaddr

    您可以jmenv.tbsite.net使用此Java选项覆盖:rocketmq.namesrv.domain,您也可以nsaddr使用此Java选项覆盖part:rocketmq.namesrv.domain.subgroup

    如果您正在生产中运行Apache RocketMQ,建议使用此方法,因为它为您提供了最大的灵活性 - 您可以根据名称服务器的系统负载动态添加或删除名称服务器节点,而无需重新启动代理和客户端。

    优先

    首先介绍的方法优先于后者:
    Programmatic Way > Java Options > Environment Variable > HTTP Endpoint


    3、Producer最佳实践

    SendStatus

    发送消息时,您将获得包含SendStatus的SendResult。首先,我们假设Message的isWaitStoreMsgOK = true(默认为true)。如果没有,如果没有抛出异常,我们将始终获得SEND_OK。以下是每个状态的说明列表:

    FLUSH_DISK_TIMEOUT

    如果Broker设置MessageStoreConfig的FlushDiskType = SYNC_FLUSH(默认为ASYNC_FLUSH),并且Broker没有在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成刷新磁盘,您将获得此状态。

    FLUSH_SLAVE_TIMEOUT

    如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),并且从属Broker未在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,则您将获得此状态。

    SLAVE_NOT_AVAILABLE

    如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),但没有配置slave Broker,您将获得此状态。

    SEND_OK

    SEND_OK并不意味着它是可靠的。要确保不会丢失任何消息,还应启用SYNC_MASTER或SYNC_FLUSH。

    复制或丢失

    如果你得到FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT并且Broker正好关闭,你可以找到你的消息丢失。此时,您有两个选择,一个是放手,这可能会导致此消息丢失; 另一种方法是重新发送消息,这可能会使消息重复。通常我们建议重新发送并找到一种方法来处理消费时的重复删除。除非你觉得某些消息丢失并不重要。但请记住,当您获得SLAVE_NOT_AVAILABLE时,重新发送是无用的。如果发生这种情况,您应该保留场景并提醒群集管理器。

    超时

    客户端向Broker发送请求,并等待响应,但如果最大等待时间已过,并且未返回任何响应,则客户端将抛出RemotingTimeoutException。默认等待时间为3秒。您还可以使用send(msg,timeout)而不是send(msg)传递超时参数。请注意,我们不建议等待时间太小,因为Broker需要一些时间来刷新磁盘或与从站同步。如果该值超过syncFlushTimeout,则该值可能影响不大,因为Broker可能会在超时之前返回FLUSH_SLAVE_TIMEOUT或FLUSH_SLAVE_TIMEOUT的响应。

    消息大小

    我们建议消息的大小不应超过512K。

    异步发送

    默认发送(msg)将阻塞,直到返回响应。因此,如果您关心性能,我们建议您使用send(msg,回调),它将以异步方式运行。

    Producer Group

    通常,生产者组没有任何影响。但是如果存在事务,你应该注意它。默认情况下,您只能在同一个JVM中创建一个具有相同生产者组的生产者,这通常就足够了。

    线程安全

    生产者是线程安全的,您可以在业务解决方案中使用它。

    性能

    如果您希望在一个JVM中有多个生产者进行大数据处理,我们建议:

    • use async sending with a few producers (3~5 is enough)
    • 每个生产者的设置不同的实例名称

    4、Consumer最佳实践

    Consumer群体和订阅

    您应该注意的第一件事是,不同的消费者组可以独立地使用相同的主题,并且每个消费者组都有自己的消费抵消。请确保同一组内的每个消费者订阅相同的主题。

    消息监听

    Orderly有顺序地

    消费者将锁定每个MessageQueue以确保它按顺序逐个使用。这会导致性能下降,但是当您关心消息的顺序时它会很有用。不建议抛出异常,您可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。

    Concurrently同时

    顾名思义,Consumer将同时使用这些消息。建议使用它以获得良好的性能。不建议抛出异常,您可以返回ConsumeConcurrentlyStatus.RECONSUME_LATER。

    Consume Status消费状况

    对于MessageListenerConcurrently,您可以返回RECONSUME_LATER以告诉消费者您现在不能使用它并希望稍后重新生成它。然后,您可以继续使用其他消息。对于MessageListenerOrderly,因为您关心订单,所以不能跳过邮件,但是您可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT来告诉消费者等待片刻。

    Blocking闭塞

    不建议阻止监听器,因为它会阻塞线程池,最终可能会停止使用过程。

    Thread Number线程号

    使用者使用ThreadPoolExecutor在内部处理消费,因此您可以通过设置setConsumeThreadMin或setConsumeThreadMax来更改它。

    ConsumeFromWhere

    建立新的消费者群体时,需要决定是否需要消费已经存在于经纪人中的历史消息。CONSUME_FROM_LAST_OFFSET将忽略历史消息,并消耗之后生成的任何内容。CONSUME_FROM_FIRST_OFFSET将使用Broker中存在的每条消息。您还可以使用CONSUME_FROM_TIMESTAMP来使用在指定时间戳之后生成的消息。

    重复消费问题

    许多情况都可能导致重复消费问题,例如:

    • 生产者重新发送消息(即,在FLUSH_SLAVE_TIMEOUT的情况下)
    • 消费者关闭,一些抵消未及时更新到broker。

    因此,如果您的应用程序无法容忍重复,您可能需要执行一些外部工作来处理此问题。例如,您可以检查数据库的主键。

    相关文章

      网友评论

          本文标题:八、RocketMQ实践方案

          本文链接:https://www.haomeiwen.com/subject/vnzybqtx.html