package com.youxia.userinfo.config;
  import brave.Span; import brave.Tracer; import brave.Tracing; import brave.internal.Platform; import brave.propagation.Propagation; import brave.propagation.TraceContext; import brave.propagation.TraceContextOrSamplingFlags;
  import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.remoting.exchange.ResponseCallback; import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.rpc.*;
  import org.apache.dubbo.rpc.support.RpcUtils;
 
  import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.Future;
  @Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, value = "tracing") public final class TracingFilter implements Filter {
      Tracer tracer;     TraceContext.Extractor<Map<String, String>> extractor;     TraceContext.Injector<Map<String, String>> injector;     volatile boolean isInit = false;
      public void setTracing(Tracing tracing) {         tracer = tracing.tracer();         extractor = tracing.propagation().extractor(GETTER);         injector = tracing.propagation().injector(SETTER);         isInit = true;     }
 
      static void parseRemoteAddress(RpcContext rpcContext, Span span) {         InetSocketAddress remoteAddress = rpcContext.getRemoteAddress();         if (remoteAddress == null) return;         span.remoteIpAndPort(Platform.get().getHostString(remoteAddress), remoteAddress.getPort());     }
      static void onError(Throwable error, Span span) {         span.error(error);         if (error instanceof RpcException) {             span.tag("userinfo.error_code", Integer.toString(((RpcException) error).getCode()));         }     }
      static final Propagation.Getter<Map<String, String>, String> GETTER =             new Propagation.Getter<Map<String, String>, String>() {                 @Override                 public String get(Map<String, String> carrier, String key) {                     return carrier.get(key);                 }
                  @Override                 public String toString() {                     return "Map::get";                 }             };
      static final Propagation.Setter<Map<String, String>, String> SETTER =             new Propagation.Setter<Map<String, String>, String>() {                 @Override                 public void put(Map<String, String> carrier, String key, String value) {                     carrier.put(key, value);                 }
                  @Override                 public String toString() {                     return "Map::set";                 }             };
      @Override     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {         if (isInit == false) return invoker.invoke(invocation);
          RpcContext rpcContext = RpcContext.getContext();         Span.Kind kind = rpcContext.isProviderSide() ? Span.Kind.SERVER : Span.Kind.CLIENT;         final Span span;         if (kind.equals(Span.Kind.CLIENT)) {             span = tracer.nextSpan();             injector.inject(span.context(), invocation.getAttachments());         } else {             TraceContextOrSamplingFlags extracted = extractor.extract(invocation.getAttachments());             span = extracted.context() != null                     ? tracer.joinSpan(extracted.context())                     : tracer.nextSpan(extracted);         }
          if (!span.isNoop()) {             span.kind(kind);             String service = invoker.getInterface().getSimpleName();             String method =RpcUtils.getMethodName(invocation);             span.name(service + "/" + method);             parseRemoteAddress(rpcContext, span);             span.start();         }
          boolean isOneway = false, deferFinish = false;         try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)) {             Result result = invoker.invoke(invocation);             if (result.hasException()) {                 onError(result.getException(), span);             }             isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);             Future<Object> future = rpcContext.getFuture(); // the case on async client invocation             if (future instanceof FutureAdapter) {                 deferFinish = true;                 ((FutureAdapter) future).getFuture().setCallback(new TracingFilter.FinishSpanCallback(span));             }             return result;         } catch (Error | RuntimeException e) {             onError(e, span);             throw e;         } finally {             if (isOneway) {                 span.flush();             } else if (!deferFinish) {                 span.finish();             }         }     }
      static final class FinishSpanCallback implements ResponseCallback {         final Span span;
          FinishSpanCallback(Span span) {             this.span = span;         }
          @Override public void done(Object response) {             span.finish();         }
          @Override public void caught(Throwable exception) {             onError(exception, span);             span.finish();         }     } }
   |