Introduction
As the xDS server, the osm-controller registers every Envoy that connects to it in the MeshCatalog, and unregisters it when the connection drops.
Every minute, or whenever a MeshSpec/cert/Ingress/Namespace/Endpoints/Deployment resource changes, configuration is pushed to the Envoys registered in the MeshCatalog.
Envoys can, of course, also actively request configuration themselves over the same stream.
Source code
In cmd/osm-controller/osm-controller.go:
func main() {
...
kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
...
cfg := configurator.NewConfigurator(kubernetes.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmConfigMapName)
...
namespaceController := namespace.NewNamespaceController(kubeClient, meshName, stop)
meshSpec, err := smi.NewMeshSpecClient(*smiKubeConfig, kubeClient, osmNamespace, namespaceController, stop)
...
provider, err := kube.NewProvider(kubeClient, namespaceController, stop, constants.KubeProviderName, cfg)
...
endpointsProviders := []endpoint.Provider{provider}
...
ingressClient, err := ingress.NewIngressClient(kubeClient, namespaceController, stop, cfg)
...
meshCatalog := catalog.NewMeshCatalog(
namespaceController,
kubeClient,
meshSpec,
certManager,
ingressClient,
stop,
cfg,
endpointsProviders...)
...
xdsServer := ads.NewADSServer(ctx, meshCatalog, enableDebugServer, osmNamespace, cfg)
...
grpcServer, lis := utils.NewGrpc(serverType, *port, adsCert.GetCertificateChain(), adsCert.GetPrivateKey(), adsCert.GetIssuingCA())
xds_discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, xdsServer)
go utils.GrpcServe(ctx, grpcServer, lis, cancel, serverType)
}
In pkg/utils/grpc.go:
func NewGrpc(serverType string, port int, certPem, keyPem, rootCertPem []byte) (*grpc.Server, net.Listener) {
log.Info().Msgf("Setting up %s gRPC server...", serverType)
addr := fmt.Sprintf(":%d", port)
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatal().Err(err).Msgf("Could not start %s gRPC server on %s", serverType, addr)
}
log.Info().Msgf("Parameters for %s gRPC server: MaxConcurrentStreams=%d; KeepAlive=%+v", serverType, maxStreams, streamKeepAliveDuration)
grpcOptions := []grpc.ServerOption{
grpc.MaxConcurrentStreams(maxStreams),
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: streamKeepAliveDuration,
}),
}
mutualTLS, err := setupMutualTLS(false, serverType, certPem, keyPem, rootCertPem)
if err != nil {
log.Fatal().Err(err).Msg("Failed to setup mutual tls for GRPC server")
}
grpcOptions = append(grpcOptions, mutualTLS)
return grpc.NewServer(grpcOptions...), lis
}
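setupMutualTLS itself is not shown above. A minimal sketch of what it plausibly does — build a tls.Config that presents the ADS certificate and requires client certificates signed by the given root CA, then wrap it as a gRPC ServerOption. The function name and body below are illustrative assumptions, not the upstream implementation.

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

// setupMutualTLSSketch: hypothetical stand-in for setupMutualTLS used above.
func setupMutualTLSSketch(serverName string, certPem, keyPem, rootCertPem []byte) (grpc.ServerOption, error) {
    cert, err := tls.X509KeyPair(certPem, keyPem)
    if err != nil {
        return nil, err
    }
    certPool := x509.NewCertPool()
    if !certPool.AppendCertsFromPEM(rootCertPem) {
        return nil, fmt.Errorf("failed to append root CA for %s server", serverName)
    }
    tlsConfig := &tls.Config{
        ClientAuth:   tls.RequireAndVerifyClientCert, // mutual TLS: Envoy must present a valid cert
        Certificates: []tls.Certificate{cert},
        ClientCAs:    certPool,
    }
    return grpc.Creds(credentials.NewTLS(tlsConfig)), nil
}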
In pkg/envoy/ads/server.go:
func NewADSServer(ctx context.Context, meshCatalog catalog.MeshCataloger, enableDebug bool, osmNamespace string, cfg configurator.Configurator) *Server {
server := Server{
catalog: meshCatalog,
ctx: ctx,
xdsHandlers: getHandlers(),
enableDebug: enableDebug,
osmNamespace: osmNamespace,
cfg: cfg,
}
if enableDebug {
server.xdsLog = make(map[certificate.CommonName]map[envoy.TypeURI][]time.Time)
}
return &server
}
func getHandlers() map[envoy.TypeURI]func(context.Context, catalog.MeshCataloger, *envoy.Proxy, *xds_discovery.DiscoveryRequest, configurator.Configurator) (*xds_discovery.DiscoveryResponse, error) {
return map[envoy.TypeURI]func(context.Context, catalog.MeshCataloger, *envoy.Proxy, *xds_discovery.DiscoveryRequest, configurator.Configurator) (*xds_discovery.DiscoveryResponse, error){
envoy.TypeEDS: eds.NewResponse,
envoy.TypeCDS: cds.NewResponse,
envoy.TypeRDS: rds.NewResponse,
envoy.TypeLDS: lds.NewResponse,
envoy.TypeSDS: sds.NewResponse,
}
}
In pkg/envoy/ads/stream.go:
func (s *Server) StreamAggregatedResources(server xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
...
go receive(requests, &server, proxy, quit)
for {
select {
...
case discoveryRequest, ok := <-requests:
...
resp, err := s.newAggregatedDiscoveryResponse(proxy, &discoveryRequest, s.cfg)
...
if err := server.Send(resp); err != nil {
log.Error().Err(err).Msgf("Error sending DiscoveryResponse")
}
case <-proxy.GetAnnouncementsChannel():
s.sendAllResponses(proxy, &server, s.cfg)
}
}
}
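The elided code at the top of StreamAggregatedResources (the `...` above) derives the proxy's identity from the client certificate presented on the mTLS stream; that certificate's CommonName is what proxy.GetCommonName() returns and what the catalog keys on. A hedged sketch of the standard gRPC way to read the certificate from the stream context (illustrative helper, not OSM's exact code):

import (
    "context"
    "crypto/x509"
    "errors"

    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/peer"
)

// peerCertificateSketch pulls the client certificate off a gRPC stream
// context; its Subject.CommonName identifies the connecting Envoy.
func peerCertificateSketch(ctx context.Context) (*x509.Certificate, error) {
    p, ok := peer.FromContext(ctx)
    if !ok {
        return nil, errors.New("no peer found on stream context")
    }
    tlsInfo, ok := p.AuthInfo.(credentials.TLSInfo)
    if !ok {
        return nil, errors.New("peer connection is not using TLS credentials")
    }
    if len(tlsInfo.State.PeerCertificates) == 0 {
        return nil, errors.New("peer presented no certificate")
    }
    return tlsInfo.State.PeerCertificates[0], nil
}

Inside the stream handler this would be called as peerCertificateSketch(server.Context()).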
In pkg/envoy/ads/grpc.go:
func receive(requests chan xds_discovery.DiscoveryRequest, server *xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer, proxy *envoy.Proxy, quit chan struct{}) {
defer close(requests)
defer close(quit)
for {
var request *xds_discovery.DiscoveryRequest
request, recvErr := (*server).Recv()
if recvErr != nil {
if status.Code(recvErr) == codes.Canceled || recvErr == io.EOF {
log.Error().Msgf("[grpc] Connection terminated: %+v", recvErr)
return
}
log.Error().Msgf("[grpc] Connection terminated with error: %+v", recvErr)
return
}
if request.TypeUrl != "" {
log.Trace().Msgf("[grpc] Received DiscoveryRequest from Envoy %s: %+v", proxy.GetCommonName(), request)
requests <- *request
} else {
log.Warn().Msgf("[grpc] Unknown resource: %+v", request)
}
}
}
In pkg/envoy/ads/response.go:
func (s *Server) newAggregatedDiscoveryResponse(proxy *envoy.Proxy, request *xds_discovery.DiscoveryRequest, cfg configurator.Configurator) (*xds_discovery.DiscoveryResponse, error) {
typeURL := envoy.TypeURI(request.TypeUrl)
handler, ok := s.xdsHandlers[typeURL]
if !ok {
log.Error().Msgf("Responder for TypeUrl %s is not implemented", request.TypeUrl)
return nil, errUnknownTypeURL
}
...
response, err := handler(s.ctx, s.catalog, proxy, request, cfg)
...
return response, nil
}
Taking EDS as an example.
In pkg/envoy/eds/response.go:
func NewResponse(ctx context.Context, catalog catalog.MeshCataloger, proxy *envoy.Proxy, request *xds_discovery.DiscoveryRequest, cfg configurator.Configurator) (*xds_discovery.DiscoveryResponse, error) {
svc, err := catalog.GetServiceFromEnvoyCertificate(proxy.GetCommonName())
...
proxyServiceName := *svc
allTrafficPolicies, err := catalog.ListTrafficPolicies(proxyServiceName)
...
allServicesEndpoints := make(map[service.MeshService][]endpoint.Endpoint)
for _, trafficPolicy := range allTrafficPolicies {
isSourceService := trafficPolicy.Source.Equals(proxyServiceName)
if isSourceService {
destService := trafficPolicy.Destination
serviceEndpoints, err := catalog.ListEndpointsForService(destService)
if err != nil {
log.Error().Err(err).Msgf("Failed listing endpoints")
return nil, err
}
allServicesEndpoints[destService] = serviceEndpoints
}
}
...
var protos []*any.Any
for serviceName, serviceEndpoints := range allServicesEndpoints {
loadAssignment := cla.NewClusterLoadAssignment(serviceName, serviceEndpoints)
proto, err := ptypes.MarshalAny(loadAssignment)
if err != nil {
log.Error().Err(err).Msgf("Error marshalling EDS payload %+v", loadAssignment)
continue
}
protos = append(protos, proto)
}
resp := &xds_discovery.DiscoveryResponse{
Resources: protos,
TypeUrl: string(envoy.TypeEDS),
}
return resp, nil
}
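cla.NewClusterLoadAssignment is not shown here. The shape of the EDS payload it builds is one ClusterLoadAssignment per destination service, carrying one LbEndpoint per endpoint address. Below is a sketch using go-control-plane's v3 endpoint types; the package versions and the helper name are assumptions, since this OSM snapshot may still be on the v2 API:

import (
    xds_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    xds_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
)

// clusterLoadAssignmentSketch shows the shape of the payload marshalled into
// the DiscoveryResponse above: cluster name plus locality LB endpoints.
func clusterLoadAssignmentSketch(clusterName, ip string, port uint32) *xds_endpoint.ClusterLoadAssignment {
    return &xds_endpoint.ClusterLoadAssignment{
        ClusterName: clusterName,
        Endpoints: []*xds_endpoint.LocalityLbEndpoints{{
            LbEndpoints: []*xds_endpoint.LbEndpoint{{
                HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
                    Endpoint: &xds_endpoint.Endpoint{
                        Address: &xds_core.Address{
                            Address: &xds_core.Address_SocketAddress{
                                SocketAddress: &xds_core.SocketAddress{
                                    Address:       ip,
                                    PortSpecifier: &xds_core.SocketAddress_PortValue{PortValue: port},
                                },
                            },
                        },
                    },
                },
            }},
        }},
    }
}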
In pkg/catalog/routes.go:
func (mc *MeshCatalog) ListTrafficPolicies(service service.MeshService) ([]trafficpolicy.TrafficTarget, error) {
log.Info().Msgf("Listing traffic policies for service: %s", service)
if mc.configurator.IsPermissiveTrafficPolicyMode() {
// Build traffic policies from service discovery for allow-all policy
trafficPolicies, err := mc.buildAllowAllTrafficPolicies(service)
if err != nil {
log.Error().Err(err).Msgf("Failed to build allow-all traffic policy for service %s", service)
return nil, err
}
return trafficPolicies, nil
}
// Build traffic policies from SMI
allRoutes, err := mc.getHTTPPathsPerRoute()
if err != nil {
log.Error().Err(err).Msgf("Could not get all routes")
return nil, err
}
allTrafficPolicies, err := getTrafficPolicyPerRoute(mc, allRoutes, service)
if err != nil {
log.Error().Err(err).Msgf("Could not get all traffic policies")
return nil, err
}
return allTrafficPolicies, nil
}
In pkg/smi/client.go:
func NewMeshSpecClient(smiKubeConfig *rest.Config, kubeClient kubernetes.Interface, osmNamespace string, namespaceController namespace.Controller, stop chan struct{}) (MeshSpec, error) {
smiTrafficSplitClientSet := smiTrafficSplitClient.NewForConfigOrDie(smiKubeConfig)
smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(smiKubeConfig)
smiTrafficTargetClientSet := smiTrafficTargetClient.NewForConfigOrDie(smiKubeConfig)
var backpressureClientSet *backpressureClient.Clientset
if featureflags.IsBackpressureEnabled() {
backpressureClientSet = backpressureClient.NewForConfigOrDie(smiKubeConfig)
}
client := newSMIClient(
kubeClient,
smiTrafficSplitClientSet,
smiTrafficSpecClientSet,
smiTrafficTargetClientSet,
backpressureClientSet,
osmNamespace,
namespaceController,
kubernetesClientName,
)
err := client.run(stop)
if err != nil {
return client, errors.Errorf("Could not start %s client", kubernetesClientName)
}
return client, nil
}
func (c *Client) run(stop <-chan struct{}) error {
log.Info().Msg("SMI Client started")
var hasSynced []cache.InformerSynced
if c.informers == nil {
return errInitInformers
}
sharedInformers := map[string]cache.SharedInformer{
"TrafficSplit": c.informers.TrafficSplit,
"Services": c.informers.Services,
"TrafficSpec": c.informers.TrafficSpec,
"TrafficTarget": c.informers.TrafficTarget,
}
if featureflags.IsBackpressureEnabled() {
sharedInformers["Backpressure"] = c.informers.Backpressure
}
var names []string
for name, informer := range sharedInformers {
// Depending on the use-case, some Informers from the collection may not have been initialized.
if informer == nil {
continue
}
names = append(names, name)
log.Info().Msgf("Starting informer: %s", name)
go informer.Run(stop)
hasSynced = append(hasSynced, informer.HasSynced)
}
log.Info().Msgf("[SMI Client] Waiting for informers' cache to sync: %+v", strings.Join(names, ", "))
if !cache.WaitForCacheSync(stop, hasSynced...) {
return errSyncingCaches
}
// Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
close(c.cacheSynced)
log.Info().Msgf("[SMI Client] Cache sync finished for %+v", names)
return nil
}
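The same run pattern — start the informers, block in cache.WaitForCacheSync, then close(cacheSynced) — repeats in the ingress, endpoints-provider, and namespace clients below. Closing the channel (rather than sending on it) is what lets any number of consumers wait for warm caches. A small illustration of the consumer side; illustrative only, not OSM code:

import "time"

// waitForCacheSyncSketch blocks until run() closes cacheSynced (closing a
// channel releases every waiting reader at once) or gives up after a timeout.
func waitForCacheSyncSketch(cacheSynced <-chan interface{}) bool {
    select {
    case <-cacheSynced:
        return true // informer caches are warm; lookups against them are safe
    case <-time.After(30 * time.Second): // illustrative timeout, not from OSM
        return false
    }
}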
In pkg/catalog/catalog.go:
func NewMeshCatalog(namespaceController namespace.Controller, kubeClient kubernetes.Interface, meshSpec smi.MeshSpec, certManager certificate.Manager, ingressMonitor ingress.Monitor, stop <-chan struct{}, cfg configurator.Configurator, endpointsProviders ...endpoint.Provider) *MeshCatalog {
log.Info().Msg("Create a new Service MeshCatalog.")
sc := MeshCatalog{
endpointsProviders: endpointsProviders,
meshSpec: meshSpec,
certManager: certManager,
ingressMonitor: ingressMonitor,
configurator: cfg,
expectedProxies: make(map[certificate.CommonName]expectedProxy),
connectedProxies: make(map[certificate.CommonName]connectedProxy),
disconnectedProxies: make(map[certificate.CommonName]disconnectedProxy),
announcementChannels: set.NewSet(),
// Kubernetes needed to determine what Services a pod that connects to XDS belongs to.
// In multicluster scenarios this would be a map of cluster ID to Kubernetes client.
// The certificate itself would contain the cluster ID making it easy to lookup the client in this map.
kubeClient: kubeClient,
namespaceController: namespaceController,
}
for _, announcementChannel := range sc.getAnnouncementChannels() {
sc.announcementChannels.Add(announcementChannel)
}
go sc.repeater()
return &sc
}
func (mc *MeshCatalog) getAnnouncementChannels() []announcementChannel {
ticking := make(chan interface{})
announcementChannels := []announcementChannel{
{"MeshSpec", mc.meshSpec.GetAnnouncementsChannel()},
{"CertManager", mc.certManager.GetAnnouncementsChannel()},
{"IngressMonitor", mc.ingressMonitor.GetAnnouncementsChannel()},
{"Ticker", ticking},
{"Namespace", mc.namespaceController.GetAnnouncementsChannel()},
}
for _, ep := range mc.endpointsProviders {
annCh := announcementChannel{ep.GetID(), ep.GetAnnouncementsChannel()}
announcementChannels = append(announcementChannels, annCh)
}
go func() {
ticker := time.NewTicker(updateAtLeastEvery)
ticking <- ticker.C
}()
return announcementChannels
}
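The Ticker goroutine above backs the "push at least every interval" behaviour mentioned in the introduction. If the intent is a genuinely periodic announcement, the usual shape is a forwarding loop over ticker.C, sketched below; the upstream code may deliberately differ:

go func() {
    ticker := time.NewTicker(updateAtLeastEvery)
    defer ticker.Stop()
    for t := range ticker.C {
        // Forward every tick as an announcement so registered proxies are
        // refreshed at least once per updateAtLeastEvery period.
        ticking <- t
    }
}()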
In pkg/catalog/repeater.go:
func (mc *MeshCatalog) repeater() {
lastUpdateAt := time.Now().Add(-1 * updateAtMostEvery)
for {
cases, caseNames := mc.getCases()
for {
if chosenIdx, message, ok := reflect.Select(cases); ok {
log.Info().Msgf("[repeater] Received announcement from %s", caseNames[chosenIdx])
delta := time.Since(lastUpdateAt)
if delta >= updateAtMostEvery {
mc.broadcast(message)
lastUpdateAt = time.Now()
}
}
}
}
}
func (mc *MeshCatalog) getCases() ([]reflect.SelectCase, []string) {
var caseNames []string
var cases []reflect.SelectCase
for _, channelInterface := range mc.announcementChannels.ToSlice() {
annCh := channelInterface.(announcementChannel)
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(annCh.channel)})
caseNames = append(caseNames, annCh.announcer)
}
return cases, caseNames
}
func (mc *MeshCatalog) broadcast(message interface{}) {
mc.connectedProxiesLock.Lock()
for _, connectedEnvoy := range mc.connectedProxies {
log.Debug().Msgf("[repeater] Broadcast announcement to envoy %s", connectedEnvoy.proxy.GetCommonName())
select {
// send the message if possible - do not block
case connectedEnvoy.proxy.GetAnnouncementsChannel() <- message:
default:
}
}
mc.connectedProxiesLock.Unlock()
}
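repeater and getCases go through reflect.Select because the set of announcement channels is only known at runtime, while an ordinary select statement needs a fixed case list at compile time. A self-contained illustration of the pattern (not OSM code):

package main

import (
    "fmt"
    "reflect"
)

func main() {
    a := make(chan interface{}, 1)
    b := make(chan interface{}, 1)
    b <- "announcement from b"

    // Build one SelectCase per channel, mirroring MeshCatalog.getCases.
    var cases []reflect.SelectCase
    for _, ch := range []chan interface{}{a, b} {
        cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
    }

    // reflect.Select blocks until one case can proceed and returns the index
    // of the chosen case, the received value, and whether the channel is open.
    chosen, value, ok := reflect.Select(cases)
    fmt.Println(chosen, value.Interface(), ok) // prints: 1 announcement from b true
}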
In pkg/ingress/client.go:
func NewIngressClient(kubeClient kubernetes.Interface, namespaceController namespace.Controller, stop chan struct{}, cfg configurator.Configurator) (Monitor, error) {
informerFactory := informers.NewSharedInformerFactory(kubeClient, k8s.DefaultKubeEventResyncInterval)
informer := informerFactory.Extensions().V1beta1().Ingresses().Informer()
client := Client{
informer: informer,
cache: informer.GetStore(),
cacheSynced: make(chan interface{}),
announcements: make(chan interface{}),
namespaceController: namespaceController,
}
shouldObserve := func(obj interface{}) bool {
ns := reflect.ValueOf(obj).Elem().FieldByName("ObjectMeta").FieldByName("Namespace").String()
return namespaceController.IsMonitoredNamespace(ns)
}
informer.AddEventHandler(k8s.GetKubernetesEventHandlers("Ingress", "Kubernetes", client.announcements, shouldObserve))
if err := client.run(stop); err != nil {
log.Error().Err(err).Msg("Could not start Kubernetes Ingress client")
return nil, err
}
return client, nil
}
// run executes informer collection.
func (c *Client) run(stop <-chan struct{}) error {
log.Info().Msg("Ingress client started")
if c.informer == nil {
return errInitInformers
}
go c.informer.Run(stop)
log.Info().Msgf("Waiting for Ingress informer cache sync")
if !cache.WaitForCacheSync(stop, c.informer.HasSynced) {
return errSyncingCaches
}
// Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
close(c.cacheSynced)
log.Info().Msgf("Cache sync finished for Ingress informer")
return nil
}
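shouldObserve (used both by this Ingress client and by the endpoints provider below) digs the namespace out of the object with reflection. An equivalent, more idiomatic client-go approach is to assert the metav1.Object interface; a sketch of that alternative, not what OSM actually does:

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// namespaceOfSketch: an alternative to the reflection-based lookup above.
// Every Kubernetes API object embeds ObjectMeta, which satisfies metav1.Object.
func namespaceOfSketch(obj interface{}) (string, bool) {
    metaObj, ok := obj.(metav1.Object)
    if !ok {
        return "", false
    }
    return metaObj.GetNamespace(), true
}

shouldObserve could then be written as: ns, ok := namespaceOfSketch(obj); return ok && namespaceController.IsMonitoredNamespace(ns).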
In pkg/endpoint/providers/kube/client.go:
func NewProvider(kubeClient kubernetes.Interface, namespaceController namespace.Controller, stop chan struct{}, providerIdent string, cfg configurator.Configurator) (*Client, error) {
informerFactory := informers.NewSharedInformerFactory(kubeClient, k8s.DefaultKubeEventResyncInterval)
informerCollection := InformerCollection{
Endpoints: informerFactory.Core().V1().Endpoints().Informer(),
Deployments: informerFactory.Apps().V1().Deployments().Informer(),
}
cacheCollection := CacheCollection{
Endpoints: informerCollection.Endpoints.GetStore(),
Deployments: informerCollection.Deployments.GetStore(),
}
client := Client{
providerIdent: providerIdent,
kubeClient: kubeClient,
informers: &informerCollection,
caches: &cacheCollection,
cacheSynced: make(chan interface{}),
announcements: make(chan interface{}),
namespaceController: namespaceController,
}
shouldObserve := func(obj interface{}) bool {
ns := reflect.ValueOf(obj).Elem().FieldByName("ObjectMeta").FieldByName("Namespace").String()
return namespaceController.IsMonitoredNamespace(ns)
}
informerCollection.Endpoints.AddEventHandler(k8s.GetKubernetesEventHandlers("Endpoints", "Kubernetes", client.announcements, shouldObserve))
informerCollection.Deployments.AddEventHandler(k8s.GetKubernetesEventHandlers("Deployments", "Kubernetes", client.announcements, shouldObserve))
if err := client.run(stop); err != nil {
return nil, errors.Errorf("Failed to start Kubernetes EndpointProvider client: %+v", err)
}
return &client, nil
}
func (c *Client) run(stop <-chan struct{}) error {
var hasSynced []cache.InformerSynced
if c.informers == nil {
return errInitInformers
}
sharedInformers := map[string]cache.SharedInformer{
"Endpoints": c.informers.Endpoints,
"Deployments": c.informers.Deployments,
}
var names []string
for name, informer := range sharedInformers {
// Depending on the use-case, some Informers from the collection may not have been initialized.
if informer == nil {
continue
}
names = append(names, name)
log.Debug().Msgf("Starting informer %s", name)
go informer.Run(stop)
hasSynced = append(hasSynced, informer.HasSynced)
}
log.Info().Msgf("Waiting for informer's cache to sync: %+v", strings.Join(names, ", "))
if !cache.WaitForCacheSync(stop, hasSynced...) {
return errSyncingCaches
}
// Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
close(c.cacheSynced)
log.Info().Msgf("Cache sync finished for %+v", names)
return nil
}
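With the caches synced, the provider's lookup methods (for example ListEndpointsForService, which EDS calls above) can be served from the local stores instead of the API server. A hedged sketch of such a cache lookup — the matching logic and the helper name are assumptions for illustration:

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
)

// listEndpointIPsSketch: hypothetical helper that walks the Endpoints objects
// already held in the informer cache and collects "ip:port" pairs for one service.
func (c *Client) listEndpointIPsSketch(namespace, serviceName string) []string {
    var result []string
    for _, item := range c.caches.Endpoints.List() {
        ep, ok := item.(*corev1.Endpoints)
        if !ok || ep.Namespace != namespace || ep.Name != serviceName {
            continue
        }
        for _, subset := range ep.Subsets {
            for _, addr := range subset.Addresses {
                for _, port := range subset.Ports {
                    result = append(result, fmt.Sprintf("%s:%d", addr.IP, port.Port))
                }
            }
        }
    }
    return result
}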
In pkg/namespace/client.go:
func NewNamespaceController(kubeClient kubernetes.Interface, meshName string, stop chan struct{}) Controller {
// Only monitor namespaces that are labeled with this OSM's mesh name
monitorNamespaceLabel := map[string]string{MonitorLabel: meshName}
labelSelector := fields.SelectorFromSet(monitorNamespaceLabel).String()
option := informers.WithTweakListOptions(func(opt *metav1.ListOptions) {
opt.LabelSelector = labelSelector
})
informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, option)
informer := informerFactory.Core().V1().Namespaces().Informer()
client := Client{
informer: informer,
cache: informer.GetStore(),
cacheSynced: make(chan interface{}),
announcements: make(chan interface{}),
}
if err := client.run(stop); err != nil {
log.Fatal().Err(err).Msg("Could not start Kubernetes Namespaces client")
}
informer.AddEventHandler(k8s.GetKubernetesEventHandlers("Namespace", "NamespaceClient", client.announcements, nil))
log.Info().Msgf("Monitoring namespaces with the label: %s=%s", MonitorLabel, meshName)
return client
}
// run executes informer collection.
func (c *Client) run(stop <-chan struct{}) error {
log.Info().Msg("Namespace controller client started")
if c.informer == nil {
return errInitInformers
}
go c.informer.Run(stop)
log.Info().Msgf("Waiting namespace.Monitor informer cache sync")
if !cache.WaitForCacheSync(stop, c.informer.HasSynced) {
return errSyncingCaches
}
// Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
close(c.cacheSynced)
log.Info().Msgf("Cache sync finished for namespace.Monitor informer")
return nil
}
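Because the informer only lists namespaces carrying the mesh label, IsMonitoredNamespace (relied on by the shouldObserve closures earlier) can be answered straight from this cache. A plausible sketch; the real method may differ:

// isMonitoredNamespaceSketch: namespaces are cluster-scoped, so the cache key
// is just the name; presence in the cache means the namespace has the mesh label.
func (c Client) isMonitoredNamespaceSketch(namespace string) bool {
    _, exists, err := c.cache.GetByKey(namespace)
    return err == nil && exists
}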