简单逻辑说明
- 通过topic名字确定namespace
- 查找这个namespace的bundle分配信息
- 根据bundle分配信息来确认这个topic属于哪个bundle
- 根据bundle信息来确认哪个broker负责这个bundle,返回broker的地址。
CommandLookup
主要用来查找Topic在被哪个broker负责。
一般客户端可以通过http协议或者二进制协议来查询。
message CommandLookupTopic {
// topic 名字
required string topic = 1;
// 网络层请求id
required uint64 request_id = 2;
optional bool authoritative = 3 [default = false];
// TODO - Remove original_principal, original_auth_data, original_auth_method
// Original principal that was verified by
// a Pulsar proxy.
optional string original_principal = 4;
// Original auth role and auth Method that was passed
// to the proxy.
optional string original_auth_data = 5;
optional string original_auth_method = 6;
// 从哪个指定的连接点进行连接
optional string advertised_listener_name = 7;
}
这里直接看服务端的代码ServerCnx
protected void handleLookup(CommandLookupTopic lookup) {
final long requestId = lookup.getRequestId();
final boolean authoritative = lookup.isAuthoritative();
final String advertisedListenerName = lookup.hasAdvertisedListenerName() ? lookup.getAdvertisedListenerName()
: null;
// 校验topic名字
TopicName topicName = validateTopicName(lookup.getTopic(), requestId, lookup);
if (topicName == null) {
return;
}
// 这里的Semaphore 是服务端Lookup请求的限流器
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
....
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP)
.thenApply(isAuthorized -> {
// 通过鉴权
if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(),
topicName,
authoritative,
getPrincipal(),
getAuthenticationData(),
requestId,
advertisedListenerName)
.handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
} else {
....
}
lookupSemaphore.release();
return null;
});
} else {
....
}).exceptionally(ex -> {
....
});
} else {
// 如果有异常是发送的`CommandLookupTopicResponse`
// 这里已经是新的定义二进制消息的方式了
// / Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD]
ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
}
}
TopicLookupBase.lookupTopicAsync
org.apache.pulsar.broker.lookup.TopicLookupBase#lookupTopicAsync
这个是一个静态方法
主要
- validation 校验集群,topic名字等(这里面有跨集群检查的逻辑,先略过)
- lookup逻辑
这里校验的逻辑先略过了,实际核心的逻辑在下面这2行上。
LookupOptions options = LookupOptions.builder()
.authoritative(authoritative)
.advertisedListenerName(advertisedListenerName)
.loadTopicsInBundle(true) // 这里这个条件是true
.build();
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
这里面的主要逻辑在NamespaceService
里面,PulsarService
可以认为是一个全局对象,pulsar需要的任何核心逻辑对象
(比如说NamspaceService
,BrokerService
,ConfigurationCacheService
等)你都可以从这个对象里面拿到。
NamespaceService.getBrokerServiceUrlAsync
这里面的主要逻辑是
根据传递过来的topic名字定位namespace
之后确认这个topic属于哪个NamespaceBundle。
之后根据这个NamespaceBundle 来找到这个bundle 的owner broker的地址。
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
....
CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
.thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
....
}
public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
.thenApply(bundles -> bundles.findBundle(topic));
}
这里面的bundleFactory实际上是一个异步加载的cache。
我们看一下定义
// org.apache.pulsar.common.naming.NamespaceBundleFactory
private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache;
// 构造函数里面
public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
// .....
this.bundlesCache = Caffeine.newBuilder()
.recordStats() // 记录metric
.buildAsync(
// 加载cache 的逻辑
(NamespaceName namespace, Executor executor) -> {
String path = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString());
....
CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
// Read the static bundle data from the policies
pulsar
.getLocalZkCacheService() // 获取LocalZooKeeperCacheService
.policiesCache()
.getWithStatAsync(path)
.thenAccept(result -> {
// 这里实际是去找有没有单独为这个namespace配置bundle数量
BundlesData bundlesData = result.map(Entry::getKey).map(p -> p.bundles).orElse(null);
// 通过namespace拿到namespaceBundle
NamespaceBundles namespaceBundles = getBundles(
namespace, bundlesData, result.map(Entry::getValue).map(s -> s.getVersion()).orElse(-1));
....
future.complete(namespaceBundles);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
});
// .....
}
这里简单说一下NamespaceBundles 这个类,这个类会保存这个Namespace的所有NamespaceBundle,提供一个聚合的视图。
这个类表示一个hash环,这个环按照配置的分片个数,会被分成几个片段,
每个broker会按照一定算法来确定这个环上的哪一部分属于他自己。
topic也会按照一定的算法分配到这个hash环上。
这样broker就能确定自己负责哪些topic。
就可以返回lookup请求了,这个流程也会触发topic的加载流程。
NamespaceBundles.findBundle
这个函数就是确定这个topic属于哪个NamespaceBundle
// 映射topic到hash环上的一段, 这一段就被NamespaceBundle 标识
public NamespaceBundle findBundle(TopicName topicName) {
checkArgument(this.nsname.equals(topicName.getNamespaceObject()));
long hashCode = factory.getLongHashCode(topicName.toString());
NamespaceBundle bundle = getBundle(hashCode);
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
}
return bundle;
}
到这一步我们就能确定这个namespace的信息了,namespce被分为多少个bundle。
而且可以确定这个topic属于哪个namespacebundle。
下一步是根据namespaceBundle查找负责的broker。
NamespaceService.findBrokerServiceUrl
到这里是根据namespacebundle 确定broker
// 这个记录的是一个broker的元数据信息
public class NamespaceEphemeralData {
private String nativeUrl;
private String nativeUrlTls;
private String httpUrl;
private String httpUrlTls;
private boolean disabled;
private Map<String, AdvertisedListener> advertisedListeners;
}
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
NamespaceBundle bundle, LookupOptions options) {
ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
return targetMap.computeIfAbsent(bundle, (k) -> {
CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>();
// First check if we or someone else already owns the bundle
ownershipCache.getOwnerAsync(bundle)
.thenAccept(nsData -> {
// nsData : Optional<NamespaceEphemeralData>
if (!nsData.isPresent()) {
// 如果没找到这个信息
if (options.isReadOnly()) {
// Do not attempt to acquire ownership
future.complete(Optional.empty());
} else {
// 目前还没有人负责这个bundle 尝试查找这个bundle的owner
pulsar.getExecutor().execute(() -> {
searchForCandidateBroker(bundle, future, options);
});
}
} else if (nsData.get().isDisabled()) {
// namespce 正在unload
future.completeExceptionally(
new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
} else {
// 到这里是找到了的逻辑,直接拼接正常的response就行了
...
// find the target
future.complete(Optional.of(new LookupResult(nsData.get())));
}
}).exceptionally(exception -> {
...
});
// 这里实际上是使用这个targetMap来做一个锁的结构避免多次加载。
// https://github.com/apache/pulsar/pull/1527
future.whenComplete((r, t) -> pulsar.getExecutor().execute(
() -> targetMap.remove(bundle)
));
return future;
});
}
这样如果cache中存在这个topic的owner信息,就可以直接返回。
网友评论