美文网首页
读fluentbit-operator代码1.fluentbit_

读fluentbit-operator代码1.fluentbit_

作者: 文茶君 | 来源:发表于2021-06-07 21:53 被阅读0次

    https://github.com/kubesphere/fluentbit-operator

    func (r *FluentBitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
        ctx := context.Background()
          _ = r.Log.WithValues("fluent-bit", req.NamespacedName)
    
        var fb logging.FluentBit
        if err := r.Get(ctx, req.NamespacedName, &fb); err != nil {
            if errors.IsNotFound(err) {
                return ctrl.Result{}, nil
            }
            return ctrl.Result{}, err
        }
    
        if fb.IsBeingDeleted() {
            if err := r.handleFinalizer(ctx, &fb); err != nil {
                return ctrl.Result{}, fmt.Errorf("error when handling finalizer: %v", err)
            }
            return ctrl.Result{}, nil
        }
    
        if !fb.HasFinalizer(logging.FluentBitFinalizerName) {
            if err := r.addFinalizer(ctx, &fb); err != nil {
                return ctrl.Result{}, fmt.Errorf("error when adding finalizer: %v", err)
            }
            return ctrl.Result{}, nil
        }
    
    
    
    

    _ = r.Log.WithValues("fluent-bit", req.NamespacedName)
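    解释:这一行调用 r.Log.WithValues 得到一个附带 "fluent-bit" 键值的 logger,但用 `_ =` 把返回值直接丢弃了,所以这里并没有真正使用它。不要把它和 `var _ 接口 = (*类型)(nil)` 这种写法混淆:后者才是用来检查接口是否被实现的——如果此接口没有被实现,那么一方面 IDE 会有红横线出现,另一方面在编译的时候会出现报错,两方面的提示来保证写底层代码的接口是有被实现的。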

     if fb.IsBeingDeleted() {
            if err := r.handleFinalizer(ctx, &fb); err != nil {
                return ctrl.Result{}, fmt.Errorf("error when handling finalizer: %v", err)
            }
            return ctrl.Result{}, nil
        }
    

    这里 handleFinalizer() 函数来自于:

    // addFinalizer puts the FluentBit finalizer on instance and then persists the
    // modified object with the reconciler's Update call. An Update error is
    // returned unchanged; nil is returned on success.
    func (r *FluentBitReconciler) addFinalizer(ctx context.Context, instance *logging.FluentBit) error {
        instance.AddFinalizer(logging.FluentBitFinalizerName)
        if err := r.Update(ctx, instance); err != nil {
            return err
        }
        return nil
    }
    
    // handleFinalizer runs the cleanup for a FluentBit object that still carries
    // the FluentBit finalizer. If the finalizer is absent it does nothing and
    // returns nil. Otherwise it calls r.delete; only when that succeeds is the
    // finalizer removed and the object written back via Update. The first error
    // encountered (from delete or Update) is returned unchanged.
    func (r *FluentBitReconciler) handleFinalizer(ctx context.Context, instance *logging.FluentBit) error {
        if instance.HasFinalizer(logging.FluentBitFinalizerName) {
            // Clean up first; the finalizer must stay in place if cleanup fails.
            if cleanupErr := r.delete(ctx, instance); cleanupErr != nil {
                return cleanupErr
            }
            instance.RemoveFinalizer(logging.FluentBitFinalizerName)
            return r.Update(ctx, instance)
        }
        return nil
    }
    

    // FluentBitFinalizerName is the name of the fluentbit finalizer.
    // It is the string that addFinalizer puts on a FluentBit object and that
    // handleFinalizer removes again once its cleanup step has succeeded.
    const FluentBitFinalizerName = "fluentbit.logging.kubesphere.io"

    finalizer
    Kubernetes 实战-Operator Finalizers 实现

    Finalizers 允许 Operator 控制器实现异步的 pre-delete hook。比如你给 API 类型中的每个对象都创建了对应的外部资源,你希望在 k8s 删除对应资源时同时删除关联的外部资源,那么可以通过 Finalizers 来实现。

    Finalizers 是由字符串组成的列表,当 Finalizers 字段存在时,相关资源不允许被强制删除。存在 Finalizers 字段的资源对象在接收到第一个删除请求时,只会设置 metadata.deletionTimestamp 字段的值,而不会删除具体资源;在该字段设置后,finalizers 列表中的条目只能被移除,不能做其他操作。

    当 metadata.deletionTimestamp 字段非空时,controller watch 对象并执行对应 finalizers 的动作,当所有动作执行完后,需要清空 finalizers ,之后 k8s 会删除真正想要删除的资源。

    介绍2:
    k8s 资源中有 ObjectMeta.DeletionTimestamp 和 ObjectMeta.Finalizers(例如 DemoMicroService.ObjectMeta.Finalizers)这两个元信息。前者表示删除该资源的这一行为的具体发生时间,如果不为 0 则表示删除资源的指令已经下达;而后者表示在真正删除资源之前,还有哪些逻辑没有被执行,如果 Finalizers 不为空,那么该资源则不会真正消失。

    // Check if Secret exists and requeue when not found
        var sec corev1.Secret
        if err := r.Get(ctx, client.ObjectKey{Namespace: fb.Namespace, Name: fb.Spec.FluentBitConfigName}, &sec); err != nil {
            if errors.IsNotFound(err) {
                return ctrl.Result{Requeue: true}, nil
            }
            return ctrl.Result{}, err
        }
    
    
    
    

    ctrl "sigs.k8s.io/controller-runtime":这里的 ctrl 是该包的导入别名。

    检查Secret存不存在,不存在 return ctrl.Result{Requeue: true}, nil

    // Result is the value a Reconcile call returns alongside its error; its
    // fields tell the Controller whether, and after how long, to requeue the
    // reconcile key.
    type Result struct {
    // Requeue tells the Controller to requeue the reconcile key. Defaults to false.
    Requeue bool
    // RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
    // Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter.
    RequeueAfter time.Duration
    }

    安装 RBAC 资源

        // Install RBAC resources for the filter plugin kubernetes
        cr, sa, crb := operator.MakeRBACObjects(fb.Name, fb.Namespace)
        // Set ServiceAccount's owner to this fluentbit
        if err := ctrl.SetControllerReference(&fb, &sa, r.Scheme); err != nil {
            return ctrl.Result{}, err
        }
        if err := r.Create(ctx, &cr); err != nil && !errors.IsAlreadyExists(err) {
            return ctrl.Result{}, err
        }
        if err := r.Create(ctx, &sa); err != nil && !errors.IsAlreadyExists(err) {
            return ctrl.Result{}, err
        }
        if err := r.Create(ctx, &crb); err != nil && !errors.IsAlreadyExists(err) {
            return ctrl.Result{}, err
        }
    

    create函数是

    // Create dispatches the create call according to the dynamic type of obj:
    // an *unstructured.Unstructured goes to the unstructured client, every
    // other runtime.Object goes to the typed client. ctx and opts are passed
    // through unchanged, and the chosen client's error is returned as-is.
    func (c *client) Create(ctx context.Context, obj runtime.Object, opts ...CreateOption) error {
        switch obj.(type) {
        case *unstructured.Unstructured:
            return c.unstructuredClient.Create(ctx, obj, opts...)
        default:
            return c.typedClient.Create(ctx, obj, opts...)
        }
    }
    
    

    在我们编码中,经常会碰到读取数据时,要判断数据是哪种类型,典型的是json格式文本的读取和识别。在golang中主要用 x.(T)的方式来识别类型:x是变量,而且是不确定类型的变量,interface,如果是已知类型的,比如x是string,那么就会报错:invalid type assertion: data.(string) (non-interface type string on left),当然也不能是常量,常量的类型已知,不需要做类型判断。T是类型字面量,就是类型的名称,举例来说:
    // data holds a string behind an empty interface, so its concrete type has
    // to be checked at run time.
    var data interface{} = "hello"
    // The two-value form of the type assertion reports success through ok
    // instead of panicking when data does not hold a string.
    strValue, ok := data.(string)
    if ok {
    fmt.Printf("%s is string type\n", strValue)
    }
    作者:古月晨风
    链接:https://www.jianshu.com/p/e9da88e91ce9
    来源:简书

    部署守护进程fluentbit

    // Deploy Fluent Bit DaemonSet
        logPath := r.getContainerLogPath(fb)
        ds := operator.MakeDaemonSet(fb, logPath)
        if err := ctrl.SetControllerReference(&fb, &ds, r.Scheme); err != nil {
            return ctrl.Result{}, err
        }
    
        if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, &ds, r.mutate(&ds, fb)); err != nil {
            return ctrl.Result{}, err
        }
    
        return ctrl.Result{}, nil
    

    logPath := r.getContainerLogPath(fb)

    // getContainerLogPath picks the container log directory for fb. The first
    // non-empty value wins, in this order:
    //   1. fb.Spec.ContainerLogRealPath (set on the FluentBit resource itself),
    //   2. r.ContainerLogRealPath (set on the reconciler),
    //   3. the built-in default "/var/lib/docker/containers".
    func (r *FluentBitReconciler) getContainerLogPath(fb logging.FluentBit) string {
        switch {
        case fb.Spec.ContainerLogRealPath != "":
            return fb.Spec.ContainerLogRealPath
        case r.ContainerLogRealPath != "":
            return r.ContainerLogRealPath
        default:
            return "/var/lib/docker/containers"
        }
    }
    

    对于函数中的第一行 fb.Spec.ContainerLogRealPath,解释如下:
    这是逐层的结构体字段访问:fb 是 FluentBit 结构体,它的 Spec 字段是 FluentBitSpec 结构体,而 ContainerLogRealPath 是 FluentBitSpec 中的一个 string 字段。

    // FluentBit is the Schema for the fluentbits API
    type FluentBit struct {
        // Embedded type metadata, inlined into the JSON form.
        metav1.TypeMeta   `json:",inline"`
        // Embedded object metadata, serialized under the "metadata" key.
        metav1.ObjectMeta `json:"metadata,omitempty"`
    
        // Spec is the desired state of this FluentBit; see FluentBitSpec.
        Spec   FluentBitSpec   `json:"spec,omitempty"`
        // Status is serialized under the "status" key.
        Status FluentBitStatus `json:"status,omitempty"`
    }
    // FluentBitSpec defines the desired state of FluentBit
    type FluentBitSpec struct {
        // Fluent Bit image. MakeDaemonSet uses it as the image of the
        // "fluent-bit" container.
        Image string `json:"image,omitempty"`
        // Fluent Bit image pull policy.
        ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
        // Storage for position db. You will use it if tail input is enabled.
        // When it is not the zero VolumeSource, MakeDaemonSet adds it as the
        // "positions" volume mounted at /fluent-bit/tail.
        PositionDB corev1.VolumeSource `json:"positionDB,omitempty"`
        // Container log path on the host. When empty, getContainerLogPath falls
        // back to the reconciler's ContainerLogRealPath and then to
        // /var/lib/docker/containers.
        ContainerLogRealPath string `json:"containerLogRealPath,omitempty"`
        // Compute Resources required by container.
        Resources corev1.ResourceRequirements `json:"resources,omitempty"`
        // NodeSelector is copied onto the DaemonSet's pod spec.
        NodeSelector map[string]string `json:"nodeSelector,omitempty"`
        // Pod's scheduling constraints.
        Affinity *corev1.Affinity `json:"affinity,omitempty"`
        // Tolerations are copied onto the DaemonSet's pod spec.
        Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
        // Fluentbitconfig object associated with this Fluentbit. The same name
        // is used as the name of the Secret that MakeDaemonSet mounts read-only
        // at /fluent-bit/config.
        FluentBitConfigName string `json:"fluentBitConfigName,omitempty"`
        // The Secrets are mounted into /fluent-bit/secrets/<secret-name>.
        Secrets []string `json:"secrets,omitempty"`
    }
    
    

    守护进程

    ds := operator.MakeDaemonSet(fb, logPath)

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    rbacv1 "k8s.io/api/rbac/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "kubesphere.io/fluentbit-operator/api/v1alpha2"

    
    // MakeDaemonSet builds the DaemonSet that runs Fluent Bit for fb.
    //
    // The DaemonSet, its selector and its pod template all reuse fb's name,
    // namespace and labels. The pod runs a single "fluent-bit" container under
    // the service account named after fb and always gets four read-only mounts:
    //   - "varlibcontainers": host path logPath, mounted at logPath
    //   - "config":           Secret fb.Spec.FluentBitConfigName at /fluent-bit/config
    //   - "varlogs":          host path /var/log at /var/log/
    //   - "systemd":          host path /var/log/journal at /var/log/journal
    //
    // A "positions" volume (mounted at /fluent-bit/tail, not read-only) is added
    // when fb.Spec.PositionDB is set, followed by one read-only Secret volume per
    // entry of fb.Spec.Secrets, mounted at /fluent-bit/secrets/<secret-name>.
    func MakeDaemonSet(fb v1alpha2.FluentBit, logPath string) appsv1.DaemonSet {
        // Local constructors for the volume and mount shapes that repeat below.
        hostPathVolume := func(name, path string) corev1.Volume {
            return corev1.Volume{
                Name: name,
                VolumeSource: corev1.VolumeSource{
                    HostPath: &corev1.HostPathVolumeSource{Path: path},
                },
            }
        }
        secretVolume := func(name, secretName string) corev1.Volume {
            return corev1.Volume{
                Name: name,
                VolumeSource: corev1.VolumeSource{
                    Secret: &corev1.SecretVolumeSource{SecretName: secretName},
                },
            }
        }
        readOnlyMount := func(name, path string) corev1.VolumeMount {
            return corev1.VolumeMount{Name: name, ReadOnly: true, MountPath: path}
        }

        // Volumes and mounts that every Fluent Bit pod gets.
        volumes := []corev1.Volume{
            hostPathVolume("varlibcontainers", logPath),
            secretVolume("config", fb.Spec.FluentBitConfigName),
            hostPathVolume("varlogs", "/var/log"),
            hostPathVolume("systemd", "/var/log/journal"),
        }
        mounts := []corev1.VolumeMount{
            readOnlyMount("varlibcontainers", logPath),
            readOnlyMount("config", "/fluent-bit/config"),
            readOnlyMount("varlogs", "/var/log/"),
            readOnlyMount("systemd", "/var/log/journal"),
        }

        // Mount Position DB only when a volume source was actually configured.
        if fb.Spec.PositionDB != (corev1.VolumeSource{}) {
            volumes = append(volumes, corev1.Volume{
                Name:         "positions",
                VolumeSource: fb.Spec.PositionDB,
            })
            mounts = append(mounts, corev1.VolumeMount{
                Name:      "positions",
                MountPath: "/fluent-bit/tail",
            })
        }

        // Mount Secrets: the secret name doubles as the volume name.
        for _, secretName := range fb.Spec.Secrets {
            volumes = append(volumes, secretVolume(secretName, secretName))
            mounts = append(mounts, readOnlyMount(secretName, fmt.Sprintf("/fluent-bit/secrets/%s", secretName)))
        }

        fluentBit := corev1.Container{
            Name:            "fluent-bit",
            Image:           fb.Spec.Image,
            ImagePullPolicy: fb.Spec.ImagePullPolicy,
            Ports: []corev1.ContainerPort{
                {
                    Name:          "metrics",
                    ContainerPort: 2020,
                    Protocol:      "TCP",
                },
            },
            Env: []corev1.EnvVar{
                {
                    // NODE_NAME is filled from the pod's spec.nodeName field.
                    Name: "NODE_NAME",
                    ValueFrom: &corev1.EnvVarSource{
                        FieldRef: &corev1.ObjectFieldSelector{
                            FieldPath: "spec.nodeName",
                        },
                    },
                },
            },
            VolumeMounts: mounts,
            Resources:    fb.Spec.Resources,
        }

        // The same name/namespace/labels are used for the DaemonSet and its pod template.
        meta := metav1.ObjectMeta{
            Name:      fb.Name,
            Namespace: fb.Namespace,
            Labels:    fb.Labels,
        }

        return appsv1.DaemonSet{
            ObjectMeta: meta,
            Spec: appsv1.DaemonSetSpec{
                Selector: &metav1.LabelSelector{
                    MatchLabels: fb.Labels,
                },
                Template: corev1.PodTemplateSpec{
                    ObjectMeta: meta,
                    Spec: corev1.PodSpec{
                        ServiceAccountName: fb.Name,
                        Volumes:            volumes,
                        Containers:         []corev1.Container{fluentBit},
                        NodeSelector:       fb.Spec.NodeSelector,
                        Tolerations:        fb.Spec.Tolerations,
                        Affinity:           fb.Spec.Affinity,
                    },
                },
            },
        }
    }
    

    相关文章

      网友评论

          本文标题:读fluentbit-operator代码1.fluentbit_

          本文链接:https://www.haomeiwen.com/subject/uuskeltx.html