一、依赖任务
1.1 k8s运行任务
# https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/
# https://github.com/jaegertracing/jaeger-kubernetes
apiVersion: batch/v1beta1
kind: CronJob
metadata:
name: jaeger-spark-dependencies
spec:
schedule: "*/1 * * * *"
jobTemplate:
spec:
template:
spec:
containers:
- name: hello
image: busybox
args:
- /bin/sh
- -c
- date; echo Hello from the Kubernetes cluster
- name: dependencies
image: jaegertracing/spark-dependencies
env:
- name: STORAGE
value: "elasticsearch"
- name: ES_NODES
value: "jaeger-elasticsearch:9200"
- name: ES_USERNAME
value: "elastic"
- name: ES_PASSWORD
value: "changeme"
restartPolicy: OnFailure
1.2 查看任务状态
kubectl get cj
kubectl describe cj jaeger-spark-dependencies
1.3 依赖关系结果
[
{
"parent": "frontend",
"callCount": 92,
"source": "jaeger",
"child": "frontend"
},
{
"parent": "frontend",
"callCount": 4,
"source": "jaeger",
"child": "driver"
},
{
"parent": "frontend",
"callCount": 40,
"source": "jaeger",
"child": "route"
},
{
"parent": "frontend",
"callCount": 4,
"source": "jaeger",
"child": "customer"
},
{
"parent": "mysql",
"callCount": 4,
"source": "jaeger",
"child": "mysql"
},
{
"parent": "boot-service",
"callCount": 1030,
"source": "jaeger",
"child": "boot-service"
},
{
"parent": "customer",
"callCount": 4,
"source": "jaeger",
"child": "mysql"
},
{
"parent": "driver",
"callCount": 54,
"source": "jaeger",
"child": "redis"
}
]
二、依赖任务实现
io.jaegertracing.spark.dependencies.elastic.ElasticsearchDependenciesJob
整体思路是:
1)从ES集群查询数据,获取span列表
2)根据traceId将Span进行分组
3)计算每个分组中所有span的依赖关系
image.png
4)聚合依赖关系,并写入ES
三、依赖Http接口实现
cmd/query/app/http_handler.go:dependencies
用户需要传入时间范围,然后获取数据。
// 请求示例URL http://localhost:16686/api/dependencies?endTs=1574408925476&lookback=604800000
func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) {
endTsMillis, err := strconv.ParseInt(r.FormValue(endTsParam), 10, 64)
if aH.handleError(w, errors.Wrapf(err, "unable to parse %s", endTimeParam), http.StatusBadRequest) {
return
}
var lookback time.Duration
if formValue := r.FormValue(lookbackParam); len(formValue) > 0 {
lookback, err = time.ParseDuration(formValue + "ms")
if aH.handleError(w, errors.Wrapf(err, "unable to parse %s", lookbackParam), http.StatusBadRequest) {
return
}
}
service := r.FormValue(serviceParam)
if lookback == 0 {
lookback = defaultDependencyLookbackDuration
}
endTs := time.Unix(0, 0).Add(time.Duration(endTsMillis) * time.Millisecond)
// 查询指定范围内的依赖关系
dependencies, err := aH.queryService.GetDependencies(endTs, lookback)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
// 如果用户传入服务名,根据服务名过滤依赖
filteredDependencies := aH.filterDependenciesByService(dependencies, service)
// 以parent和child为key,聚合数据
structuredRes := structuredResponse{
Data: aH.deduplicateDependencies(filteredDependencies),
}
aH.writeJSON(w, r, &structuredRes)
}
四、参考文档
https://github.com/openzipkin/zipkin-dependencies
https://github.com/jaegertracing/spark-dependencies
https://segmentfault.com/a/1190000021063578
https://www.jianshu.com/p/e26c5a9b5261
网友评论