【jaeger源码分析系列】jaeger collector源码分析
TChannel 接收 agent 提交过来的数据
github.com/jaegertracing/jaeger/cmd/collector/app/span_handler.go #69
func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) { |
经过处理后数据放入BoundedQueue(有界队列)
github.com/jaegertracing/jaeger/cmd/collector/app/span_processor.go #130
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat string) bool { |
启用50个goroutine,处理队列消息
github.com/jaegertracing/jaeger/pkg/queue/bounded_queue.go #53
func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) { |
collector 在处理队列数据的时候和agent一样,处理不完会直接扔掉
可以通过配置参数优化
–collector.num-workers (default 50)
–collector.queue-size (default 2000)
增加collector服务节点
从队列拿出来后经过处理,把数据存入cassandra数据库
github.com/jaegertracing/jaeger/cmd/collector/app/span_processor.go #101
func (sp *spanProcessor) saveSpan(span *model.Span) { |
github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/writer.go #122
func (s *SpanWriter) WriteSpan(span *model.Span) error { |
保存saveServiceNameAndOperationName,collector借助缓存(key/value 和 lru)
借助缓存,Jaeger实现不重复写入Service和OperationName,是否已经写入通过缓存判断,不查询cassandra,减少了查询压力。
github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/service_names.go #69
func (s *ServiceNamesStorage) Write(serviceName string) error { |
在默认情况下ServiceName缓存长度为10000,OperationName缓存长度十万,如果超过限制重复写入。从实际上考虑这样的限制是否够用?其实Uber发展到现在也只有1000多个ServiceName,所以这个设置可以满足很多公司。