【zipkin/jaeger使用系列】dubbo v2.7.3使用zipkin进行链路监控

本文背景

因为:
1.dubbo已经从apache毕业,而zipkin官方的brave-instrumentation-dubbo-rpc的5.6版本支持的dubbo版本是2.6.6版本、且对2.7.3版本支持不太友好(2.6.6这个版本是未毕业前的版本,不太喜欢),只能把brave-instrumentation-dubbo-rpc抠出来,用以支撑2.7.3版本。

2.dubbo官方提供的例子还是xml配置版本,现在都9102年了,谁还用XML配置版本。

因为以上两点,有了此文。

provider方工程代码

添加pom依赖

1.brave对dubbo的集成:brave-instrumentation-dubbo-rpc
2.brave的spring bean支持:brave-spring-beans
3.在SLF4J的MDC(Mapped Diagnostic Context)中支持traceId和spanId:brave-context-slf4j
4.使用okhttp3作为 reporter:zipkin-sender-okhttp3
当然,最好是同时添加bom,做好版本控制。

将TracingFilter抠出来进行改造

将brave-instrumentation-dubbo-rpc的5.6.6版本的TracingFilter抠出来进行改造,即实现org.apache.dubbo.rpc.Filter的invoke方法。
brave-instrumentation-dubbo-rpc的5.6.6版本的TracingFilter实现的是com.alibaba.dubbo.rpc.Filter。

package com.youxia.userinfo.config;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.internal.Platform;
import brave.propagation.Propagation;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;

import org.apache.dubbo.rpc.support.RpcUtils;


import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Future;

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, value = "tracing")
public final class TracingFilter implements Filter {

Tracer tracer;
TraceContext.Extractor<Map<String, String>> extractor;
TraceContext.Injector<Map<String, String>> injector;
volatile boolean isInit = false;

public void setTracing(Tracing tracing) {
tracer = tracing.tracer();
extractor = tracing.propagation().extractor(GETTER);
injector = tracing.propagation().injector(SETTER);
isInit = true;
}


static void parseRemoteAddress(RpcContext rpcContext, Span span) {
InetSocketAddress remoteAddress = rpcContext.getRemoteAddress();
if (remoteAddress == null) return;
span.remoteIpAndPort(Platform.get().getHostString(remoteAddress), remoteAddress.getPort());
}

static void onError(Throwable error, Span span) {
span.error(error);
if (error instanceof RpcException) {
span.tag("userinfo.error_code", Integer.toString(((RpcException) error).getCode()));
}
}

static final Propagation.Getter<Map<String, String>, String> GETTER =
new Propagation.Getter<Map<String, String>, String>() {
@Override
public String get(Map<String, String> carrier, String key) {
return carrier.get(key);
}

@Override
public String toString() {
return "Map::get";
}
};

static final Propagation.Setter<Map<String, String>, String> SETTER =
new Propagation.Setter<Map<String, String>, String>() {
@Override
public void put(Map<String, String> carrier, String key, String value) {
carrier.put(key, value);
}

@Override
public String toString() {
return "Map::set";
}
};

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (isInit == false) return invoker.invoke(invocation);

RpcContext rpcContext = RpcContext.getContext();
Span.Kind kind = rpcContext.isProviderSide() ? Span.Kind.SERVER : Span.Kind.CLIENT;
final Span span;
if (kind.equals(Span.Kind.CLIENT)) {
span = tracer.nextSpan();
injector.inject(span.context(), invocation.getAttachments());
} else {
TraceContextOrSamplingFlags extracted = extractor.extract(invocation.getAttachments());
span = extracted.context() != null
? tracer.joinSpan(extracted.context())
: tracer.nextSpan(extracted);
}

if (!span.isNoop()) {
span.kind(kind);
String service = invoker.getInterface().getSimpleName();
String method =RpcUtils.getMethodName(invocation);
span.name(service + "/" + method);
parseRemoteAddress(rpcContext, span);
span.start();
}

boolean isOneway = false, deferFinish = false;
try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)) {
Result result = invoker.invoke(invocation);
if (result.hasException()) {
onError(result.getException(), span);
}
isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
Future<Object> future = rpcContext.getFuture(); // the case on async client invocation
if (future instanceof FutureAdapter) {
deferFinish = true;
((FutureAdapter) future).getFuture().setCallback(new TracingFilter.FinishSpanCallback(span));
}
return result;
} catch (Error | RuntimeException e) {
onError(e, span);
throw e;
} finally {
if (isOneway) {
span.flush();
} else if (!deferFinish) {
span.finish();
}
}
}

static final class FinishSpanCallback implements ResponseCallback {
final Span span;

FinishSpanCallback(Span span) {
this.span = span;
}

@Override public void done(Object response) {
span.finish();
}

@Override public void caught(Throwable exception) {
onError(exception, span);
span.finish();
}
}
}

在resource目录增加/META-INF/dubbo/org.apache.dubbo.rpc.Filter文件

org.apache.dubbo.rpc.Filter文件的内容如下:

tracing=com.youxia.userinfo.config.TracingFilter

配置zipkin客户端

ZipkinConfig内容如下:

package com.youxia.userinfo.config;

import brave.context.slf4j.MDCScopeDecorator;
import brave.propagation.CurrentTraceContext;
import brave.spring.beans.CurrentTraceContextFactoryBean;
import brave.spring.beans.TracingFactoryBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Sender;
import zipkin2.reporter.beans.AsyncReporterFactoryBean;
import zipkin2.reporter.okhttp3.OkHttpSender;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

@Configuration
public class ZipkinConfig {

@Value("${zipkin.server.url}")
private String ZipkinServerUrl;

@Bean("okHttpSender")
public Sender okHttpSender() {
Sender sender= OkHttpSender.create(ZipkinServerUrl);
return sender;
}

@Bean("reporter")
public AsyncReporter getAsyncReporter(){
AsyncReporter asyncReporter=AsyncReporter.builder(okHttpSender()).closeTimeout(50000, TimeUnit.MILLISECONDS).build(SpanBytesEncoder.JSON_V2);
return asyncReporter;
}

@Bean
public AsyncReporterFactoryBean reporter(@Qualifier("okHttpSender")OkHttpSender sender){
AsyncReporterFactoryBean asyncReporterFactoryBean = new AsyncReporterFactoryBean();
asyncReporterFactoryBean.setSender(sender);
asyncReporterFactoryBean.setCloseTimeout(3000);
return asyncReporterFactoryBean;
}

@Bean
public TracingFactoryBean getTracingBean(@Qualifier("reporter") AsyncReporter reporter){
TracingFactoryBean tracingFactoryBean=new TracingFactoryBean();
tracingFactoryBean.setLocalServiceName("userinfo-service");
CurrentTraceContextFactoryBean currentTraceContextFactoryBean = new CurrentTraceContextFactoryBean();
CurrentTraceContext.ScopeDecorator scopeDecorator = MDCScopeDecorator.create();
currentTraceContextFactoryBean.setScopeDecorators(Arrays.asList(scopeDecorator));
tracingFactoryBean.setCurrentTraceContext(currentTraceContextFactoryBean.getObject());
tracingFactoryBean.setSpanReporter(reporter);
return tracingFactoryBean;
}
}

dubbo注解文件添加filter

业务实现类添加filter:

package com.youxia.userinfo.service.impl;

import com.alibaba.dubbo.config.annotation.Service;
import com.youxia.userinfo.domain.User;
import com.youxia.userinfo.service.UserService;

/**
 * Provider-side implementation of {@link UserService}, exported as a Dubbo service with
 * the {@code tracing} filter applied.
 */
@Service(filter = {"tracing"})
public class UserServiceImpl implements UserService {

    /**
     * Assigns the fixed id {@code 1} to the user, prints it to standard output and returns it.
     *
     * @param user the user to save; modified in place
     * @return the same {@code user} instance
     */
    @Override
    public User saveUser(User user) {
        user.setUserId(1);
        // The name is written back unchanged, exactly as the original code does.
        final String currentName = user.getUserName();
        user.setUserName(currentName);
        final String rendered = user.toString();
        System.out.println(rendered);
        return user;
    }
}

provider配置文件

完整的application.properties:

spring.application.name=UserInfoService
server.address=10.3.20.57
server.port=28081

dubbo.registry.address=redis://192.168.172.4:6380
dubbo.protocol.name=dubbo
dubbo.protocol.port=28080
dubbo.scan.base-packages=com.youxia.userinfo.service
dubbo.application.qos.enable=true

zipkin.server.url=http://192.168.172.6:9411/api/v2/spans

consumer工程代码

业务逻辑处理类代码

package com.youxia.service.user;

import com.youxia.userinfo.domain.User;
import com.youxia.userinfo.service.UserService;
import org.apache.dubbo.config.annotation.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Consumer-side Spring service that forwards calls to the remote {@link UserService}
 * obtained through a Dubbo reference.
 */
@Service
public class UserServiceImpl {

    private static final Logger LOGGER = LoggerFactory.getLogger(UserServiceImpl.class);

    /** Dubbo reference to the remote user service. */
    @Reference
    private UserService userService;

    /**
     * Delegates to {@link UserService#saveUser(User)} on the referenced service.
     *
     * @param user the user to pass to the remote service
     * @return whatever the remote {@code saveUser} call returns
     */
    public User sayHello(User user) {
        final User saved = userService.saveUser(user);
        return saved;
    }
}

application.properties文件内容

server.port=28082
server.address=10.3.20.57
dubbo.registry.protocol=redis
dubbo.registry.address=redis://192.168.172.4:6380
dubbo.application.name=dubbo-demo-service
dubbo.scan.base-packages=com.youxia.service.user

最终效果

完整代码地址

https://github.com/youxia999/spring_boot_service/apache_dubbo_zipkin_project.git