RPC远程服务调用
RPC的诞生

RPC远程过程调用(Remote Procedure Call)
调用远程计算机上的服务,就像调用本地服务一样。

1、首先我们调用需要知道jar包在哪里
2、所以我们需要一个注册中心,存放jar包的位置
3、当我们使用jar包的时候我们的controller层会先从注册中心中查询获得jar包的位置,然后再进行调用
后期会有框架封装这些操作,虽然代码上没有变化但是底层进行了封装。
RPC的JAVA版本–RMI
RMI(remote method invocation),可以认为是RPC的java版本,允许运行在一个java 虚拟机的对象调用运行在另一个java虚拟机上对象的方法。
这是一个纯java版本的,如果你调用的对象不是java编写的,就无法利用RMI.
实现原理
RMI使用的是JRMP(Java Remote Messageing Protocol)协议, JRMP是专门为java定制的通信协议,所以是纯java的分布式解决方案
实现RMI程序步骤
1、创建一个远程接口,继承java.rmi.Remote接口
2、实现远程接口,并继承UnicastRemoteObject
这里前两个步骤类似于注册中心
3、创建服务器程序,同时使用createRegistry方法注册远程接口对象
4、创建客户端程序,通过Naming类的lookup方法来远程调用接口中的方法

当然这种方式的效率很低
企业级RPC解决方案
Dubbo
阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的RPC实现服务的输出和输入功能,可以和Spring框架无缝集成。

先大体上感知一下这个框架的功能:
第0步启动,服务的提供者,进行第一步先注册(端口,IP,路径),然后第二步从注册中心拿到服务的地址,然后如果服务有更新的话,就会有第三步notify,然后第4步开始调用,然后会有第5步记录监控。
Dubbo的主要特性:面相接口代理的高性能服务,封装了远程调用的细节,所以我们就只需要调用接口就好了。
1、如果我的provider是多个的话,都提供了服务A,在这个框架的时候提供了负载均衡,可能调用provider1的A服务,第二次2的A服务
2、在进行服务注册的时候是自动的,自动的提醒和感知,服务的上下线
3、提供了各种各样的插件,运行期间可以利用脚本进行路由控制(灰度发布,通往优先,可视化的运维)
RPC的原理手写RPC
1、创建一个查询用户的接口
1 2 3
   | public interface IUserService {     public User findUserById(Integer id); }
  | 
 
2、创建实体类并且实现序列化接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
   | package com.bitzh.rpc;
  import java.io.Serializable;
 
 
 
 
 
  public class User implements Serializable {     private static final long serialVersionUID = 6245288973191873978L;     private String username;     private Integer id;
      public User(String username, Integer id) {         this.username = username;         this.id = id;     }
      public String getUsername() {         return username;     }
      public void setUsername(String username) {         this.username = username;     }
      public Integer getId() {         return id;     }
      public void setId(Integer id) {         this.id = id;     } }
 
   | 
 
3、接口实现类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
   | package com.bitzh.rpc;
 
 
 
 
 
  public class UserServiceImpl implements IUserService {     @Override     public User findUserById(Integer id) {         return new User("张三", id);     } }
 
   | 
 
4、RPC调用类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
   | package com.bitzh.rpc;
 
 
 
 
 
  public class RpcDemo {          public static void main(String[] args) {         IUserService service = new UserServiceImpl();                  service.findUserById(13);
                                     
                   service.findUserById(13);     } }
 
   | 
 
解决服务在远端的问题
服务在远端我们肯定不会用Http来进行通讯,因为Http里面包含了很多头,因此我们用TCP/IP的通讯


1、服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
   | package com.bitzh.rpc.server;
  import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
  import java.io.*; import java.net.ServerSocket; import java.net.Socket;
 
 
 
 
 
 
  public class Server {          public static void main(String[] args) throws Exception {         ServerSocket serverSocket = new ServerSocket(8888);         while(true){             Socket socket = serverSocket.accept();                          process(socket);             socket.close();         }     }     private static void process(Socket socket)throws Exception {         InputStream inputStream = socket.getInputStream();         OutputStream outputStream = socket.getOutputStream();         DataInputStream dataInputStream = new DataInputStream(inputStream);         DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
          int id = dataInputStream.readInt();         IUserService userService = new UserServiceImpl();         User user = userService.findUserById(id);         dataOutputStream.writeInt(user.getId());         dataOutputStream.writeUTF(user.getUsername());
          dataOutputStream.flush();
      } }
 
   | 
 
2、客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
   | package com.bitzh.rpc.client;
  import com.bitzh.rpc.User;
  import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.OutputStream; import java.net.Socket;
 
 
 
 
 
 
 
  public class Client {          public static void main(String[] args) throws Exception{                  Socket socket = new Socket("127.0.0.1",8888);
 
                   ByteArrayOutputStream bos = new ByteArrayOutputStream();         DataOutputStream dataOutputStream = new DataOutputStream(bos);         dataOutputStream.writeInt(13);                                             socket.getOutputStream().write(bos.toByteArray());         socket.getOutputStream().flush();
 
 
 
                   DataInputStream in = new DataInputStream(socket.getInputStream());         String name = in.readUTF();         int id = in.readInt();         User user = new User(name,id);         System.out.println(user.toString());
          bos.close();         dataOutputStream.close();         socket.close();
      } }
 
   | 
 
解决客户端简单调用的问题
刚刚我们的代码,一大堆的处理,本质上我们就只想调用接口,这时候我们就利用到技术:客户端存根(就是封装了一层)


把网络连接的操作封装到Stub里面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
   | package com.bitzh.rpc.client;
  import com.bitzh.rpc.User;
  import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.net.Socket;
 
 
 
 
 
  public class Stub {     public User findUserById(Integer id) throws Exception{                  Socket socket = new Socket("127.0.0.1",8888);
 
                   ByteArrayOutputStream bos = new ByteArrayOutputStream();         DataOutputStream dataOutputStream = new DataOutputStream(bos);         dataOutputStream.writeInt(13);                                             socket.getOutputStream().write(bos.toByteArray());         socket.getOutputStream().flush();
 
 
 
                   DataInputStream in = new DataInputStream(socket.getInputStream());         String name = in.readUTF();         id = in.readInt();         User user = new User(name,id);         System.out.println(user.toString());
          bos.close();         dataOutputStream.close();         socket.close();         return new User("张三", id);     } }
 
   | 
 
而在本地就只用跟调用本地的方法类似
1 2 3 4 5 6 7 8
   | public class Client {          public static void main(String[] args) throws Exception{         Stub stub = new Stub();         stub.findUserById(13);
      } }
  | 
 
RPC的核心技术动态代理
动态代理–隐藏网络细节

修改Stub类的代码实现动态代理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
   | package com.bitzh.rpc.client;
  import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
  import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.Socket;
 
 
 
 
 
  public class Stub {     public static IUserService getStub() throws Exception {
          InvocationHandler handler = new InvocationHandler() {             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {                                  Socket socket = new Socket("127.0.0.1", 8888);
 
                                   ByteArrayOutputStream bos = new ByteArrayOutputStream();                 DataOutputStream dataOutputStream = new DataOutputStream(bos);                 dataOutputStream.writeInt(13);                                                                                     socket.getOutputStream().write(bos.toByteArray());                 socket.getOutputStream().flush();
 
                                   DataInputStream in = new DataInputStream(socket.getInputStream());                 String name = in.readUTF();                 int id = in.readInt();                 User user = new User(name, id);                 System.out.println(user.toString());
                  bos.close();                 dataOutputStream.close();                 socket.close();                 return user;
              }         };                  Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, handler);         return (IUserService) o;     } }
 
   | 
 
服务端利用反射提高灵活性
传递的时候不传一些死的方法,如果我们传递类、方法、参数,在服务端利用反射就灵活了很多

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
   | package com.bitzh.rpc.server;
  import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
  import java.io.*; import java.lang.reflect.Method; import java.net.ServerSocket; import java.net.Socket;
 
 
 
 
 
 
  public class Server {          public static void main(String[] args) throws Exception {         ServerSocket serverSocket = new ServerSocket(8888);         while(true){             Socket socket = serverSocket.accept();                          process(socket);             socket.close();         }     }     private static void process(Socket socket)throws Exception {         InputStream inputStream = socket.getInputStream();         OutputStream outputStream = socket.getOutputStream();         ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);         DataOutputStream dataOutputStream = new DataOutputStream(outputStream);                  String methodName = objectInputStream.readUTF();         Class[] parameterTypes = (Class[]) objectInputStream.readObject();         Object[] arguments = (Object[]) objectInputStream.readObject();         
          IUserService service = new UserServiceImpl();         Method method = service.getClass().getMethod(methodName, parameterTypes);
          User user = (User) method.invoke(service, arguments);         dataOutputStream.writeInt(user.getId());         dataOutputStream.writeUTF(user.getUsername());
          dataOutputStream.flush();
      } }
 
   | 
 
终级方案(把整个类也发过去)

客户端
1 2 3 4 5 6 7 8 9 10
   |  public class Client {          public static void main(String[] args) throws Exception{         IUserService stub = Stub.getStub(IUserService.class);         stub.findUserById(13);         System.out.println();
      } }
 
  | 
 
Stub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
   | package com.bitzh.rpc.client;
  import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
  import java.io.*; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.Socket;
 
 
 
 
 
  public class Stub {     public static IUserService getStub(Class clazz) throws Exception {
          InvocationHandler handler = new InvocationHandler() {             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {                                  Socket socket = new Socket("127.0.0.1", 8888);
 
                                   ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());                 String methodName = method.getName();                                  Class[] parameterTypes = method.getParameterTypes();                                  objectOutputStream.writeUTF(clazz.getName());                 objectOutputStream.writeUTF(methodName);                 objectOutputStream.writeObject(parameterTypes);                 objectOutputStream.writeObject(args);                 objectOutputStream.flush();
 
 
 
                                   ObjectInputStream in = new ObjectInputStream(socket.getInputStream());                 Object o = in.readObject();
 
                                   objectOutputStream.close();                 socket.close();                 return o;
              }         };                  Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, handler);         return (IUserService) o;     }
 
 
 
 
  }
 
   | 
 
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
   | package com.bitzh.rpc.server;
  import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
  import java.io.*; import java.lang.reflect.Method; import java.net.ServerSocket; import java.net.Socket;
 
 
 
 
 
 
  public class Server {          public static void main(String[] args) throws Exception {         ServerSocket serverSocket = new ServerSocket(8888);         while(true){             Socket socket = serverSocket.accept();                          process(socket);             socket.close();         }     }     private static void process(Socket socket)throws Exception {         InputStream inputStream = socket.getInputStream();         OutputStream outputStream = socket.getOutputStream();         ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);         ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);                  String clazzName = objectInputStream.readUTF();         String methodName = objectInputStream.readUTF();         Class[] parameterTypes = (Class[]) objectInputStream.readObject();         Object[] arguments = (Object[]) objectInputStream.readObject();                           Class clazz = UserServiceImpl.class;         Method method = clazz.getMethod(methodName, parameterTypes);
          User user = (User) method.invoke(clazz.newInstance(), arguments);         objectOutputStream.writeObject(user);         objectOutputStream.flush();
      } }
 
   | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   | package com.bitzh.rpc.server;
  import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
 
 
 
 
 
  public class UserServiceImpl implements IUserService {     @Override     public User findUserById(Integer id) {         return new User("张三", id);     } }
 
   | 
 
RPC框架流程及技术

RPC在架构中的位置

RPC框架序列化问题
对象在网络上传输都是二进制!
所以对象要经过序列化转成Byte数组二进制数组,这个过程称为序列化,这个流程反过来就是反序列化
1、JDK的原生序列化(ObjectInputStream反序列化,ObjectOutputStream序列化)

具体如何操作

当然原生的序列化效率太低了,所以衍生出了其他序列化的手段
比如JSON(典型的key-value形式,优点很方便,缺点是进行序列化的开销大,没有类型),
Hessian :
    优点:支持跨语言,动态的二进制的紧凑的,生成字节数更少,性能好
    缺点:常见对象不支持,Linked类型需要拓展修复。Byte integer
Protobuf(IDL:Interface description language):支持主流语言,专注于效率的协议,
    不便之处:工具(User -> UserProtoBuf类),使用门槛高  
RPC框架如何选择序列化?

通常是用ProtoBuf或者Hessian
如何架构设计一个RPC框架

如何提升RPC的吞吐量

使用全异步的方法
Zookeeper安装
1     Zookeeper简介
    zookeeper分布式管理软件。常用它做注册中心(依赖zookeeper的发布/订阅功能)、配置文件中心、分布式锁配置、集群管理等。
    zookeeper一共就有两个版本。主要使用的是java语言写的。
2     安装
2.1   上传压缩文件
    上传到 /usr/local/tmp中
2.2   解压
1 2
   | # tar zxf apache-zookeeper-3.5.5-bin.tar.gz # cp -r apache-zookeeper-3.5.5-bin ../zookeeper
   | 
 
2.3   新建data目录
进入到zookeeper中
1 2
   | # cd /usr/local/zookeeper # mkdir data
   | 
 
2.4   修改配置文件
进入conf中
1 2 3
   | # cd conf # cp zoo_sample.cfg zoo.cfg # vim zoo.cfg
   | 
 
修改dataDir为data文件夹路径
1
   | dataDir=/usr/local/zookeeper/data
   | 
 
2.5   启动zookeeper
进入bin文件夹
1 2
   | # cd /usr/local/zookeeper/bin # ./zkServer.sh start
   | 
 
通过status查看启动状态。稍微有个等待时间
# ./zkServer.sh status
Zookeeper客户端常用命令
    进入到./zkCli.sh命令行工具后,可以使用下面常用命令
1    ls
    ls [-s][-R] /path
    -s 详细信息,替代老版的ls2
    -R 当前目录和子目录中内容都罗列出来
    例如:ls -R / 显示根目录下所有内容
2     create
    create /path [data]
    [data] 包含内容
    创建指定路径信息
    例如:create /demo 创建/demo
3     get
    get [-s] /path
    [-s] 详细信息
    查看指定路径下内容。
    例如: get -s /demo

    null:存放的数据
    cZxid:创建时zxid(znode每次改变时递增的事务id)
    ctime:创建时间戳
    mZxid:最近一次更新的zxid
    mtime:最近一次更新的时间戳
    pZxid:子节点的zxid
    cversion:子节点更新次数
    dataversion:节点数据更新次数
    aclVersion:节点ACL(授权信息)的更新次数
    ephemeralOwner:如果该节点为ephemeral节点(临时,生命周期与session一样), ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是ephemeral节点, ephemeralOwner值为0.
    dataLength:节点数据字节数
    numChildren:子节点数量
4     set
    set /path data
    设置节点内容
5     delete
    delete /path
    删除节点
向Zookeeper中注册内容
    新建项目ZookeeperClient
1     创建/demo
    使用zookeeper的客户端命令工具创建/demo
1 2
   | ./zkCli.sh create /demos
   | 
 
2     添加依赖
1 2 3 4 5 6 7
   | <dependencies>     <dependency>         <groupId>org.apache.zookeeper</groupId>         <artifactId>zookeeper</artifactId>         <version>3.5.5</version>     </dependency> </dependencies>
   | 
 
3     编写代码
    创建类com.msb.MyApp。
    ZooDefs.Ids.OPEN_ACL_UNSAFE 表示权限。
    CreateMode.PERSISTENT_SEQUENTIAL 永久存储,文件内容编号递增。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | public static void main(String [] args){     try {         ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() {             @Override             public void process(WatchedEvent watchedEvent) {                 System.out.println("获取连接");             }         });         String content = zookeeper.create("/demo/nn", "content".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);         System.out.println("content"+content);     } catch (IOException e) {         e.printStackTrace();     } catch (KeeperException e) {         e.printStackTrace();     } catch (InterruptedException e) {         e.printStackTrace();     } }
  | 
 
4     查看上传数据
    ls -R /            :查看列表
    get /demo/nn0000000002     :查看内容
从zookeeper中发现内容
    在原有项目中新建一个类,类中编写主方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | public static void main(String[] args) {     try {         ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() {             @Override             public void process(WatchedEvent watchedEvent) {                 System.out.println("获取连接");             }         });         //获取列表         List<String> list = zookeeper.getChildren("/demo", false);         for (String child : list) {             byte[] result = zookeeper.getData("/demo/" + child, false, null);             System.out.println(new String(result));         }     } catch (IOException e) {         e.printStackTrace();     } catch (KeeperException e) {         e.printStackTrace();     } catch (InterruptedException e) {         e.printStackTrace();     } }
  |