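RocketMQ uses Netty as its underlying communication framework. The technical details are as follows:
1. Communication protocol
RocketMQ's wire protocol runs on top of Netty and uses a custom format. The format is as follows:
RocketMQ frame format: a 4-byte total packet length followed by a 4-byte header length. (Why do the packet length and the header length use the same field width? This looks like it could be optimized.)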
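To make the frame layout concrete, here is a minimal Python sketch of encoding and decoding one frame. It goes beyond what the text above states and relies on assumptions drawn from my reading of RocketMQ 4.x's RemotingCommand: the total length counts every byte after the length field itself, the top byte of the header-length field carries the serialization type (0 for JSON) with the lower 24 bits holding the real header length, and the header is JSON. The function names `encode_frame` and `decode_frame` are made up for illustration and are not RocketMQ APIs.

```python
import json
import struct


def encode_frame(header: dict, body: bytes = b"", serialize_type: int = 0) -> bytes:
    """Build one frame: [total length][type + header length][header][body]."""
    header_data = json.dumps(header).encode("utf-8")
    if len(header_data) >= 1 << 24:
        raise ValueError("header does not fit in 24 bits")
    # Assumption: total length excludes its own 4 bytes but includes the
    # 4-byte header-length field, the header and the body.
    total_length = 4 + len(header_data) + len(body)
    # Assumption: top byte = serialization type, low 24 bits = header length.
    header_field = (serialize_type << 24) | len(header_data)
    return struct.pack(">iI", total_length, header_field) + header_data + body


def decode_frame(frame: bytes):
    """Split a frame back into (serialize_type, header dict, body bytes)."""
    total_length, header_field = struct.unpack_from(">iI", frame, 0)
    serialize_type = header_field >> 24
    header_length = header_field & 0xFFFFFF
    header_end = 8 + header_length
    header = json.loads(frame[8:header_end].decode("utf-8"))
    body = frame[header_end:4 + total_length]
    return serialize_type, header, body


if __name__ == "__main__":
    frame = encode_frame({"code": 10, "opaque": 1}, b"hello")
    print(decode_frame(frame))
```

Under these assumptions the two length fields are the same width but hold different values: the first covers the whole frame, the second only the header.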
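2. Transport encryption
When connecting over Netty, RocketMQ can encrypt traffic through Netty's SslContext, but this requires JVM startup parameters (-Dkey=value). The parameters are listed below (with the -D prefix omitted):
tls.server.mode=disabled (no encryption) / permissive (default) / enforcing (even so, if no certificate path is detected, encrypted transport is not selected)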
tls.server.certPath=xxx
tls.server.keyPath=xxx
tls.server.trustCertPath=xxx
The three parameters above are certificate paths for the server side. The client has its own equivalents:
tls.client.certPath=xxx
tls.client.keyPath=xxx
tls.client.trustCertPath=xxx
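Since these settings are startup parameters rather than properties-file entries, it can help to generate the -D flags in one place. The following is only a sketch: the helper name `tls_jvm_flags` and the file paths are hypothetical, and the mode parameter is restricted to the server side because the list above only gives tls.server.mode.

```python
from typing import List, Optional

SERVER_MODES = ("disabled", "permissive", "enforcing")


def tls_jvm_flags(role: str, cert_path: str, key_path: str,
                  trust_cert_path: str, mode: Optional[str] = None) -> List[str]:
    """Return the -Dkey=value flags for the given side ("server" or "client")."""
    if role not in ("server", "client"):
        raise ValueError("role must be 'server' or 'client'")
    flags = []
    if mode is not None:
        if role != "server":
            raise ValueError("only tls.server.mode is listed, no client mode")
        if mode not in SERVER_MODES:
            raise ValueError("mode must be one of " + ", ".join(SERVER_MODES))
        flags.append(f"-Dtls.server.mode={mode}")
    flags.append(f"-Dtls.{role}.certPath={cert_path}")
    flags.append(f"-Dtls.{role}.keyPath={key_path}")
    flags.append(f"-Dtls.{role}.trustCertPath={trust_cert_path}")
    return flags


if __name__ == "__main__":
    # Hypothetical paths, for illustration only.
    print(" ".join(tls_jvm_flags("server", "/path/to/server.pem",
                                 "/path/to/server.key", "/path/to/ca.pem",
                                 mode="enforcing")))
```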
3. Default thread counts in rocketmq-netty
RocketMQ uses a boss/worker (main/sub) thread model:
thread-acceptor : 1
thread-boss : 3
thread-worker : 8
4. Configurable RocketMQ parameters
(Figure: RocketMQ component configuration architecture)
1) netty-server-config parameters
listenPort=8888
serverWorkerThreads=8 (the worker threads)
serverCallbackExecutorThreads=0
serverSelectorThreads=3 (the boss threads)
serverOnewaySemaphoreValue=256
serverAsyncSemaphoreValue=64
serverChannelMaxIdleTimeSeconds=120 (by default a connection is closed after 2 minutes idle)
serverSocketSndBufSize=65535
serverSocketRcvBufSize=65535
serverPooledByteBufAllocatorEnable=true (whether to use the pooled buffer allocator; enabled by default)
useEpollNativeSelector=false (whether to use Netty's native EpollEventLoopGroup)
2) netty-client-config parameters
clientWorkerThreads =4
clientCallbackExecutorThreads=number of CPU cores
clientOnewaySemaphoreValue = 65535
clientAsyncSemaphoreValue = 65535
connectTimeoutMillis =3000
channelNotActiveInterval=1000 *60
clientChannelMaxIdleTimeSeconds=120
clientSocketSndBufSize = 65535
clientSocketRcvBufSize = 65535
clientPooledByteBufAllocatorEnable=false
clientCloseSocketIfTimeout=false
useTLS=false
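3) nameserver-config parameters
rocketmq.home.dir=xxx (a JVM startup parameter, not a properties-file entry)
user.home=xxx (a startup parameter; the directory that holds kvConfig.json and namesrv.properties)
productEnvName=center (default production environment name)
orderMessageEnable=false (whether ordered messages are enabled)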
4) broker-config parameters
aclEnable=false
adminBrokerThreadPoolNums=16
autoCreateSubscriptionGroup=true
autoCreateTopicEnable=true
brokerClusterName=DefaultCluster
brokerFastFailureEnable=true
brokerId=0
brokerIP1=localhost
brokerIP2=localhost
brokerName=localhost
brokerPermission=4 | 2 (read | write)
brokerTopicEnable=true
clientManagerThreadPoolQueueCapacity=1000000
clientManageThreadPoolNums=32
clusterTopicEnable=true
commercialBaseCount=1
commercialBigCount=1
commercialEnable=true
commercialTimerCount=1
commercialTransCount=1
compressedRegister=false
consumerFallbehindThreshold=1024L *1024 *1024 *16
consumerManagerThreadPoolQueueCapacity=1000000
consumerManageThreadPoolNums=32
defaultTopicQueueNums=8
disableConsumeIfConsumerReadSlowly=false
enableCalcFilterBitMap=false
enablePropertyFilter=false
endTransactionPoolQueueCapacity=100000
endTransactionThreadPoolNums=8 + number of CPU cores *2
expectConsumerNumUseFilter=32
fetchNamesrvAddrByAddressServer=false
filterDataCleanTimeSpan=24 *3600 *1000
filterServerNums=0
filterSupportRetry=false
flushConsumerOffsetHistoryInterval=1000 *60
flushConsumerOffsetInterval=1000 *5
forceRegister=true
heartbeatThreadPoolNums=Math.min(32, number of CPU cores)
heartbeatThreadPoolQueueCapacity=50000
highSpeedMode=false
longPollingEnable=true
maxDelayTime=40
maxErrorRateOfBloomFilter=20
messageStorePlugIn=""
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
namesrvAddr=localhost:8080;localhost:8080
notifyConsumerIdsChangedEnable=true
pullMessageThreadPoolNums=16 + number of CPU cores *2
pullThreadPoolQueueCapacity=100000
queryMessageThreadPoolNums=8 + number of CPU cores
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
registerBrokerTimeoutMills=6000
registerNameServerPeriod=1000 *30
rejectTransactionMessage=false
rocketmqHome=value of your ROCKETMQ_HOME environment variable
sendMessageThreadPoolNums=1
sendThreadPoolQueueCapacity=10000
shortPollingTimeMills=1000
slaveReadEnable=false
startAcceptSendRequestTimeStamp=0
traceOn=true
traceTopicEnable=false
transactionCheckInterval=60 *1000
transactionCheckMax=15
transactionTimeOut=6 *1000
transferMsgByHeap=true
waitTimeMillsInHeartbeatQueue=31 *1000
waitTimeMillsInPullQueue=5 *1000
waitTimeMillsInSendQueue=200
waitTimeMillsInTransactionQueue=3 *1000
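Several of the broker thread-pool defaults above depend on the number of CPU cores. As a rough way to see what they resolve to on a given machine, here is a small sketch that evaluates the four formulas from the list. The function name `broker_thread_pool_defaults` is hypothetical, and using `os.cpu_count()` as the core count is an assumption about how the JVM would see the machine.

```python
import os
from typing import Dict, Optional


def broker_thread_pool_defaults(cores: Optional[int] = None) -> Dict[str, int]:
    """Evaluate the core-dependent defaults from the broker-config list."""
    if cores is None:
        # Assumption: the JVM reports the same core count as Python does.
        cores = os.cpu_count() or 1
    return {
        "endTransactionThreadPoolNums": 8 + cores * 2,
        "heartbeatThreadPoolNums": min(32, cores),
        "pullMessageThreadPoolNums": 16 + cores * 2,
        "queryMessageThreadPoolNums": 8 + cores,
    }


if __name__ == "__main__":
    for name, value in broker_thread_pool_defaults().items():
        print(f"{name}={value}")
```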
5) message-store-config
accessMessageInMemoryMaxRatio=40
bitMapLengthConsumeQueueExt=64
brokerRole=ASYNC_MASTER
checkCRCOnRecover=true
cleanFileForciblyEnable=true
cleanResourceInterval=10000
commitCommitLogLeastPages=4
commitCommitLogThoroughInterval=200
commitIntervalCommitLog=200
debugLockEnable=false
defaultQueryMaxNum=32
deleteCommitLogFilesInterval=100
deleteConsumeQueueFilesInterval=100
deleteWhen="04" // When to delete,default is at 4 am
destroyMapedFileIntervalForcibly=1000 *120
diskFallRecorded=true
diskMaxUsedSpaceRatio=75
duplicationEnable=false
enableConsumeQueueExt=false
fastFailIfNoBufferInStorePool=false
fileReservedTime=72
flushCommitLogLeastPages=4
flushCommitLogThoroughInterval=1000 *10
flushCommitLogTimed=false
flushConsumeQueueLeastPages=2
flushConsumeQueueThoroughInterval=1000 *60
flushDelayOffsetInterval=1000 *10
flushDiskType=ASYNC_FLUSH
flushIntervalCommitLog=500
flushIntervalConsumeQueue=1000
flushLeastPagesWhenWarmMapedFile=1024 /4 *16
haHousekeepingInterval=1000 *20
haListenPort=10912
haSendHeartbeatInterval=1000 *5
haSlaveFallbehindMax=1024 *1024 *256
haTransferBatchSize=1024 *32
mapedFileSizeCommitLog=1024 *1024 *1024
mappedFileSizeConsumeQueueExt=48 *1024 *1024
maxHashSlotNum=5000000
maxIndexNum=5000000 *4
maxMessageSize=1024 *1024 *4
maxMsgsNumBatch=64
maxTransferBytesOnMessageInDisk=1024 *64
maxTransferBytesOnMessageInMemory=1024 *256
maxTransferCountOnMessageInDisk=8
maxTransferCountOnMessageInMemory=32
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
messageIndexEnable=true
messageIndexSafe=false
offsetCheckInSlave=false
osPageCacheBusyTimeOutMills=1000
putMsgIndexHightWater=600000
redeleteHangedFileInterval=1000 *120
storePathCommitLog=...
storePathRootDir=...
syncFlushTimeout=1000 *5
transientStorePoolEnable=false
transientStorePoolSize=5
useReentrantLockWhenPutMessage=false
warmMapedFileEnable=false
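The messageDelayLevel value above is a space-separated list of durations, which is easier to reason about once it is expanded. The sketch below maps each level to milliseconds. It assumes levels are numbered from 1 in list order (so level 1 is 1s and level 18 is 2h) and handles only the s, m and h suffixes that appear in the default; `parse_delay_levels` is a name made up for this example.

```python
from typing import Dict

# Only the units that appear in the default value shown above.
UNIT_MS = {"s": 1000, "m": 60 * 1000, "h": 60 * 60 * 1000}


def parse_delay_levels(spec: str) -> Dict[int, int]:
    """Map delay level (assumed 1-based) to its delay in milliseconds."""
    levels = {}
    for index, token in enumerate(spec.split(), start=1):
        value, unit = int(token[:-1]), token[-1]
        if unit not in UNIT_MS:
            raise ValueError(f"unsupported unit in {token!r}")
        levels[index] = value * UNIT_MS[unit]
    return levels


if __name__ == "__main__":
    default_spec = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
    for level, millis in parse_delay_levels(default_spec).items():
        print(level, millis)
```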
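5. Special parameters
A broker can update its nameserver addresses dynamically: every 2 minutes it fetches the nameserver address list over HTTP and applies it, which improves the availability of the system. For a hot-standby broker to reach the nameserver this way, the settings must be passed as startup parameters, not in the configuration file. The startup parameters are: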
-Drocketmq.namesrv.domain=jmenv.tbsite.net
-Drocketmq.namesrv.domain.subgroup=nsaddr
The default is jmenv.tbsite.net:8080/rocketmq/nsaddr
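To show how the two startup parameters combine into the default address, here is a minimal sketch. The helper names are hypothetical. The `http://` scheme follows from the HTTP polling described above, and port 8080 and the `/rocketmq/` path are taken from the default shown. Splitting the address list on `;` follows the namesrvAddr format in the broker-config list, and applying it to the HTTP response body is an assumption.

```python
from typing import List


def namesrv_lookup_url(domain: str = "jmenv.tbsite.net",
                       subgroup: str = "nsaddr") -> str:
    """Combine rocketmq.namesrv.domain and its subgroup into the lookup URL."""
    return f"http://{domain}:8080/rocketmq/{subgroup}"


def split_namesrv_addr(addr_list: str) -> List[str]:
    """Split a namesrvAddr-style value such as 'host1:9876;host2:9876'."""
    return [part.strip() for part in addr_list.split(";") if part.strip()]


if __name__ == "__main__":
    print(namesrv_lookup_url())
    print(split_namesrv_addr("localhost:8080;localhost:8080"))
```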