手写RPC框架---01 Posted on 2024-01-04 18:46:48 2024-12-21 14:33:51 by Author 摘要 手写RPC框架--01,该部分具体讲解了调用方的实现方式,以及SPI的具体过程。 # 手写RPC框架---01 1. 一个完整的RPC框架包含:客户端存根,服务端存根,服务注册与发现,序列化和反序列化,通信协议,以及负载均衡。本项目,对于服务注册与发现,序列化和反序列化,以及负载均衡模块使用SPI形式进行加载,可以自己扩展。对于通信协议选用的TCP协议,负载均衡模块选用的算法是轮询算法和一致性hash算法,服务注册与发现使用的Zookeeper,本人经过自己扩展,加入了nacos。客户端与服务端之间网络应用程序框架选用的是Netty,下图展示了整体的框架图。  2. 上面讲解了RPC框架图,接下来,就是具体实现,首先,对于调用方(消费者)而言,需要调用远程提供的服务,那么在调用方程序启动的时候,需要进行自动装配一些组件,比如启动服务发现组件,根据配置文件,选择合适的序列化组件,以及负载均衡组件。这些组件在消费方服务启动的时候会自动装配,所以这里借鉴SpringBoot的自动装配的方式,使用SPI机制以及bean加载过程和反射机制来完成这项工作。具体方式如下所示: - 首先定义一个注解,该注解在Spring项目启动的时候会自动加载一个ConsumerPostProcessor配置类,该类通过反射机制以及SPI机制动态加载相应的组件 ```java @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Import(ConsumerPostProcessor.class) public @interface EnableConsumerRpc { } ``` - ConsumerPostProcessor类是一个配置类,该类实现了BeanPostProcessor, EnvironmentAware, InitializingBean 三个接口。 - 其中,实现BeanPostProcessor接口的作用是通过反射机制,找到项目中以@RpcReference注解的自动,然后将该字段对应的属性更改成代理对象,该代理对象封装了服务发现,负载均衡,协议编码以及和服务端进行网络通信的功能模块,即封装了底层的实现功能,使得消费者调用该对象的方法时,就和调用本地方法一样,屏蔽了底层具体实现的过程。 ```java public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // 获取所有字段 final Field[] fields = bean.getClass().getDeclaredFields(); // 遍历所有字段找到 RpcReference 注解的字段 for (Field field : fields) { if(field.isAnnotationPresent(RpcReference.class)){ final RpcReference rpcReference = field.getAnnotation(RpcReference.class); final Class<?> aClass = field.getType(); field.setAccessible(true); Object object = null; try { // 创建代理对象 object = Proxy.newProxyInstance( aClass.getClassLoader(), new Class<?>[]{aClass}, new RpcInvokerProxy(rpcReference.serviceVersion() rpcReference.timeout(),rpcReference.faultTolerant(), rpcReference.loadBalancer(),rpcReference.retryCount())); } catch (Exception e) { e.printStackTrace(); } try { // 将代理对象设置给字段 field.set(bean,object); field.setAccessible(false); logger.info(beanName + " field:" + field.getName() + "注入成功"); } catch (IllegalAccessException e) { e.printStackTrace(); logger.info(beanName + " field:" + field.getName() + "注入失败"); } } } return bean; } ``` - 实现InitializingBean 接口由SPI机制根据用户自定义配置动态加载其配置的组件,比如加载序列化方式(Hessian,xml, json),注册中心的类型(Zookeeper,Nacos),以及负载均衡的策略(一致性hash,轮询机制)。 ```java @Override public void afterPropertiesSet() throws Exception { SerializationFactory.init(); RegistryFactory.init(); LoadBalancerFactory.init(); FilterConfig.initClientFilter(); } ``` - 实现EnvironmentAware接口的作用是将配置文件(application.yml或者其他配置文件)初始化为项目的上下文环境,使得消费端应用的环境设置为用户自定义的。 ```java @Override public void setEnvironment(Environment environment) { RpcProperties properties = RpcProperties.getInstance(); PropertiesUtils.init(properties,environment); rpcProperties = properties; logger.info("读取配置文件成功"); } ``` - 接下来就是具体讲讲代理对象RpcInvokerProxy,该对象使用的JDK方式创建的。该代理对象在invoke方法中主要做了以下工作,首先,构造了对请求的数据进行序列,然后进行协议编码,之后根据调用的服务从请求中心发现该服务的配置信息,然后根据负载均衡策略选择合适的服务提供方,进行调用远程服务。在进行调用远程服务时候,使用了Netty框架,该框架的调用是由RpcConsumer封装起来的。具体的代理对象实现如下所示。 ```java public class RpcInvokerProxy implements InvocationHandler { private String serviceVersion; private long timeout; private String loadBalancerType; private String faultTolerantType; private long retryCount; public RpcInvokerProxy(String serviceVersion, long timeout,String faultTolerantType,String loadBalancerType,long retryCount) throws Exception { this.serviceVersion = serviceVersion; this.timeout = timeout; this.loadBalancerType = loadBalancerType; this.faultTolerantType = faultTolerantType; this.retryCount = retryCount; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcProtocol<RpcRequest> protocol = new RpcProtocol<>(); // 构建消息头 MsgHeader header = new MsgHeader(); long requestId = RpcRequestHolder.REQUEST_ID_GEN.incrementAndGet(); header.setMagic(ProtocolConstants.MAGIC); header.setVersion(ProtocolConstants.VERSION); header.setRequestId(requestId); final byte[] serialization = RpcProperties.getInstance().getSerialization().getBytes(); header.setSerializationLen(serialization.length); header.setSerializations(serialization); header.setMsgType((byte) MsgType.REQUEST.ordinal()); header.setStatus((byte) 0x1); protocol.setHeader(header); // 构建请求体 RpcRequest request = new RpcRequest(); request.setServiceVersion(this.serviceVersion); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); for (int i = 0; i < args.length; i++) { System.out.println("-------"); System.out.println(args[i].getClass().getTypeName()); System.out.println("-------"); } request.setData(args); Class<?>[] classes = new Class[args.length]; for (int i = 0; i < args.length; i++) { classes[i] = args[i].getClass(); } request.setDataClass(classes); request.setServiceAttachments(RpcProperties.getInstance().getServiceAttachments()); request.setClientAttachments(RpcProperties.getInstance().getClientAttachments()); // 拦截器的上下文 final FilterData filterData = new FilterData(request); try { FilterConfig.getClientBeforeFilterChain().doFilter(filterData); }catch (Throwable e){ throw e; } protocol.setBody(request); RpcConsumer rpcConsumer = new RpcConsumer(); String serviceName = RpcServiceNameBuilder.buildServiceKey(request.getClassName(), request.getServiceVersion()); Object[] params = {request.getData()}; // 1.获取负载均衡策略 final LoadBalancer loadBalancer = LoadBalancerFactory.get(loadBalancerType); // 2.根据策略获取对应服务 final ServiceMetaRes serviceMetaRes = loadBalancer.select(params, serviceName); ServiceMeta curServiceMeta = serviceMetaRes.getCurServiceMeta(); final Collection<ServiceMeta> otherServiceMeta = serviceMetaRes.getOtherServiceMeta(); long count = 1; long retryCount = this.retryCount; RpcResponse rpcResponse = null; // 重试机制 while (count <= retryCount ){ // 处理返回数据 RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()), timeout); // XXXHolder RpcRequestHolder.REQUEST_MAP.put(requestId, future); try { // 发送消息 rpcConsumer.sendRequest(protocol, curServiceMeta); // 等待响应数据返回 rpcResponse = future.getPromise().get(future.getTimeout(), TimeUnit.MILLISECONDS); // 如果有异常并且没有其他服务 if(rpcResponse.getException()!=null && otherServiceMeta.size() == 0){ throw rpcResponse.getException(); } if (rpcResponse.getException()!=null){ throw rpcResponse.getException(); } log.info("rpc 调用成功, serviceName: {}",serviceName); try { FilterConfig.getClientAfterFilterChain().doFilter(filterData); }catch (Throwable e){ throw e; } return rpcResponse.getData(); }catch (Throwable e){ String errorMsg = e.toString(); // todo 这里的容错机制可拓展,留作业自行更改 switch (faultTolerantType){ // 快速失败 case FailFast: log.warn("rpc 调用失败,触发 FailFast 策略,异常信息: {}",errorMsg); return rpcResponse.getException(); // 故障转移 case Failover: log.warn("rpc 调用失败,第{}次重试,异常信息:{}",count,errorMsg); count++; if (!ObjectUtils.isEmpty(otherServiceMeta)){ final ServiceMeta next = otherServiceMeta.iterator().next(); curServiceMeta = next; otherServiceMeta.remove(next); }else { final String msg = String.format("rpc 调用失败,无服务可用 serviceName: {%s}, 异常信息: {%s}", serviceName, errorMsg); log.warn(msg); throw new RuntimeException(msg); } break; // 忽视这次错误 case Failsafe: return null; } } } throw new RuntimeException("rpc 调用失败,超过最大重试次数: {}" + retryCount); } } ``` - 其中RpcConsumer类封装了Netty的相关组件,向开启一个客户端,并且向请求的服务端进行连接,之后讲封装好的协议编码进行发送给服务端,已完成数据的发送,之后异步的等待服务端进行数据的返回。具体实现如下所示: ```java public class RpcConsumer { private final Bootstrap bootstrap; private final EventLoopGroup eventLoopGroup; private Logger logger = LoggerFactory.getLogger(RpcConsumer.class); public RpcConsumer() { bootstrap = new Bootstrap(); eventLoopGroup = new NioEventLoopGroup(4); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new RpcEncoder()) .addLast(new RpcDecoder()) .addLast(new RpcResponseHandler()); } }); } /** * 发送请求 * @param protocol 消息 * @param serviceMetadata 服务 * @return 当前服务 * @throws Exception */ public void sendRequest(RpcProtocol<RpcRequest> protocol, ServiceMeta serviceMetadata) throws Exception { if (serviceMetadata != null) { // 连接 ChannelFuture future = bootstrap.connect(serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort()).sync(); future.addListener((ChannelFutureListener) arg0 -> { if (future.isSuccess()) { logger.info("连接 rpc server {} 端口 {} 成功.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort()); } else { logger.error("连接 rpc server {} 端口 {} 失败.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort()); future.cause().printStackTrace(); eventLoopGroup.shutdownGracefully(); } }); // 写入数据 future.channel().writeAndFlush(protocol); } } } ``` 以上内容就是RPC对调用方具体的封装,调用方只需要根据接口调用相应的方法,RPC框架底层已经封装了服务发现,负载均衡,网络请求以及编码相应的功能。其中部分的组件是由SPI机制进行动态配置。接下来就具体讲讲SPI机制。 3. SPI机制:首先SPI机制使用了Java反射的机制,一般需要在Resource的META-INF下配置开发者对某个组件的接口具体实现的类,每个组件的文件名称为接口的类全路径名称,而文件内容,通常是键值对形式,或者开发者定义的形式,只是在进行解析的时候,根据自定义的形式解析出来即可。 - 文件名称定义如下:  - 里面的内容定义如下:  - SPI具体实现如下: ```java public class ExtensionLoader{ private Logger logger = LoggerFactory.getLogger(ExtensionLoader.class); // 系统SPI private static String SYS_EXTENSION_LOADER_DIR_PREFIX = "META-INF/xrpc/"; // 用户SPI private static String DIY_EXTENSION_LOADER_DIR_PREFIX = "META-INF/rpc/"; private static String[] prefixs = {SYS_EXTENSION_LOADER_DIR_PREFIX, DIY_EXTENSION_LOADER_DIR_PREFIX}; // bean定义信息 key: 定义的key value:具体类 private static Map<String, Class> extensionClassCache = new ConcurrentHashMap<>(); // bean 定义信息 key:接口 value:接口子类s private static Map<String, Map<String,Class>> extensionClassCaches = new ConcurrentHashMap<>(); // 实例化的bean private static Map<String, Object> singletonsObject = new ConcurrentHashMap<>(); private static ExtensionLoader extensionLoader; static { extensionLoader = new ExtensionLoader(); } public static ExtensionLoader getInstance(){ return extensionLoader; } private ExtensionLoader(){ } /** * 获取bean * @param name * @return * @throws IOException * @throws ClassNotFoundException */ public <V> V get(String name) { if (!singletonsObject.containsKey(name)) { try { singletonsObject.put(name, extensionClassCache.get(name).newInstance()); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } } return (V) singletonsObject.get(name); } /** * 获取接口下所有的类 * @param clazz * @return */ public List<Object> gets(Class clazz) { final String name = clazz.getName(); if (!extensionClassCaches.containsKey(name)) { try { throw new ClassNotFoundException(clazz + "未找到"); } catch (ClassNotFoundException e) { e.printStackTrace(); } } final Map<String, Class> stringClassMap = extensionClassCaches.get(name); List<Object> objects = new ArrayList<>(); if (stringClassMap.size() > 0){ stringClassMap.forEach((k,v)->{ try { objects.add(singletonsObject.getOrDefault(k,v.newInstance())); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } }); } return objects; } /** * 根据spi机制初加载bean的信息放入map * @param clazz * @throws IOException * @throws ClassNotFoundException */ public void loadExtension(Class clazz) throws IOException, ClassNotFoundException { if (clazz == null) { throw new IllegalArgumentException("class 没找到"); } ClassLoader classLoader = this.getClass().getClassLoader(); Map<String, Class> classMap = new HashMap<>(); // 从系统SPI以及用户SPI中找bean for (String prefix : prefixs) { String spiFilePath = prefix + clazz.getName(); Enumeration<URL> enumeration = classLoader.getResources(spiFilePath); while (enumeration.hasMoreElements()) { URL url = enumeration.nextElement(); InputStreamReader inputStreamReader = null; inputStreamReader = new InputStreamReader(url.openStream()); BufferedReader bufferedReader = new BufferedReader(inputStreamReader); String line; while ((line = bufferedReader.readLine()) != null) { String[] lineArr = line.split("="); String key = lineArr[0]; String name = lineArr[1]; final Class<?> aClass = Class.forName(name); extensionClassCache.put(key, aClass); classMap.put(key, aClass); logger.info("加载bean key:{} , value:{}",key,name); } } } extensionClassCaches.put(clazz.getName(),classMap); } } ``` - 该机制首先从根据根据要加载的组件,获得其对应的全路径名称,然后根据路径文件名称在Resource的META-INF下寻找该文件,之后读取并且加载该文件的内容,之后根据自定义的内容形式,解析文件,然后利用反射机制获取该接口的具体实现的类型,把实现的类名称当作key,实现的类对象当作值方法ConcurrentHashMap中实现缓存机制,之后可以根据具体实现类名称,就能得到具体的类对象,以实现动态加载。
{{ item.content }}
{{ child.content }}