您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

RPC远程调用原理浅析

时间:2019-12-16 13:24:25  来源:  作者:

RPC

基本概念

RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务

本地过程调用:如果需要将本地student对象的age+1,可以实现一个addAge()方法,将student对象传入,对年龄进行更新之后返回即可,本地方法调用的函数体通过函数指针来指定。

远程过程调用:上述操作的过程中,如果addAge()这个方法在服务端,执行函数的函数体在远程机器上,如何告诉机器需要调用这个方法呢?

  1. 首先客户端需要告诉服务器,需要调用的函数,这里函数和进程ID存在一个映射,客户端远程调用时,需要查一下函数,找到对应的ID,然后执行函数的代码。
  2. 客户端需要把本地参数传给远程函数,本地调用的过程中,直接压栈即可,但是在远程调用过程中不再同一个内存里,无法直接传递函数的参数,因此需要客户端把参数转换成字节流,传给服务端,然后服务端将字节流转换成自身能读取的格式,是一个序列化和反序列化的过程。
  3. 备好了之后,如何进行传输?网络传输层需要把调用的ID和序列化后的参数传给服务端,然后把计算好的结果序列化传给客户端,因此TCP层即可完成上述过程,gRPC中采用的是HTTP2协议。

总结一下上述过程:

// Client端

// Student student = Call(ServerAddr, addAge, student)

1. 将这个调用映射为Call ID。

2. 将Call ID,student(params)序列化,以二进制形式打包

3. 把2中得到的数据包发送给ServerAddr,这需要使用网络传输层

4. 等待服务器返回结果

5. 如果服务器调用成功,那么就将结果反序列化,并赋给student,年龄更新

// Server端

1. 在本地维护一个Call ID到函数指针的映射call_id_map,可以用Map<String, Method> callIdMap

2. 等待服务端请求

3. 得到一个请求后,将其数据包反序列化,得到Call ID

4. 通过在callIdMap中查找,得到相应的函数指针

5. 将student(params)反序列化后,在本地调用addAge()函数,得到结果

6. 将student结果序列化后通过网络返回给Client

程序员:RPC远程调用原理浅析
 
 
 
  • 在微服务的设计中,一个服务A如果访问另一个Module下的服务B,可以采用HTTP REST传输数据,并在两个服务之间进行序列化和反序列化操作,服务B把执行结果返回过来。
  • 由于HTTP在应用层中完成,整个通信的代价较高,远程过程调用中直接基于TCP进行远程调用,数据传输在传输层TCP层完成,更适合对效率要求比较高的场景,RPC主要依赖于客户端和服务端之间建立Socket链接进行,底层实现比REST更复杂。

创建三个maven项目

服务者

消费者

API

让服务者和消费者都依赖API

程序员:RPC远程调用原理浅析
 
 
 

 

程序员:RPC远程调用原理浅析
 
 
 

在消费者创建ConsumerApp

使用代理对象

具体代码在ProxyUtils中

public class ConsumerApp {

public static void main(String[] args) {

//while死循环是为了测试调用提供者是否为随机

while (true) {

try {

Thread.sleep(2000);

// 获得代理对象

AddService addService = ProxyUtils.getProxy(AddService.class);

// 只要调用方法就会进入代理对象invoke方法

int result = addService.add(15, 684);

System.out.println(result);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

API中创建Request,AddService,ProxyUtils,ZkUtils

创建Request(该类为传输对象,必须实现序列化)

public class Request implements Serializable{

/**

*

*/

private static final long serialVersionUID = 1L;

private String interfaceName;

private String methodName;

private Object[] args;

public String getInterfaceName() {

return interfaceName;

}

public void setInterfaceName(String interfaceName) {

this.interfaceName = interfaceName;

}

public String getMethodName() {

return methodName;

}

public void setMethodName(String methodName) {

this.methodName = methodName;

}

public Object[] getArgs() {

return args;

}

public void setArgs(Object[] args) {

this.args = args;

}

@Override

public String toString() {

return "Request [interfaceName=" + interfaceName + ", methodName=" + methodName + ", args="

+ Arrays.toString(args) + "]";

}

}

创建AddService

package com.chenlei.service;

public interface AddService {

public int add(Integer a, Integer b);

}

创建ProxyUtils(重点)

public class ProxyUtils {

private static Random RDM = new Random();

@SuppressWarnings("unchecked")

public static <T> T getProxy(Class<T> interfaces) {

T proxy = (T) Proxy.newProxyInstance(ProxyUtils.class.getClassLoader(), new Class<?>[] { interfaces },

new InvocationHandler() {

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

String methodName = method.getName();

if ("toString".equals(methodName)) {

return interfaces.getClass().getName() + "$Proxy";

}

if ("hashCode".equals(methodName)) {

return Object.class.hashCode();

}

if ("equals".equals(methodName)) {

return Object.class.equals(this);

}

// 消费者发送过去

Request request = new Request();

request.setInterfaceName(interfaces.getName());

request.setMethodName(methodName);

request.setArgs(args);

// 找到interfaces下的所有节点

List<String> serverList = ZkUtils.discover(interfaces.getName());

String one = randomOne(serverList);// 拿到的结果为ip:port 如127.0.0.1:8888

String[] split = one.split(":");

String address = split[0];

Integer port = Integer.valueOf(split[1]);

Socket socket = null;

// 打开书出管道,发送请求

Object result = null;

OutputStream outputStream = null;

ObjectOutputStream objectOutputStream = null;

InputStream inputStream = null;

ObjectInputStream objectInputStream = null;

try {

socket = new Socket(address, port);

outputStream = socket.getOutputStream();

objectOutputStream = new ObjectOutputStream(outputStream);

objectOutputStream.writeObject(request);

inputStream = socket.getInputStream();

objectInputStream = new ObjectInputStream(inputStream);

result = objectInputStream.readObject();

System.out.println("本次调用的是======" + port);

} catch (Exception e) {

e.printStackTrace();

} finally {

closeResources(objectInputStream, inputStream, objectOutputStream, outputStream, socket);

}

return result;

}

});

return proxy;

}

/**

* 从节点中随机找出一个

*

* @param serverList

* @return

*/

private static String randomOne(List<String> serverList) {

if (null == serverList || 0 == serverList.size()) {

return null;

}

int index = RDM.nextInt(serverList.size());

return serverList.get(index);

}

/**

* 关闭资源的方法

*/

public static void closeResources(Closeable... resources) {

for (Closeable resource : resources) {

if (null != resource) {

try {

resource.close();

} catch (IOException e) {

e.printStackTrace();

} finally {

resource = null;

}

}

}

}

}

创建ZkUtils(zookeeper注册和发现,另加缓存解决脏读)

在API项目中导入zkclient的依赖

public class ZkUtils {

private static final String ZK_URL = "自己的域名:2181";

private static ZkClient zkClient = null;

//创建zookeeper缓存

private static Map<String, List<String>> cache = new HashMap<String, List<String>>();

static {

zkClient = new ZkClient(ZK_URL, 10000, 10000);

}

/**

* 服务节点向zookeeper的注册

*

* @param serverName

* @param serverPort

*/

public static void register(String serverName, String serverPort) {

if (null == serverName || "".equals(serverName)) {

throw new RuntimeException("服务名不能为空");

}

if (null == serverPort || "".equals(serverPort)) {

throw new RuntimeException("服务ip和端口不能为空");

}

if (!zkClient.exists("/" + serverName)) {

zkClient.createPersistent("/" + serverName);

}

if (!zkClient.exists("/" + serverName + "/" + serverPort)) {

zkClient.createEphemeral("/" + serverName + "/" + serverPort);

}

System.out.println("注册一个服务节点为" + "/" + serverName + "/" + serverPort);

}

/**

* 向zookeeper发现服务节点

*

* @param serverName

* @return

*/

public static List<String> discover(String serverName) {

if (null == serverName || "".equals(serverName)) {

throw new RuntimeException("服务名不能为空");

}

// 先从缓存里找

if (cache.containsKey(serverName)) {

System.out.println("在缓存中找到" + serverName + "节点");

}

// 如果该节点在zookeeper中不存在,直接返回空

if (!zkClient.exists("/" + serverName)) {

return null;

}

zkClient.subscribeChildChanges("/" + serverName, new IZkChildListener() {

@Override

public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {

// 一旦进入此方法,证明有节点改变

cache.put(serverName, currentChilds);

System.out.println(serverName + "节点有变化-----" + "缓存完成更新");

}

});

return zkClient.getChildren("/" + serverName);

}

}

写提供者代码

创建AddServiceImpl

注意类名最好是AddService+Impl,并且类全路径也要对应com.chenlei.service.impl.AddServiceImpl,否则代码需要调整

package com.chenlei.service.impl;

import com.chenlei.service.AddService;

public class AddServiceImpl implements AddService {

@Override

public int add(Integer a, Integer b) {

return a + b;

}

}

创建ProviderApp(重点)

public class ProviderApp {

public static void main(String[] args) {

Integer port = 7777;

ServerSocket serverSocket = bind(port);

// 向zookeeper注册

ZkUtils.register(AddService.class.getName(), "127.0.0.1" + ":" + port);

// 监听+处理请求

listener(serverSocket);

}

/**

* 监听和处理请求

*

* @param serverSocket

*/

private static void listener(ServerSocket serverSocket) {

//此处死循环是为了让次提供者一直处于工作状态

while (true) {

Socket socket = null;

InputStream inputStream = null;

ObjectInputStream objectInputStream = null;

OutputStream outputStream = null;

ObjectOutputStream objectOutputStream = null;

try {

socket = serverSocket.accept();

inputStream = socket.getInputStream();

objectInputStream = new ObjectInputStream(inputStream);

Request request = (Request) objectInputStream.readObject();

Object answer = invoker(request);

outputStream = socket.getOutputStream();

objectOutputStream = new ObjectOutputStream(outputStream);

objectOutputStream.writeObject(answer);

} catch (Exception e) {

e.printStackTrace();

} finally {

ProxyUtils.closeResources(objectOutputStream, outputStream, objectInputStream, inputStream, socket);

}

}

}

/**

* 处理请求返回结果

*

* @param request

* @return

*/

private static Object invoker(Request request) {

// 获得从消费者传过来的信息

String interfaceName = request.getInterfaceName();

String methodName = request.getMethodName();

Object[] args = request.getArgs();

// 获得对应实现类全名

String className = getClassNameByInterfaceName(interfaceName);

Object answer = null;

try {

// 找到该类

Class<?> clazz = Class.forName(className);

// 创建一个对象

Object object = clazz.newInstance();

Class<?>[] argsType = new Class<?>[args.length];

if (null != args || 0 != args.length) {

for (int i = 0; i < args.length; i++) {

argsType[i] = args[i].getClass();

}

}

Method method = clazz.getMethod(methodName, argsType);

answer = method.invoke(object, args);

} catch (Exception e) {

e.printStackTrace();

}

return answer;

}

/**

* 通过请求者传来的类信息,获得对应实现类的所有信息,并返回实现类的全名

*

* @param interfaceName

* @return

*/

private static String getClassNameByInterfaceName(String interfaceName) {

// 传过来的接口名为com.chenlei.service.AddService

int index = interfaceName.lastIndexOf(".");

StringBuilder sb = new StringBuilder();

// com.chenlei.service

sb.append(interfaceName.subSequence(0, index));

// com.chenlei.service.impl.

sb.append(".impl.");

// com.chenlei.service.impl.AddService

sb.append(interfaceName.substring(index + 1)).append("Impl");

return sb.toString();

}

/**

* 绑定一个端口

*

* @param port

* @return

*/

private static ServerSocket bind(Integer port) {

ServerSocket serverSocket = null;

try {

serverSocket = new ServerSocket(port);

} catch (IOException e) {

e.printStackTrace();

}

return serverSocket;

}

/**

* 测试代码

*/

//public static void main(String[] args) {

// String interfaceName = "com.chenlei.service.AddService";

// int index = interfaceName.lastIndexOf(".");

// StringBuilder sb = new StringBuilder();

// // com.chenlei.service

// sb.append(interfaceName.subSequence(0, index));

// // com.chenlei.service.impl.

// sb.append(".impl.");

// // com.chenlei.service.impl.AddService

// sb.append(interfaceName.substring(index + 1)).append("Impl");

// System.out.println(sb.toString());

//}

}

更改提供者端口,分别启动三个提供者

 

程序员:RPC远程调用原理浅析
 
 
 

再启动消费者,查看结果

 

程序员:RPC远程调用原理浅析
 
 
 

其他错误和注意事项

 

程序员:RPC远程调用原理浅析
 
 
 

写代码思路

程序员:RPC远程调用原理浅析
 
 
 

 

程序员:RPC远程调用原理浅析
 
 
 


Tags:RPC   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
一、概述手动实现一款轻量,高效的RPC框架,基于TCP的二进制协议实现github源码: https://github.com/wosn00/srpc二、特征 基于netty的主从Reactor模型,NIO通信 支持同步,异步,携带...【详细内容】
2021-10-26  Tags: RPC  点击:(38)  评论:(0)  加入收藏
RPC远程过程调用(Remote Procedure Call,RPC)框架作为架构微服务化的基础组件,能大大降低架构微服务化的成本,提高服务调用方与服务提供方的开发效率,屏蔽跨进程调用函数(服务)的各...【详细内容】
2021-08-31  Tags: RPC  点击:(78)  评论:(0)  加入收藏
在使用gRpc的过程中,有一个想法:gRpc客户端、服务端是怎么交互的呢?从这个想法萌生出一个验证方法,通过抓包来分析其交互过程与底层数据,一起来看看吧。1. gRpc是什么gRpc是什么?g...【详细内容】
2021-07-12  Tags: RPC  点击:(136)  评论:(0)  加入收藏
RPC(Remote Procedure Call),是一个大家既熟悉又陌生的词,只要涉及到通信,必然需要某种网络协议。我们很可能用过HTTP,那么RPC又和HTTP有什么区别呢?RPC还有什么特点,常见的选型有哪...【详细内容】
2021-07-04  Tags: RPC  点击:(137)  评论:(0)  加入收藏
RPC全称Remote Procedure Call,即远程过程调用,对于调用者无感知这是一个远程调用功能。目前流行的开源RPC 框架有阿里的Dubbo、Google 的 gRPC、Twitter 的Finagle 等。本次R...【详细内容】
2021-04-09  Tags: RPC  点击:(332)  评论:(0)  加入收藏
本文选自“字节跳动基础架构实践”系列文章。 “字节跳动基础架构实践”系列文章是由字节跳动基础架构部门各技术团队及专家倾力打造的技术干货内容,和大家分享团队在基础架...【详细内容】
2021-01-18  Tags: RPC  点击:(259)  评论:(0)  加入收藏
两个独立的应用程序需要中介程序才能相互通信。 因此,开发人员经常建立桥梁-应用程序编程接口-来允许一个系统访问另一个系统的信息或功能。为了快速,大规模地集成应用程序,使...【详细内容】
2020-12-01  Tags: RPC  点击:(180)  评论:(0)  加入收藏
一、七层网络结构模型:我们先来了解一下OSI的七层网络结构模型(虽然实际应用中基本上都是五层),它可以分为以下几层:(从上到下) 第一层:应用层。定义了用于在网络中进行通信和传输数...【详细内容】
2020-11-26  Tags: RPC  点击:(101)  评论:(0)  加入收藏
本文是对几年前我在公司做的服务框架的梳理。本文假设你已经了解了什么是服务化,只阐述针对现有的服务框架所存在的问题,如何根据实际需求去考量并解决对应问题。首先,我们先来...【详细内容】
2020-09-23  Tags: RPC  点击:(83)  评论:(0)  加入收藏
与一般的HTTP REST框架不同,一个可用的RPC架构不仅解决了远程调用问题,也提供了用于服务注册和服务发现的基础设施,比如RMI(Java语言的RPC)里的RMI Registry,如下图所示。 在使用R...【详细内容】
2020-08-12  Tags: RPC  点击:(54)  评论:(0)  加入收藏
▌简易百科推荐
本文分为三个等级自顶向下地分析了glibc中内存分配与回收的过程。本文不过度关注细节,因此只是分别从arena层次、bin层次、chunk层次进行图解,而不涉及有关指针的具体操作。前...【详细内容】
2021-12-28  linux技术栈    Tags:glibc   点击:(3)  评论:(0)  加入收藏
摘 要 (OF作品展示)OF之前介绍了用python实现数据可视化、数据分析及一些小项目,但基本都是后端的知识。想要做一个好看的可视化大屏,我们还要学一些前端的知识(vue),网上有很多比...【详细内容】
2021-12-27  项目与数据管理    Tags:Vue   点击:(2)  评论:(0)  加入收藏
程序是如何被执行的&emsp;&emsp;程序是如何被执行的?许多开发者可能也没法回答这个问题,大多数人更注重的是如何编写程序,却不会太注意编写好的程序是如何被运行,这并不是一个好...【详细内容】
2021-12-23  IT学习日记    Tags:程序   点击:(9)  评论:(0)  加入收藏
阅读收获✔️1. 了解单点登录实现原理✔️2. 掌握快速使用xxl-sso接入单点登录功能一、早期的多系统登录解决方案 单系统登录解决方案的核心是cookie,cookie携带会话id在浏览器...【详细内容】
2021-12-23  程序yuan    Tags:单点登录(   点击:(8)  评论:(0)  加入收藏
下载Eclipse RCP IDE如果你电脑上还没有安装Eclipse,那么请到这里下载对应版本的软件进行安装。具体的安装步骤就不在这赘述了。创建第一个标准Eclipse RCP应用(总共分为六步)1...【详细内容】
2021-12-22  阿福ChrisYuan    Tags:RCP应用   点击:(7)  评论:(0)  加入收藏
今天想简单聊一聊 Token 的 Value Capture,就是币的价值问题。首先说明啊,这个话题包含的内容非常之光,Token 的经济学设计也可以包含诸多问题,所以几乎不可能把这个问题说的清...【详细内容】
2021-12-21  唐少华TSH    Tags:Token   点击:(10)  评论:(0)  加入收藏
实现效果:假如有10条数据,分组展示,默认在当前页面展示4个,点击换一批,从第5个开始继续展示,到最后一组,再重新返回到第一组 data() { return { qList: [], //处理后...【详细内容】
2021-12-17  Mason程    Tags:VUE   点击:(14)  评论:(0)  加入收藏
什么是性能调优?(what) 为什么需要性能调优?(why) 什么时候需要性能调优?(when) 什么地方需要性能调优?(where) 什么时候来进行性能调优?(who) 怎么样进行性能调优?(How) 硬件配...【详细内容】
2021-12-16  软件测试小p    Tags:性能调优   点击:(20)  评论:(0)  加入收藏
Tasker 是一款适用于 Android 设备的高级自动化应用,它可以通过脚本让重复性的操作自动运行,提高效率。 不知道从哪里听说的抖音 app 会导致 OLED 屏幕烧屏。于是就现学现卖,自...【详细内容】
2021-12-15  ITBang    Tags:抖音防烧屏   点击:(25)  评论:(0)  加入收藏
11 月 23 日,Rust Moderation Team(审核团队)在 GitHub 上发布了辞职公告,即刻生效。根据公告,审核团队集体辞职是为了抗议 Rust 核心团队(Core team)在执行社区行为准则和标准上...【详细内容】
2021-12-15  InfoQ    Tags:Rust   点击:(25)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条