美文网首页
osm 源码简单分析

osm 源码简单分析

作者: wwq2020 | 来源:发表于2020-08-09 15:52 被阅读0次

    简介

    作为 xds 的 server,链接上来的 envoy 都会注册到 meshcatalog,断开则取消注册
    每一分钟或者 meshspec/cert/ingress/namespace/endpoint/deployment 有变化,则下发配置到 meshcatalog 中已注册的 envoy
    当然也有 envoy 主动获取的

    源码

    cmd/osm-controller/osm-controller.go 中

    func main() {
        ...
        kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
        ...
        cfg := configurator.NewConfigurator(kubernetes.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmConfigMapName)
        ...
        namespaceController := namespace.NewNamespaceController(kubeClient, meshName, stop)
        meshSpec, err := smi.NewMeshSpecClient(*smiKubeConfig, kubeClient, osmNamespace, namespaceController, stop)
        ...
        provider, err := kube.NewProvider(kubeClient, namespaceController, stop, constants.KubeProviderName, cfg)
        ...
        endpointsProviders := []endpoint.Provider{provider}
        ...
        ingressClient, err := ingress.NewIngressClient(kubeClient, namespaceController, stop, cfg)
        ...
        meshCatalog := catalog.NewMeshCatalog(
            namespaceController,
            kubeClient,
            meshSpec,
            certManager,
            ingressClient,
            stop,
            cfg,
            endpointsProviders...)
        ...
        xdsServer := ads.NewADSServer(ctx, meshCatalog, enableDebugServer, osmNamespace, cfg)
        ...
        grpcServer, lis := utils.NewGrpc(serverType, *port, adsCert.GetCertificateChain(), adsCert.GetPrivateKey(), adsCert.GetIssuingCA())
        xds_discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, xdsServer)
    
        go utils.GrpcServe(ctx, grpcServer, lis, cancel, serverType)
    }
    

    pkg/utils/grpc.go 中

    func NewGrpc(serverType string, port int, certPem, keyPem, rootCertPem []byte) (*grpc.Server, net.Listener) {
        log.Info().Msgf("Setting up %s gRPC server...", serverType)
        addr := fmt.Sprintf(":%d", port)
        lis, err := net.Listen("tcp", addr)
        if err != nil {
            log.Fatal().Err(err).Msgf("Could not start %s gRPC server on %s", serverType, addr)
        }
    
        log.Info().Msgf("Parameters for %s gRPC server: MaxConcurrentStreams=%d;  KeepAlive=%+v", serverType, maxStreams, streamKeepAliveDuration)
    
        grpcOptions := []grpc.ServerOption{
            grpc.MaxConcurrentStreams(maxStreams),
            grpc.KeepaliveParams(keepalive.ServerParameters{
                Time: streamKeepAliveDuration,
            }),
        }
    
        mutualTLS, err := setupMutualTLS(false, serverType, certPem, keyPem, rootCertPem)
        if err != nil {
            log.Fatal().Err(err).Msg("Failed to setup mutual tls for GRPC server")
        }
        grpcOptions = append(grpcOptions, mutualTLS)
    
        return grpc.NewServer(grpcOptions...), lis
    }
    
    

    pkg/envoy/ads/server.go 中

    func NewADSServer(ctx context.Context, meshCatalog catalog.MeshCataloger, enableDebug bool, osmNamespace string, cfg configurator.Configurator) *Server {
        server := Server{
            catalog:      meshCatalog,
            ctx:          ctx,
            xdsHandlers:  getHandlers(),
            enableDebug:  enableDebug,
            osmNamespace: osmNamespace,
            cfg:          cfg,
        }
    
        if enableDebug {
            server.xdsLog = make(map[certificate.CommonName]map[envoy.TypeURI][]time.Time)
        }
    
        return &server
    }
    
    func getHandlers() map[envoy.TypeURI]func(context.Context, catalog.MeshCataloger, *envoy.Proxy, *xds_discovery.DiscoveryRequest, configurator.Configurator) (*xds_discovery.DiscoveryResponse, error) {
        return map[envoy.TypeURI]func(context.Context, catalog.MeshCataloger, *envoy.Proxy, *xds_discovery.DiscoveryRequest, configurator.Configurator) (*xds_discovery.DiscoveryResponse, error){
            envoy.TypeEDS: eds.NewResponse,
            envoy.TypeCDS: cds.NewResponse,
            envoy.TypeRDS: rds.NewResponse,
            envoy.TypeLDS: lds.NewResponse,
            envoy.TypeSDS: sds.NewResponse,
        }
    }
    

    pkg/envoy/ads/stream.go 中

    func (s *Server) StreamAggregatedResources(server xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
        ...
        go receive(requests, &server, proxy, quit)
    
        for {
            select {
                ...
                case discoveryRequest, ok := <-requests:
                    ...
                    resp, err := s.newAggregatedDiscoveryResponse(proxy, &discoveryRequest, s.cfg)
                    ...
    
                    if err := server.Send(resp); err != nil {
                        log.Error().Err(err).Msgf("Error sending DiscoveryResponse")
                    }
    
                case <-proxy.GetAnnouncementsChannel():
                    s.sendAllResponses(proxy, &server, s.cfg)
            }
        }
    }
    
    

    pkg/envoy/ads/grpc.go 中

    func receive(requests chan xds_discovery.DiscoveryRequest, server *xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer, proxy *envoy.Proxy, quit chan struct{}) {
        defer close(requests)
        defer close(quit)
        for {
            var request *xds_discovery.DiscoveryRequest
            request, recvErr := (*server).Recv()
            if recvErr != nil {
                if status.Code(recvErr) == codes.Canceled || recvErr == io.EOF {
                    log.Error().Msgf("[grpc] Connection terminated: %+v", recvErr)
                    return
                }
                log.Error().Msgf("[grpc] Connection terminated with error: %+v", recvErr)
                return
            }
            if request.TypeUrl != "" {
                log.Trace().Msgf("[grpc] Received DiscoveryRequest from Envoy %s: %+v", proxy.GetCommonName(), request)
                requests <- *request
            } else {
                log.Warn().Msgf("[grpc] Unknown resource: %+v", request)
            }
        }
    }
    
    

    pkg/envoy/ads/response.go 中

    func (s *Server) newAggregatedDiscoveryResponse(proxy *envoy.Proxy, request *xds_discovery.DiscoveryRequest, cfg configurator.Configurator) (*xds_discovery.DiscoveryResponse, error) {
        typeURL := envoy.TypeURI(request.TypeUrl)
        handler, ok := s.xdsHandlers[typeURL]
        if !ok {
            log.Error().Msgf("Responder for TypeUrl %s is not implemented", request.TypeUrl)
            return nil, errUnknownTypeURL
        }
        ...
        response, err := handler(s.ctx, s.catalog, proxy, request, cfg)
        ...
        return response, nil
    }
    

    拿 eds 举例
    pkg/envoy/eds/response.go 中

    
    func NewResponse(ctx context.Context, catalog catalog.MeshCataloger, proxy *envoy.Proxy, request *xds_discovery.DiscoveryRequest, cfg configurator.Configurator) (*xds_discovery.DiscoveryResponse, error) {
        svc, err := catalog.GetServiceFromEnvoyCertificate(proxy.GetCommonName())
        ...
        proxyServiceName := *svc
    
        allTrafficPolicies, err := catalog.ListTrafficPolicies(proxyServiceName)
        ...
    
        allServicesEndpoints := make(map[service.MeshService][]endpoint.Endpoint)
        for _, trafficPolicy := range allTrafficPolicies {
            isSourceService := trafficPolicy.Source.Equals(proxyServiceName)
            if isSourceService {
                destService := trafficPolicy.Destination
                serviceEndpoints, err := catalog.ListEndpointsForService(destService)
                if err != nil {
                    log.Error().Err(err).Msgf("Failed listing endpoints")
                    return nil, err
                }
                allServicesEndpoints[destService] = serviceEndpoints
            }
        }
        ...
        var protos []*any.Any
        for serviceName, serviceEndpoints := range allServicesEndpoints {
            loadAssignment := cla.NewClusterLoadAssignment(serviceName, serviceEndpoints)
    
            proto, err := ptypes.MarshalAny(loadAssignment)
            if err != nil {
                log.Error().Err(err).Msgf("Error marshalling EDS payload %+v", loadAssignment)
                continue
            }
            protos = append(protos, proto)
        }
    
        resp := &xds_discovery.DiscoveryResponse{
            Resources: protos,
            TypeUrl:   string(envoy.TypeEDS),
        }
        return resp, nil
    }
    
    

    pkg/catalog/routes.go 中

    func (mc *MeshCatalog) ListTrafficPolicies(service service.MeshService) ([]trafficpolicy.TrafficTarget, error) {
        log.Info().Msgf("Listing traffic policies for service: %s", service)
    
        if mc.configurator.IsPermissiveTrafficPolicyMode() {
            // Build traffic policies from service discovery for allow-all policy
            trafficPolicies, err := mc.buildAllowAllTrafficPolicies(service)
            if err != nil {
                log.Error().Err(err).Msgf("Failed to build allow-all traffic policy for service %s", service)
                return nil, err
            }
            return trafficPolicies, nil
        }
    
        // Build traffic policies from SMI
        allRoutes, err := mc.getHTTPPathsPerRoute()
        if err != nil {
            log.Error().Err(err).Msgf("Could not get all routes")
            return nil, err
        }
    
        allTrafficPolicies, err := getTrafficPolicyPerRoute(mc, allRoutes, service)
        if err != nil {
            log.Error().Err(err).Msgf("Could not get all traffic policies")
            return nil, err
        }
        return allTrafficPolicies, nil
    }
    

    pkg/smi/client.go 中

    func NewMeshSpecClient(smiKubeConfig *rest.Config, kubeClient kubernetes.Interface, osmNamespace string, namespaceController namespace.Controller, stop chan struct{}) (MeshSpec, error) {
        smiTrafficSplitClientSet := smiTrafficSplitClient.NewForConfigOrDie(smiKubeConfig)
        smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(smiKubeConfig)
        smiTrafficTargetClientSet := smiTrafficTargetClient.NewForConfigOrDie(smiKubeConfig)
    
        var backpressureClientSet *backpressureClient.Clientset
        if featureflags.IsBackpressureEnabled() {
            backpressureClientSet = backpressureClient.NewForConfigOrDie(smiKubeConfig)
        }
    
        client := newSMIClient(
            kubeClient,
            smiTrafficSplitClientSet,
            smiTrafficSpecClientSet,
            smiTrafficTargetClientSet,
            backpressureClientSet,
            osmNamespace,
            namespaceController,
            kubernetesClientName,
        )
    
        err := client.run(stop)
        if err != nil {
            return client, errors.Errorf("Could not start %s client", kubernetesClientName)
        }
        return client, nil
    }
    
    func (c *Client) run(stop <-chan struct{}) error {
        log.Info().Msg("SMI Client started")
        var hasSynced []cache.InformerSynced
    
        if c.informers == nil {
            return errInitInformers
        }
    
        sharedInformers := map[string]cache.SharedInformer{
            "TrafficSplit":  c.informers.TrafficSplit,
            "Services":      c.informers.Services,
            "TrafficSpec":   c.informers.TrafficSpec,
            "TrafficTarget": c.informers.TrafficTarget,
        }
    
        if featureflags.IsBackpressureEnabled() {
            sharedInformers["Backpressure"] = c.informers.Backpressure
        }
    
        var names []string
        for name, informer := range sharedInformers {
            // Depending on the use-case, some Informers from the collection may not have been initialized.
            if informer == nil {
                continue
            }
            names = append(names, name)
            log.Info().Msgf("Starting informer: %s", name)
            go informer.Run(stop)
            hasSynced = append(hasSynced, informer.HasSynced)
        }
    
        log.Info().Msgf("[SMI Client] Waiting for informers' cache to sync: %+v", strings.Join(names, ", "))
        if !cache.WaitForCacheSync(stop, hasSynced...) {
            return errSyncingCaches
        }
    
        // Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
        close(c.cacheSynced)
    
        log.Info().Msgf("[SMI Client] Cache sync finished for %+v", names)
        return nil
    }
    

    pkg/catalog/catalog.go 中

    func NewMeshCatalog(namespaceController namespace.Controller, kubeClient kubernetes.Interface, meshSpec smi.MeshSpec, certManager certificate.Manager, ingressMonitor ingress.Monitor, stop <-chan struct{}, cfg configurator.Configurator, endpointsProviders ...endpoint.Provider) *MeshCatalog {
        log.Info().Msg("Create a new Service MeshCatalog.")
        sc := MeshCatalog{
            endpointsProviders: endpointsProviders,
            meshSpec:           meshSpec,
            certManager:        certManager,
            ingressMonitor:     ingressMonitor,
            configurator:       cfg,
    
            expectedProxies:      make(map[certificate.CommonName]expectedProxy),
            connectedProxies:     make(map[certificate.CommonName]connectedProxy),
            disconnectedProxies:  make(map[certificate.CommonName]disconnectedProxy),
            announcementChannels: set.NewSet(),
    
            // Kubernetes needed to determine what Services a pod that connects to XDS belongs to.
            // In multicluster scenarios this would be a map of cluster ID to Kubernetes client.
            // The certificate itself would contain the cluster ID making it easy to lookup the client in this map.
            kubeClient: kubeClient,
    
            namespaceController: namespaceController,
        }
    
        for _, announcementChannel := range sc.getAnnouncementChannels() {
            sc.announcementChannels.Add(announcementChannel)
    
        }
    
        go sc.repeater()
        return &sc
    }
    func (mc *MeshCatalog) getAnnouncementChannels() []announcementChannel {
        ticking := make(chan interface{})
        announcementChannels := []announcementChannel{
            {"MeshSpec", mc.meshSpec.GetAnnouncementsChannel()},
            {"CertManager", mc.certManager.GetAnnouncementsChannel()},
            {"IngressMonitor", mc.ingressMonitor.GetAnnouncementsChannel()},
            {"Ticker", ticking},
            {"Namespace", mc.namespaceController.GetAnnouncementsChannel()},
        }
        for _, ep := range mc.endpointsProviders {
            annCh := announcementChannel{ep.GetID(), ep.GetAnnouncementsChannel()}
            announcementChannels = append(announcementChannels, annCh)
        }
    
        go func() {
            ticker := time.NewTicker(updateAtLeastEvery)
            ticking <- ticker.C
        }()
        return announcementChannels
    }
    
    

    pkg/catalog/repeater.go 中

    func (mc *MeshCatalog) repeater() {
        lastUpdateAt := time.Now().Add(-1 * updateAtMostEvery)
        for {
            cases, caseNames := mc.getCases()
            for {
                if chosenIdx, message, ok := reflect.Select(cases); ok {
                    log.Info().Msgf("[repeater] Received announcement from %s", caseNames[chosenIdx])
                    delta := time.Since(lastUpdateAt)
                    if delta >= updateAtMostEvery {
                        mc.broadcast(message)
                        lastUpdateAt = time.Now()
                    }
                }
            }
        }
    }
    
    func (mc *MeshCatalog) getCases() ([]reflect.SelectCase, []string) {
        var caseNames []string
        var cases []reflect.SelectCase
        for _, channelInterface := range mc.announcementChannels.ToSlice() {
            annCh := channelInterface.(announcementChannel)
            cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(annCh.channel)})
            caseNames = append(caseNames, annCh.announcer)
        }
        return cases, caseNames
    }
    
    func (mc *MeshCatalog) broadcast(message interface{}) {
        mc.connectedProxiesLock.Lock()
        for _, connectedEnvoy := range mc.connectedProxies {
            log.Debug().Msgf("[repeater] Broadcast announcement to envoy %s", connectedEnvoy.proxy.GetCommonName())
            select {
            // send the message if possible - do not block
            case connectedEnvoy.proxy.GetAnnouncementsChannel() <- message:
            default:
            }
        }
        mc.connectedProxiesLock.Unlock()
    }
    
    

    pkg/ingress/client.go 中

    func NewIngressClient(kubeClient kubernetes.Interface, namespaceController namespace.Controller, stop chan struct{}, cfg configurator.Configurator) (Monitor, error) {
        informerFactory := informers.NewSharedInformerFactory(kubeClient, k8s.DefaultKubeEventResyncInterval)
        informer := informerFactory.Extensions().V1beta1().Ingresses().Informer()
    
        client := Client{
            informer:            informer,
            cache:               informer.GetStore(),
            cacheSynced:         make(chan interface{}),
            announcements:       make(chan interface{}),
            namespaceController: namespaceController,
        }
    
        shouldObserve := func(obj interface{}) bool {
            ns := reflect.ValueOf(obj).Elem().FieldByName("ObjectMeta").FieldByName("Namespace").String()
            return namespaceController.IsMonitoredNamespace(ns)
        }
        informer.AddEventHandler(k8s.GetKubernetesEventHandlers("Ingress", "Kubernetes", client.announcements, shouldObserve))
    
        if err := client.run(stop); err != nil {
            log.Error().Err(err).Msg("Could not start Kubernetes Ingress client")
            return nil, err
        }
    
        return client, nil
    }
    
    // run executes informer collection.
    func (c *Client) run(stop <-chan struct{}) error {
        log.Info().Msg("Ingress client started")
    
        if c.informer == nil {
            return errInitInformers
        }
    
        go c.informer.Run(stop)
        log.Info().Msgf("Waiting for Ingress informer cache sync")
        if !cache.WaitForCacheSync(stop, c.informer.HasSynced) {
            return errSyncingCaches
        }
    
        // Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
        close(c.cacheSynced)
    
        log.Info().Msgf("Cache sync finished for Ingress informer")
        return nil
    }
    

    pkg/endpoint/providers/kube/client.go 中

    func NewProvider(kubeClient kubernetes.Interface, namespaceController namespace.Controller, stop chan struct{}, providerIdent string, cfg configurator.Configurator) (*Client, error) {
        informerFactory := informers.NewSharedInformerFactory(kubeClient, k8s.DefaultKubeEventResyncInterval)
    
        informerCollection := InformerCollection{
            Endpoints:   informerFactory.Core().V1().Endpoints().Informer(),
            Deployments: informerFactory.Apps().V1().Deployments().Informer(),
        }
    
        cacheCollection := CacheCollection{
            Endpoints:   informerCollection.Endpoints.GetStore(),
            Deployments: informerCollection.Deployments.GetStore(),
        }
    
        client := Client{
            providerIdent:       providerIdent,
            kubeClient:          kubeClient,
            informers:           &informerCollection,
            caches:              &cacheCollection,
            cacheSynced:         make(chan interface{}),
            announcements:       make(chan interface{}),
            namespaceController: namespaceController,
        }
    
        shouldObserve := func(obj interface{}) bool {
            ns := reflect.ValueOf(obj).Elem().FieldByName("ObjectMeta").FieldByName("Namespace").String()
            return namespaceController.IsMonitoredNamespace(ns)
        }
        informerCollection.Endpoints.AddEventHandler(k8s.GetKubernetesEventHandlers("Endpoints", "Kubernetes", client.announcements, shouldObserve))
        informerCollection.Deployments.AddEventHandler(k8s.GetKubernetesEventHandlers("Deployments", "Kubernetes", client.announcements, shouldObserve))
    
        if err := client.run(stop); err != nil {
            return nil, errors.Errorf("Failed to start Kubernetes EndpointProvider client: %+v", err)
        }
    
        return &client, nil
    }
    
    func (c *Client) run(stop <-chan struct{}) error {
        var hasSynced []cache.InformerSynced
    
        if c.informers == nil {
            return errInitInformers
        }
    
        sharedInformers := map[string]cache.SharedInformer{
            "Endpoints":   c.informers.Endpoints,
            "Deployments": c.informers.Deployments,
        }
    
        var names []string
        for name, informer := range sharedInformers {
            // Depending on the use-case, some Informers from the collection may not have been initialized.
            if informer == nil {
                continue
            }
            names = append(names, name)
            log.Debug().Msgf("Starting informer %s", name)
            go informer.Run(stop)
            hasSynced = append(hasSynced, informer.HasSynced)
        }
    
        log.Info().Msgf("Waiting for informer's cache to sync: %+v", strings.Join(names, ", "))
        if !cache.WaitForCacheSync(stop, hasSynced...) {
            return errSyncingCaches
        }
    
        // Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
        close(c.cacheSynced)
    
        log.Info().Msgf("Cache sync finished for %+v", names)
        return nil
    }
    
    

    pkg/namespace/client.go 中

    func NewNamespaceController(kubeClient kubernetes.Interface, meshName string, stop chan struct{}) Controller {
        // Only monitor namespaces that are labeled with this OSM's mesh name
        monitorNamespaceLabel := map[string]string{MonitorLabel: meshName}
        labelSelector := fields.SelectorFromSet(monitorNamespaceLabel).String()
        option := informers.WithTweakListOptions(func(opt *metav1.ListOptions) {
            opt.LabelSelector = labelSelector
        })
        informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, option)
        informer := informerFactory.Core().V1().Namespaces().Informer()
    
        client := Client{
            informer:      informer,
            cache:         informer.GetStore(),
            cacheSynced:   make(chan interface{}),
            announcements: make(chan interface{}),
        }
    
        if err := client.run(stop); err != nil {
            log.Fatal().Err(err).Msg("Could not start Kubernetes Namespaces client")
        }
    
        informer.AddEventHandler(k8s.GetKubernetesEventHandlers("Namespace", "NamespaceClient", client.announcements, nil))
    
        log.Info().Msgf("Monitoring namespaces with the label: %s=%s", MonitorLabel, meshName)
        return client
    }
    
    // run executes informer collection.
    func (c *Client) run(stop <-chan struct{}) error {
        log.Info().Msg("Namespace controller client started")
    
        if c.informer == nil {
            return errInitInformers
        }
    
        go c.informer.Run(stop)
        log.Info().Msgf("Waiting namespace.Monitor informer cache sync")
        if !cache.WaitForCacheSync(stop, c.informer.HasSynced) {
            return errSyncingCaches
        }
    
        // Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
        close(c.cacheSynced)
    
        log.Info().Msgf("Cache sync finished for namespace.Monitor informer")
        return nil
    }
    
    

    相关文章

      网友评论

          本文标题:osm 源码简单分析

          本文链接:https://www.haomeiwen.com/subject/swwldktx.html