skyapm-dotnet 源码执行

发布时间 2023-09-03 15:42:09作者: 孤海飞雁
监听System.Data.SqlClient 为例
通过观察者模式和 DiagnosticListener获取监听数据,
在开始InstrumentationHostedService实现IHostedService启动
然后通过 DiagnosticListener.AllListeners.Subscribe();监听
 
然后 TracingDiagnosticProcessorObserver : IObserver<DiagnosticListener>{

        private readonly IEnumerable<ITracingDiagnosticProcessor> _tracingDiagnosticProcessors;

  public void OnNext(DiagnosticListener listener)
  {
      foreach (var diagnosticProcessor in _tracingDiagnosticProcessors.Distinct(x => x.ListenerName))
      {
          if (listener.Name == diagnosticProcessor.ListenerName)
          {
              Subscribe(listener, diagnosticProcessor);
              _logger.Information(
                  $"Loaded diagnostic listener [{diagnosticProcessor.ListenerName}].");
          }
      }
  }
}
通过next()获取所有监听,然后注入 ITracingDiagnosticProcessor 下的自定义诊断对象,获取诊断名称(ITracingDiagnosticProcessor 为所有诊断的对象的父接口 )
通过遍历对比,然后订阅,订阅前会先将诊断对象获取它的 方法,以及方法里的参数,和参数对应的特性类型(ObjectAttribute,PropertyAttribute
,这两个特性类型会存入对应特性属性值中后续在获取加监听对象的匿名参数时会获取到
),并将获取到的方法存入集合,还会将对象集合转为字典,转为字典作用就是在注册时
检查是否重复注册(   listener.Subscribe(diagnosticProcessor, diagnosticProcessor.IsEnabled);第二个参数执行委托返回bool,判断啊诊断名称是否存在)

//订阅代码

protected virtual void Subscribe(DiagnosticListener listener,
ITracingDiagnosticProcessor tracingDiagnosticProcessor)
{
   var diagnosticProcessor = new TracingDiagnosticObserver(tracingDiagnosticProcessor, _loggerFactory);
   listener.Subscribe(diagnosticProcessor, diagnosticProcessor.IsEnabled);
}






  

TracingDiagnosticObserver : IObserver<KeyValuePair<string, object>>  订阅明细

  _methodCollection = new TracingDiagnosticMethodCollection(tracingDiagnosticProcessor)
.  ToDictionary(method => method.DiagnosticName);


这里 通过 字典里的名称执行

public void OnNext(KeyValuePair<string, object> value)
{
  if (!_methodCollection.TryGetValue(value.Key, out var method))
  return;

  try
  {
  method.Invoke(value.Key, value.Value);
  }
  catch (Exception exception)
  {
  _logger.Error("Invoke diagnostic method exception.", exception);
  }
}

 

 

TracingDiagnosticMethod.cs

   private readonly IParameterResolver[] _parameterResolvers;

 

这里通过前面注册时会有一个 参数数组,和传入的参数

通过Resolve 里(这里ObjectAttribute和 PropertyAttribute)抽象类ParameterBinderAttribute 为了前面注册时获取两个特性值

ObjectAttribute 时直接返回传入对象,(_parameterResolvers[i] 里的参数)

ParameterBinderAttribute 是  

{

var property = value.GetType().GetProperty(Name);

return property?.GetReflector()?.GetValue(value);

}

//执行方法和参数

public void Invoke(string diagnosticName, object value)
{
  if (_diagnosticName != diagnosticName)
  {
  r  eturn;
  }

  var args = new object[_parameterResolvers.Length];
  or (var i = 0; i < _parameterResolvers.Length; i++)
  {
    args[i] = _parameterResolvers[i].Resolve(value);
  }

  _reflector.Invoke(_tracingDiagnosticProcessor, args);
}





  

 执行到 这诊断对象
 
 先创建上下文,添加前面传入的参数值到上下文

[DiagnosticName(SqlClientDiagnosticStrings.SqlBeforeExecuteCommand)] public void BeforeExecuteCommand([Property(Name = "Command")] DbCommand sqlCommand) { var context = _tracingContext.CreateExitSegmentContext(ResolveOperationName(sqlCommand), _peerFormatter.GetDbPeer(sqlCommand.Connection)); context.Span.SpanLayer = Tracing.Segments.SpanLayer.DB; context.Span.Component = Common.Components.SQLCLIENT; context.Span.AddTag(Common.Tags.DB_TYPE, "sql"); context.Span.AddTag(Common.Tags.DB_INSTANCE, sqlCommand.Connection.Database); context.Span.AddTag(Common.Tags.DB_STATEMENT, sqlCommand.CommandText); }
  //执行完成后发布,通过注入ITracingContext 上下文

  [DiagnosticName(SqlClientDiagnosticStrings.SqlAfterExecuteCommand)]
  public void AfterExecuteCommand()
  {
    var context = _contextAccessor.Context;
    if (context != null)
    {
      _tracingContext.Release(context);
    }
  }

 

TracingContext.cs

在Release 方法

通过ISegmentDispatcher _segmentDispatcher
 ConcurrentQueue<SegmentRequest> _segmentQueue;

_segmentDispatcher.Dispatch(segmentContext);将行下文 写入 队列中

InstrumentStartup.cs

最后通过GRPC发送
在监听的上一行有先买你的foreach
 DiagnosticListener.AllListeners.Subscribe(_observer);

注入了 IEnumerable<IExecutionService> _services;这个接口获取了 所有grpc 推送,差不多有些话要做一些数据准八日,如cpu 等

  foreach (var service in _services)
  await service.StartAsync(cancellationToken);


ExecutionService.cs
通过Time 定时 执行一次远程推送

public Task StartAsync(CancellationToken cancellationToken = default(CancellationToken))
{
  _cancellationTokenSource = new CancellationTokenSource();
  var source = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenSource.Token, cancellationToken);
  _timer = new Timer(Callback, source, DueTime, Period);//
  Logger.Information($"Loaded instrument service [{GetType().FullName}].");
  return Task.CompletedTask;
}

private async void Callback(object state)
{
  if (!(state is CancellationTokenSource token) || token.IsCancellationRequested || !CanExecute()) return;

  try
  {
  await ExecuteAsync(token.Token);
  }
  catch (Exception ex)
  {
  Logger.Error(GetType().FullName + ".ExecuteAsync(token.Token) fail", ex);
  }
}