Skyapn-Dotnet源码 推送消息到远程服务器

发布时间 2023-10-08 22:29:01作者: 孤海飞雁

以源码中 sqlserver 为例

  var context = _tracingContext.CreateExitSegmentContext(ResolveOperationName(sqlCommand),
      _peerFormatter.GetDbPeer(sqlCommand.Connection));
  context.Span.SpanLayer = Tracing.Segments.SpanLayer.DB;
  context.Span.Component = Common.Components.SQLCLIENT;
  context.Span.AddTag(Common.Tags.DB_TYPE, "sql");
  context.Span.AddTag(Common.Tags.DB_INSTANCE, sqlCommand.Connection.Database);
  context.Span.AddTag(Common.Tags.DB_STATEMENT, sqlCommand.CommandText);


  

 public void AfterExecuteCommand()
 {
     var context = _contextAccessor.Context;
     if (context != null)
     {
         _tracingContext.Release(context);
     }
 }


上面连第一个方法为 初始化上下文,设置 当前请求 的sql,数据库,

第二个方法 为 可以理解为 将刚刚记录 请求 sql 的数据推送到 发送准备区,添加到 ConcurrentQueue<SegmentRequest> 这个 对列中
下面为添加到队列代码,中间有个map映射,将SegmentContext转为了SegmentRequest
public bool Dispatch(SegmentContext segmentContext)
{
if (!_runtimeEnvironment.Initialized || segmentContext == null || !segmentContext.Sampled)
return false;

// todo performance optimization for ConcurrentQueue
if (_config.QueueSize < _offset || _cancellation.IsCancellationRequested)
return false;

var segment = _segmentContextMapper.Map(segmentContext);

if (segment == null)
return false;

_segmentQueue.Enqueue(segment);

Interlocked.Increment(ref _offset);

_logger.Debug($"Dispatch trace segment. [SegmentId]={segmentContext.SegmentId}.");
return true;
}
最后在  InstrumentationHostedService找到StartAsync方法在查找下一层,此处有一个循环

遍历 IEnumerable<IExecutionService>  _server 集合下接口的对象,

IExecutionService=>ExecutionService(抽象类)=>具体实现类(CLRStatsService.SegmentReportService等)

然后在ExecutionService  定义timer 定时,timer 参数根据子类实现,大部份为30s

然后调用timer 中的回调函数,在通过此函数调用到实现类的函数。

到了具体实现类 

protected override Task ExecuteAsync(CancellationToken cancellationToken)
{
  return _dispatcher.Flush(cancellationToken);
}



public Task Flush(CancellationToken token = default(CancellationToken))
{
// todo performance optimization for ConcurrentQueue
//var queued = _segmentQueue.Count;
//var limit = queued <= _config.PendingSegmentLimit ? queued : _config.PendingSegmentLimit;
var limit = _config.BatchSize;
var index = 0;
var segments = new List<SegmentRequest>(limit);
while (index++ < limit && _segmentQueue.TryDequeue(out var request))
{
segments.Add(request);
Interlocked.Decrement(ref _offset);
}

// send async
if (segments.Count > 0)
_segmentReporter.ReportAsync(segments, token);

Interlocked.Exchange(ref _offset, _segmentQueue.Count);

return Task.CompletedTask;
}
Flush() 主要是把 前面ConcurrentQueue<SegmentRequest> 队列里的数据取出来 ,转为 List<SegmentRequest>里,最后执行Grpc发送