https://github.com/kubesphere/fluentbit-operator
先看结构体
type FluentBitConfigReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
执行代码
func (r *FluentBitConfigReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
_ = r.Log.WithValues("fluentbitconfig", req.NamespacedName)
var cfgs logging.FluentBitConfigList
if err := r.List(ctx, &cfgs, client.InNamespace(req.Namespace)); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
这常规代码,没什么好说的
for _, cfg := range cfgs.Items {
// List all inputs matching the label selector.
var inputs logging.InputList
selector, err := metav1.LabelSelectorAsSelector(&cfg.Spec.InputSelector)
if err != nil {
return ctrl.Result{}, err
}
if err = r.List(ctx, &inputs, client.InNamespace(req.Namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
return ctrl.Result{}, err
}
var cfgs logging.FluentBitConfigList中的结构
ype FluentBitConfigList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []FluentBitConfig `json:"items"`
}
轮询列出所有与label匹配的input
selector, err := metav1.LabelSelectorAsSelector(&cfg.Spec.InputSelector)
// List all filters matching the label selector.
var filters logging.FilterList
selector, err = metav1.LabelSelectorAsSelector(&cfg.Spec.FilterSelector)
if err != nil {
return ctrl.Result{}, err
}
if err = r.List(ctx, &filters, client.InNamespace(req.Namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
return ctrl.Result{}, err
}
// List all outputs matching the label selector.
var outputs logging.OutputList
selector, err = metav1.LabelSelectorAsSelector(&cfg.Spec.OutputSelector)
if err != nil {
return ctrl.Result{}, err
}
if err = r.List(ctx, &outputs, client.InNamespace(req.Namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
return ctrl.Result{}, err
}
// List all parsers matching the label selector.
var parsers logging.ParserList
selector, err = metav1.LabelSelectorAsSelector(&cfg.Spec.ParserSelector)
if err != nil {
return ctrl.Result{}, err
}
if err = r.List(ctx, &parsers, client.InNamespace(req.Namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
return ctrl.Result{}, err
}
这几个一样,了解fluent-bit的结构。
// Inject config data into Secret
sl := plugins.NewSecretLoader(r.Client, cfg.Namespace, r.Log)
mainCfg, err := cfg.RenderMainConfig(sl, inputs, filters, outputs)
if err != nil {
return ctrl.Result{}, err
}
parserCfg, err := cfg.RenderParserConfig(sl, parsers)
if err != nil {
return ctrl.Result{}, err
}
cl := plugins.NewConfigMapLoader(r.Client, cfg.Namespace)
scripts, err := cfg.RenderLuaScript(cl, filters)
if err != nil {
return ctrl.Result{}, err
}
在这里代码引用了一段lua脚本
function add_time(tag, timestamp, record)
new_record = {}
timeStr = os.date("!*t", timestamp["sec"])
t = string.format("%4d-%02d-%02dT%02d:%02d:%02d.%sZ",
timeStr["year"], timeStr["month"], timeStr["day"],
timeStr["hour"], timeStr["min"], timeStr["sec"],
timestamp["nsec"])
kubernetes = {}
kubernetes["pod_name"] = record["_HOSTNAME"]
kubernetes["container_name"] = record["SYSLOG_IDENTIFIER"]
kubernetes["namespace_name"] = "kube-system"
new_record["time"] = t
new_record["log"] = record["MESSAGE"]
new_record["kubernetes"] = kubernetes
return 1, timestamp, new_record
end
func NewSecretLoader(c client.Client, ns string, l logr.Logger) SecretLoader {
return SecretLoader{
client: c,
namespace: ns,
}
}
创造或者更新一致的秘钥
// Create or update the corresponding Secret
sec := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: cfg.Name,
Namespace: cfg.Namespace,
},
Data: map[string][]byte{
"fluent-bit.conf": []byte(mainCfg),
"parsers.conf": []byte(parserCfg),
},
}
for k, v := range scripts {
sec.Data[k] = []byte(v)
}
if err := ctrl.SetControllerReference(&cfg, sec, r.Scheme); err != nil {
return ctrl.Result{}, err
}
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, sec, func() error {
sec.Data = map[string][]byte{
"fluent-bit.conf": []byte(mainCfg),
"parsers.conf": []byte(parserCfg),
}
for k, v := range scripts {
sec.Data[k] = []byte(v)
}
sec.SetOwnerReferences(nil)
if err := ctrl.SetControllerReference(&cfg, sec, r.Scheme); err != nil {
return err
}
return nil
}); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
网友评论