Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

sealpp/My-RPC

Open more actions menu

Repository files navigation

My-RPC

example workflow OSCS Status GitHub jdk

MY-RPC 是一个RPC框架,支持接入多种服务管理平台(目前接入Nacos)、多种序列化算法、多种负载均衡算法,使用 Java 原生 Socket 于 Natty 实现了两套网络传输模块。

OSCS

OSCS Status

架构

系统架构

特性

  • 接口设计合理,模块之间低耦合,可以灵活配置诸如序列化算法、负载均衡算法等。
  • 实现通过配置文件设置参数,包括日志级别、序列化算法、负载均衡算法、IP地址等。
  • 实现基于 Java 原生 Socket 和 Netty 两套网络传输方式。
    • 使用 Netty 的 帧解码器(ProtocolFrameDecoder)配合协议长度字段,解决粘包半包问题
    • Netty 自定义编解码器和请求响应 handler 使用 Sharable 设计进行 handler 复用,避免不必要的实例化。
    • Netty 客户端采用 Channel 池进行连接复用,避免重复连接同一服务器。
    • Netty 采用心跳机制避免客户端假死占用服务端资源。
  • 客户端接收 response 使用生产者消费者模型,配合 CompletableFuture 实现客户端同时多次 rpc 请求间不会相互阻塞。
  • 解决 json 类序列化 Object 集合中类型信息丢失问题(通过接口内部类实现公共方法)。
  • 可以方便的接入不同序列化算法,目前接入:
    • Gson
    • Hessian
    • Jackson
    • Kryo
    • Java原生
    • Protostuff (Protobuf)
  • 可以方便的接入不同的负载均衡算法,目前实现:
    • 随机算法
    • 轮询算法
  • 使用 Nacos 作为服务注册和发现平台。
    • 服务端下线通过回调钩子自动向 Nacos 注销对应服务。
  • 可通过注解自动扫描服务实现并注册。
  • 实现单例工厂(用于),支持无参构造,有参构造和构造工厂三种方式。
  • 项目注释完整,逻辑清晰。

模块

  • rpc-api: 通用接口包
  • rpc-common: 消息实体对象、工具类等共用类
  • rpc-core: rpc核心
  • test-client: 测试用服务端
  • test-server: 测试用客户端

传输协议 MFF

+---------------+---------------+-----------------+-------------+
|  Magic Number |  Package Type | Serializer Type | Data Length |
|    4 bytes    |    4 bytes    |     4 bytes     |   4 bytes   |
+---------------+---------------+-----------------+-------------+
|                          Data Bytes                           |
|                   Length: ${Data Length}                      |
+---------------------------------------------------------------+
字段 解释
Magic Number 标识一个数据包是否为MFF协议
Package Type 标识数据是一个请求还是一个响应
Serializer Type 标识数据包内容的序列化方式
Data Length 标识数据字段的字节数
Data Bytes 传输对象,RpcRequestMessage或RpcResponseMessage对象

使用

定义调用接口

接口定义:

package top.mffseal.rpc.api;

/**
 * 测试用某服务调用接口。
 *
 * @author mffseal
 */
public interface HelloService {
    /**
     * 测试服务
     *
     * @param object 测试用的调用参数
     * @return 测试用的调用结果
     */
    String hello(HelloObject object);

    String bye(HelloObject object);
}

接口参数定义:

package top.mffseal.rpc.api;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

/**
 * 测试调用接口中,客户端向服务端传递的参数对应的类。
 *
 * @author mffseal
 */
@Data
@AllArgsConstructor
public class HelloObject implements Serializable {
    private Integer id;
    private String message;

    public HelloObject() {
    }
}

服务端实现接口

package top.mffseal.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.mffseal.rpc.api.HelloObject;
import top.mffseal.rpc.api.HelloService;

/**
 * 服务端测试用服务实现。
 *
 * @author mffseal
 */

public class HelloServiceImpl implements HelloService {
    private final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);

    @Override
    public String hello(HelloObject object) {
        logger.info("接收到: {}", object.getMessage());
        return "hello 这是rpc调用的返回值, id=" + object.getId();
    }

    @Override
    public String bye(HelloObject object) {
        logger.info("接收到: {}", object.getMessage());
        return "bye 这是rpc调用的返回值, id=" + object.getId();
    }
}

定义服务端

Netty为例:

package top.mffseal.test;

import top.mffseal.rpc.api.HelloService;
import top.mffseal.rpc.transport.RpcServer;
import top.mffseal.rpc.transport.netty.server.NettyServer;

/**
 * 测试用Netty服务端
 *
 * @author mffseal
 */
public class NettyTestServer {
    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        RpcServer rpcServer = new NettyServer();
        rpcServer.publishService(helloService, HelloService.class);
        rpcServer.start();
    }
}

定义客户端

Netty为例:

package top.mffseal.test;

import top.mffseal.rpc.api.HelloObject;
import top.mffseal.rpc.api.HelloService;
import top.mffseal.rpc.transport.RpcClient;
import top.mffseal.rpc.transport.RpcClientProxy;
import top.mffseal.rpc.transport.netty.client.NettyClient;

/**
 * 测试用Netty客户端。
 *
 * @author mffseal
 */
public class NettyTestClient {
    public static void main(String[] args) {
        RpcClient client = new NettyClient();
        RpcClientProxy rpcClientProxy = new RpcClientProxy(client);
        HelloService helloServer = rpcClientProxy.getProxy(HelloService.class);
        HelloObject helloObject = new HelloObject(12, "this is a message");
        // 轮询算法的效果要同一个客户端多次调用同一个接口才能看出
        String res = helloServer.hello(helloObject);
        String res2 = helloServer.hello(helloObject);
        String res3 = helloServer.hello(helloObject);
        System.out.println(res);
    }
}

配置客户端

  • 序列化算法选用Gson
  • 负载均衡算法选择轮询算法
serializer.library=Gson
loadBalancer=RoundRobin
serializer.kryo.compress=true
netty.loglevel=INFO
netty.retry=5
netty.timeout=5000
namingServer.platform=Nacos
namingServer.host=Nacos服务器ip地址
namingServer.port=Nacos服务器端口

配置服务端

  • 序列化算法选择Protostuff (Protobuf)
serializer.library=Protostuff
serializer.kryo.compress=true
host=localhost
port=8080
netty.loglevel=INFO
namingServer.platform=Nacos
namingServer.host=Nacos服务器ip地址
namingServer.port=Nacos服务器端口

启动

  1. 启动前,请确保服务端和客户端均能连接到Nacos服务器,并且在服务端和客户端的配置文件中配置了Nacos服务器地址和端口。
  2. 分别启动服务端和客户端,观察控制台输出内容。

关键设计

Netty客户端生产消费模型

  1. 客户端通过动态代理,实际调用到RpcClientProxy的invoke方法。
  2. 该方法会调用NettyClient的sendRequest方法,并通过completableFuture.get()方法阻塞等待结果。
  3. NettyClient发送rpc请求,并向ResponseLocker存入一个CompletableFuture。
  4. 这样NettyClient就可以接着执行其它的invoke调用,不用等待response阻塞。
  5. 服务器收到请求后会调用方法,并返回一个RpcResponse。
  6. 客户端的ResponseHandler收到RpcResponse后会向ResponseLocker对应位置的CompletableFuture调用complete(填充response)。
  7. complete会唤醒RpcClientProxy调用的get处,并将结果传递过去。

服务自动注册

通过注解扫描的方式自动注册服务:

  1. 扫描当前包下所有Class类。
  2. 遍历查找含有@Service注解的类对象。
  3. 检查@Service的name属性是否为空。
  • 不为空则使用指定的name值作为所实现接口进行发布。
  • 为空则通过反射获取该类所实现的接口,进行发布。

单例工厂

通过双重检查锁实现单例模式的创建,单例工厂类通过Map维护所有已创建的单例对象。

支持三种方式的创建:

  • 无参构造
    • 通过 clazz.newInstance() 实现。
  • 有参构造
    • 通过 clazz.getDeclaredConstructor(paramTypes) 获取有参构造器。
  • 工厂方法
    • 通过 Factory.getMethod(methodName, paramTypes) 获取工厂方法再 invoke。

LICENSE

My-RPC-Framework is under the MIT license. See the LICENSE file for details.

About

A custom RPC framework.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

Morty Proxy This is a proxified and sanitized view of the page, visit original site.