public class MyTMutiplexedProcessor implements TProcessor { private static final Logger logger = LoggerFactory.getLogger(ThriftPoolServerStartThread.class); private final Map<String, TProcessor> SERVICE_PROCESSOR_MAP = new HashMap();
/**
 * Registers the processor that handles calls addressed to {@code serviceName}.
 * Registering a second processor under the same name replaces the first.
 *
 * @param serviceName service prefix that clients place before the ':' in the message name
 * @param processor   processor that messages for that service are dispatched to
 */
public void registerProcessor(String serviceName, TProcessor processor) {
    this.SERVICE_PROCESSOR_MAP.put(serviceName, processor);
}

/**
 * Reads the incoming message header, resolves the target service from the
 * {@code "serviceName:methodName"} message name, and delegates the call to the
 * processor registered for that service. The delegate sees a message whose name
 * has the service prefix stripped. The delegated call is wrapped in a tracing
 * span named after the method.
 *
 * @param inProtocol   protocol the request is read from
 * @param outProtocol1 protocol handed to the delegate processor for the response
 * @return whatever the delegate processor's {@code process} returns
 * @throws TException if the message type is not one of the accepted type codes,
 *                    if the message name contains no ':' separator, if no processor
 *                    is registered for the service name, or if the delegate throws
 */
@Override
public boolean process(TProtocol inProtocol, TProtocol outProtocol1) throws TException {
    TMessage message = inProtocol.readMessageBegin();
    logger.info("service name {}", message.name);

    // Only type codes 1 and 4 are dispatched.
    // NOTE(review): these look like Thrift's TMessageType.CALL (1) and ONEWAY (4) - confirm
    // against the Thrift version in use before replacing the literals with the constants.
    if (message.type != 1 && message.type != 4) {
        throw new TException("This should not have happened!?");
    }

    int index = message.name.indexOf(":");
    if (index < 0) {
        throw new TException("Service name not found in message name: " + message.name
                + ". Did you forget to use a TMultiplexProtocol in your client?");
    }

    String serviceName = message.name.substring(0, index);
    TProcessor actualProcessor = this.SERVICE_PROCESSOR_MAP.get(serviceName);
    if (actualProcessor == null) {
        throw new TException("Service name not found: " + serviceName
                + ". Did you forget to call registerProcessor()?");
    }

    logger.info("rpc start");

    // Everything after the single-character ':' separator is the real method name.
    String methodName = message.name.substring(index + 1);
    TMessage standardMessage = new TMessage(methodName, message.type, message.seqid);

    Span span = TracerConfig.getTracer().newTrace().name(methodName).start();
    boolean result;
    try {
        result = actualProcessor.process(
                new MyTMutiplexedProcessor.StoredMessageProtocol(inProtocol, standardMessage),
                outProtocol1);
    } finally {
        // Always close the span, even when the delegate throws; otherwise a failed
        // call leaves a started span that is never finished.
        span.finish();
    }
    logger.info("rpc end");
    return result;
}
private static class StoredMessageProtocol extends TProtocolDecorator { TMessage messageBegin;