以源码中 sqlserver 为例
var context = _tracingContext.CreateExitSegmentContext(ResolveOperationName(sqlCommand), _peerFormatter.GetDbPeer(sqlCommand.Connection)); context.Span.SpanLayer = Tracing.Segments.SpanLayer.DB; context.Span.Component = Common.Components.SQLCLIENT; context.Span.AddTag(Common.Tags.DB_TYPE, "sql"); context.Span.AddTag(Common.Tags.DB_INSTANCE, sqlCommand.Connection.Database); context.Span.AddTag(Common.Tags.DB_STATEMENT, sqlCommand.CommandText);
public void AfterExecuteCommand() { var context = _contextAccessor.Context; if (context != null) { _tracingContext.Release(context); } }
上面连第一个方法为 初始化上下文,设置 当前请求 的sql,数据库, 第二个方法 为 可以理解为 将刚刚记录 请求 sql 的数据推送到 发送准备区,添加到 ConcurrentQueue<SegmentRequest> 这个 对列中 下面为添加到队列代码,中间有个map映射,将SegmentContext转为了SegmentRequest public bool Dispatch(SegmentContext segmentContext) { if (!_runtimeEnvironment.Initialized || segmentContext == null || !segmentContext.Sampled) return false; // todo performance optimization for ConcurrentQueue if (_config.QueueSize < _offset || _cancellation.IsCancellationRequested) return false; var segment = _segmentContextMapper.Map(segmentContext); if (segment == null) return false; _segmentQueue.Enqueue(segment); Interlocked.Increment(ref _offset); _logger.Debug($"Dispatch trace segment. [SegmentId]={segmentContext.SegmentId}."); return true; }
最后在 InstrumentationHostedService找到StartAsync方法在查找下一层,此处有一个循环 遍历 IEnumerable<IExecutionService> _server 集合下接口的对象, IExecutionService=>ExecutionService(抽象类)=>具体实现类(CLRStatsService.SegmentReportService等) 然后在ExecutionService 定义timer 定时,timer 参数根据子类实现,大部份为30s 然后调用timer 中的回调函数,在通过此函数调用到实现类的函数。 到了具体实现类 protected override Task ExecuteAsync(CancellationToken cancellationToken) { return _dispatcher.Flush(cancellationToken); } public Task Flush(CancellationToken token = default(CancellationToken)) { // todo performance optimization for ConcurrentQueue //var queued = _segmentQueue.Count; //var limit = queued <= _config.PendingSegmentLimit ? queued : _config.PendingSegmentLimit; var limit = _config.BatchSize; var index = 0; var segments = new List<SegmentRequest>(limit); while (index++ < limit && _segmentQueue.TryDequeue(out var request)) { segments.Add(request); Interlocked.Decrement(ref _offset); } // send async if (segments.Count > 0) _segmentReporter.ReportAsync(segments, token); Interlocked.Exchange(ref _offset, _segmentQueue.Count); return Task.CompletedTask; }
Flush() 主要是把 前面ConcurrentQueue<SegmentRequest> 队列里的数据取出来 ,转为 List<SegmentRequest>里,最后执行Grpc发送