本文的写行信模宗旨在于通过简单干净实践的方式教会读者,为什么要使用Dubbo、代码怎么使用Dubbo、把通Dubbo通信的解释原理是什么。在学习本文后,清楚你可以避开很多关于 Dubbo 使用时的手式坑,也能更清楚自己的写行信模编码是在做什么。
本文涉及的代码工程:
随着互联网场景中所要面对的用户规模和体量的增加,系统的也需要做相应的拆分设计和实现。随之而来的,以前的一套系统,现在成了多个微服务。如;电商系统,以前就在一个工程中写就可以了,现在需要拆分出,用户、支付、商品、配送、活动、风控等各个模块。那么这些模块拆分后,如何高效的通信呢?
图片
图片
Dubbo 的使用分为2方,一个是接口的提供方,另外一个是接口的调用方。接口的提供方需要提供出被调用方使用接口的描述性信息。这个信息包括;接口名称、接口入参、接口出参,只有让调用方拿到这些信息以后,它才能依托于这样的接口信息做一个代理操作,并在代理类中使用 Socket 完成双方的信息交互。
所以你看上去调用 RPC 接口好像和使用 HTTP 也没啥区别,无非就是引入了 POM 配置,之后再配置了注解就可以使用了。但其实,它是把你的 Jar 当做代理的必要参数使用了。本文也会介绍,具体是怎么代理的
对于编程的学习来说,其实最开始的那一下,不是搞明白所有原理,而是先让自己可以看到运行出来的效果。哎,之后就去分析原理,这样会舒服的多。
所以小傅哥这里提供了一套简单的 Dubbo 使用案例,只要你满足最基本的配置条件,就可以运行出效果;
工程案例创建结构,采用的是 DDD 结构。但和 DDD 一点关系没有。如果你对工程创建有疑惑,可以参考 《Java 简明教程》之 DDD 架构
图片
源码:cn.bugstack.dev.tech.dubbo.api.IUserService
public interface IUserService { Response<UserResDTO> queryUserInfo(UserReqDTO reqDTO);}
源码:cn.bugstack.dev.tech.dubbo.trigger.rpc.UserService
@Slf4j@DubboService(version = "1.0.0")public class UserService implements IUserService { @Override public Response<UserResDTO> queryUserInfo(UserReqDTO reqDTO) { log.info("查询用户信息 userId: { } reqStr: { }", reqDTO.getUserId(), JSON.toJSONString(reqDTO)); try { // 1. 模拟查询【你可以从数据库或者Redis缓存获取数据】 UserResDTO resDTO = UserResDTO.builder() .userId(reqDTO.getUserId()) .userName("小傅哥") .userAge(20) .build(); // 2. 返回结果 return Response.<UserResDTO>builder() .code(Constants.ResponseCode.SUCCESS.getCode()) .info(Constants.ResponseCode.SUCCESS.getInfo()) .data(resDTO).build(); } catch (Exception e) { log.error("查询用户信息失败 userId: { } reqStr: { }", reqDTO.getUserId(), JSON.toJSONString(reqDTO), e); return Response.<UserResDTO>builder() .code(Constants.ResponseCode.UN_ERROR.getCode()) .info(Constants.ResponseCode.UN_ERROR.getInfo()) .build(); } }}
application.yml
dubbo: application: name: xfg-dev-tech-dubbo version: 1.0.0 registry: address: zookeeper://127.0.0.1:2181 # N/A - 无zookeeper可配置 N/A 走直连模式测试 protocol: name: dubbo port: 20881 scan: base-packages: cn.bugstack.dev.tech.dubbo.api
以上信息都准备了,一群小卡拉米开始掉到第4个坑里了!
你有2个应用,一个Dubbo接口提供方、一个Dubbo接口使用方。那么你在给你另外一个应用使用接口的时候,你在 InelliJ IDEA 的 Maven 中执行 Install 了吗?
Install 是干啥的?它是为了让你使用了同一个本地 Maven 配置的应用,可以引入到对方提供的 Jar 包。你 Install 以后,这个 Jar 包就会进入到本地 Maven 仓库了。如果是公司里开发,会有专门的自己家部署的,私有Maven中心仓库,就可以通过 deploy 把本地 Jar 发布上去,那么公司里的伙伴,也就都可以引用了。
图片
有些小卡拉米觉得前面的抗都扫干净了,就完事了。没有接下来还有坑,让你一搞搞一天,半夜也睡不好。
<dependency> <groupId>cn.bugstack</groupId> <artifactId>xfg-dev-tech-dubbo-api</artifactId> <version>1.0-SNAPSHOT</version></dependency>
源码:application.yml
dubbo: application: name: xfg-dev-tech-dubbo version: 1.0.0 registry: address: zookeeper://127.0.0.1:2181# address: N/A protocol: name: dubbo port: 20881
源码:cn.bugstack.dev.tech.dubbo.consumer.test.ApiTest
// 直连模式;@DubboReference(interfaceClass = IUserService.class, url = "dubbo://127.0.0.1:20881", version = "1.0.0")@DubboReference(interfaceClass = IUserService.class, version = "1.0.0")private IUserService userService;@Testpublic void test_userService() { UserReqDTO reqDTO = UserReqDTO.builder().userId("10001").build(); Response<UserResDTO> resDTO = userService.queryUserInfo(reqDTO); log.info("测试结果 req: { } res: { }", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO));}
测试结果
2023-07-08 15:37:22.291 INFO 62481 --- [ main] c.b.d.tech.dubbo.consumer.test.ApiTest : 测试结果 req: { "userId":"10001"} res: { "code":"0000","data":{ "userAge":20,"userId":"10001","userName":"小傅哥"},"info":"成功"}2023-07-08 15:37:22.324 INFO 62481 --- [tor-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl : backgroundOperationsLoop exiting
都说 Jar 是提供可描述性信息的,对方才能代理调用。那么这个过程是怎么干的呢,总不能一问这个,就让小卡拉米们去手写 Dubbo 呀!所以小傅哥会通过最简单模型结构,让你了解这个 Dubbo 通信的原理,方便小卡拉米们上手。
图片
好,核心的原理就这么点。接下来,我们从代码中看看。
源码:cn.bugstack.dev.tech.dubbo.trigger.socket.RpcServerSocket
@Slf4j@Servicepublic class RpcServerSocket implements Runnable { private ApplicationContext applicationContext; public RpcServerSocket(ApplicationContext applicationContext) { this.applicationContext = applicationContext; new Thread(this).start(); } @Override public void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) { channel.pipeline().addLast(new ObjectEncoder()); channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); channel.pipeline().addLast(new SimpleChannelInboundHandler<Map<String, Object>>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Map<String, Object> request) throws Exception { // 解析参数 Class<?> clazz = (Class<?>) request.get("clazz"); String methodName = (String) request.get("methodName"); Class<?>[] paramTypes = (Class<?>[]) request.get("paramTypes"); Object[] args = (Object[]) request.get("args"); // 反射调用 Method method = clazz.getMethod(methodName, paramTypes); Object invoke = method.invoke(applicationContext.getBean(clazz), args); // 封装结果 Map<String, Object> response = new HashMap<>(); response.put("data", invoke); log.info("RPC 请求调用 clazz:{ } methodName:{ }, response:{ }", clazz.getName(), methodName, JSON.toJSON(response)); // 回写数据 channelHandlerContext.channel().writeAndFlush(response); } }); } }); ChannelFuture f = b.bind(22881).sync(); f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
这段代码主要提供的功能包括;
打开工程:xfg-dev-tech-dubbo-test
源码:cn.bugstack.dev.tech.dubbo.consumer.config.RPCProxyBeanFactory
@Slf4j@Component("rpcProxyBeanFactory")public class RPCProxyBeanFactory implements FactoryBean<IUserService>, Runnable { private Channel channel; // 缓存数据,实际RPC会对每次的调用生成一个ID来标记获取 private Object responseCache; public RPCProxyBeanFactory() throws InterruptedException { new Thread(this).start(); while (null == channel) { Thread.sleep(150); log.info("Rpc Socket 链接等待..."); } } @Override public IUserService getObject() throws Exception { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Class<?>[] classes = { IUserService.class}; InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (Object.class.equals(method.getDeclaringClass())) { return method.invoke(this, args); } Map<String, Object> request = new HashMap<>(); request.put("clazz", IUserService.class); request.put("methodName", method.getName()); request.put("paramTypes", method.getParameterTypes()); request.put("args", args); channel.writeAndFlush(request); // 模拟超时等待,一般RPC接口请求,都有一个超时等待时长。 Thread.sleep(350); return responseCache; } }; return (IUserService) Proxy.newProxyInstance(classLoader, classes, handler); } @Override public Class<?> getObjectType() { return IUserService.class; } @Override public void run() { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.AUTO_READ, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new ObjectEncoder()); channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); channel.pipeline().addLast(new SimpleChannelInboundHandler<Map<String, Object>>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Map<String, Object> data) throws Exception { responseCache = data.get("data"); } }); } }); ChannelFuture channelFuture = b.connect("127.0.0.1", 22881).syncUninterruptibly(); this.channel = channelFuture.channel(); channelFuture.channel().closeFuture().syncUninterruptibly(); } finally { workerGroup.shutdownGracefully(); } }}
这段代码主要提供的功能包括;
@Resource(name = "rpcProxyBeanFactory")private IUserService proxyUserService;@Testpublic void test_proxyUserService(){ UserReqDTO reqDTO = UserReqDTO.builder().userId("10001").build(); Response<UserResDTO> resDTO = proxyUserService.queryUserInfo(reqDTO); log.info("测试结果 req: { } res: { }", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO));}
测试结果
2023-07-08 16:14:51.322 INFO 74498 --- [ main] c.b.d.tech.dubbo.consumer.test.ApiTest : 测试结果 req: { "userId":"10001"} res: { "code":"0000","data":{ "userAge":20,"userId":"10001","userName":"小傅哥"},"info":"成功"}
(责任编辑:百科)
彩讯股份(300634.SZ):股东广东达盛累计减持437.99万股