您现在的位置是:主页 > news > 今日重大新闻事件/谷歌seo网络公司
今日重大新闻事件/谷歌seo网络公司
admin2025/4/21 15:32:37【news】
简介今日重大新闻事件,谷歌seo网络公司,火影忍者做网站的超帅图片,石家庄今日疫情曝光首先先说一下Dubbo,一种RPC框架。常见的RPC框架有:Dubbo、Httpclient、grpc、feign客户端等。 1.服务提供者(生产者)(Provider):暴露服务的服务提供方,服务提供者在启动时,向注册中心注册自己提供的服务。…
首先先说一下Dubbo,一种RPC框架。常见的RPC框架有:Dubbo、Httpclient、grpc、feign客户端等。
1.服务提供者(生产者)(Provider):暴露服务的服务提供方,服务提供者在启动时,向注册中心注册自己提供的服务。
2.服务消费者(Consumer): 调用远程服务的服务消费方,服务消费者在启动时,向注册中心订阅自己所需的服务,服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
3.注册中心(Registry):注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者
4.监控中心(Monitor):服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心
使用一个简单的订单调用会员的说明一下:会员服务就是服务的提供者,订单服务就是服务的消费者,订单服务想要调用会员服务使用httpClient是http://127.0.0.1:8080/member。但是我们的IP不是固定的,而且会员服务也部署了多台。注册中心就是用来管理会员服务的调用的,生产者注册到注册中心,注册中心进行管理,消费者订阅注册中心后使用member-service在注册中心获取IP和端口号。这样会员服务IP变化就不会影响到订单服务。监控中心就是用来检测订单服务调用会员服的一个监控(调用次数啊成功失败等)
我们先来看一个正常的dubbo协议在zk上面生成的文件。
我们暂且把它分成三个部分来看:
1、协议的名称,此处使用的是dubbo协议
2、注册的接口
3、接口的调用地址,这个地方是多个的(负载均衡)。看第三部分中每一个调用地址其实是这样的
dubbo://127.0.0.1:20880/com.mayikt.api.service.UserService…
协议类型://IP:端口号/注入接口。。。
这是我们生成的ZK信息。我们这只是个demo所以链接信息没有写这么多东西只是能获取到链接地址就行了。
Netty手写Dubbo框架
先写一个注解用来简化注册发布。以下代码只是部分代码,可以自己完善如有问题不懂美特教育官方1群:193086273
@Documented
@Inherited
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAnnotation {Class value();
}
对应往ZK上面注册创建文件的
public class ServiceRegistrationImpl implements ServiceRegistration {// zk连接地址private final String zkServers = "127.0.0.1";// 会话超时时间private final int connectionTimeout = 5000;// zkClientprivate ZkClient zkClient;// 根目录协议名称private String rootNamePath = "/mayikt_rpc";public ServiceRegistrationImpl() {zkClient = new ZkClient(zkServers, connectionTimeout);}public void registr(String serviceName, String serviceAddres) {// 创建我们的根路径 mayikt_rpcif (!zkClient.exists(rootNamePath)) {zkClient.createPersistent(rootNamePath);}// 创建我们的接口路径 /mayikt_rpc/com.mayikt.UserServiceString serviceNodePath = rootNamePath + "/" + serviceName;if (!zkClient.exists(serviceNodePath)) {zkClient.createPersistent(serviceNodePath);}// 创建我们服务地址目录 /mayikt_rpc/com.mayikt.UserService+"/providers"String providerNodePath = serviceNodePath + "/" + "providers";if (!zkClient.exists(providerNodePath)) {zkClient.createPersistent(providerNodePath);}//创建我们服务地址 mayikt://192.168.11.11:8080/com.mayikt.sercice.UserSercice getUserString serviceAddresNodePath = providerNodePath + "/" + URLEncoder.encode(serviceAddres); //此处根据实际情况生成地址后面的信息if (zkClient.exists(serviceAddresNodePath)) {zkClient.delete(serviceAddresNodePath);}zkClient.createEphemeral(serviceAddresNodePath);}
}
调用注册注入接口信息,netty前两个监听是JBoss对象序列化二进制和反序列化用的,也可以使用fastJSON等第三个监听是反射执行我们的接口调用的。
public class MayiktRpcServer {/*** 存放注册的bean对象*/private Map<String, Object> serviceBean = new HashMap<>();private ServiceRegistration serviceRegistration;/*** 服务注册端口号*/private int port;/*** 服务地址*/private String host;public MayiktRpcServer() {}public MayiktRpcServer(String host, int port) {this.host = host;this.port = port;serviceRegistration = new ServiceRegistrationImpl();}/*** 启动Netty服务器*/public void start(Object object) {// 1.先将我们的服务注册到zkbind(object);// 2.启动我们的nettynettyStart();}private void bind(Object object) {// 获取需要发布的接口的注解class类RpcAnnotation declaredAnnotation = object.getClass().getDeclaredAnnotation(RpcAnnotation.class);if (declaredAnnotation == null) {return;}Class value = declaredAnnotation.value();String serviceName = value.toString().replace("interface ", "");String serviceAddres = "mayikt://" + host + ":" + port + "/";serviceRegistration.registr(serviceName, serviceAddres);System.out.println("serviceName:" + serviceName + ",serviceAddres:" + serviceAddres);serviceBean.put(serviceName, object);}private void nettyStart() {// 用于接受客户端连接的请求 (并没有处理请求)NioEventLoopGroup bossGroup = new NioEventLoopGroup();// 用于处理客户端连接的读写操作NioEventLoopGroup workGroup = new NioEventLoopGroup();// 用于创建我们的ServerBootstrapServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());socketChannel.pipeline().addLast(new DubboServerHandler(serviceBean));}});;// 绑定我们的端口号码try {// 绑定端口号,同步等待成功ChannelFuture future = serverBootstrap.bind(port).sync();System.out.println("会员服务启动成功:" + port);// 等待服务器监听端口future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}
}
反射调用监听
public class DubboServerHandler extends ChannelInboundHandlerAdapter {/*** 存放注册的bean对象*/private Map<String, Object> serviceBean = new HashMap<>();public DubboServerHandler(Map<String, Object> serviceBean) {this.serviceBean = serviceBean;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest rpcRequest = (RpcRequest) msg;String className = rpcRequest.getClassName();Object objectImpl = serviceBean.get(className);if (objectImpl == null) {return;}Method method = objectImpl.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());// 使用反射技术执行我们的方法Object result = method.invoke(objectImpl, rpcRequest.getParamsValue());// 响应给客户端ctx.writeAndFlush(result);}
RPCrequest是一个实体类存放反射信息的
public class RpcRequest implements Serializable {private static final long SerialVersionUID = 1L;/*** 类的className*/private String className;/*** 方法名称*/private String methodName;/*** 参数类型*/Class<?> parameterTypes[];/*** 参数value*/Object paramsValue[];public RpcRequest(String className, String methodName, Class<?>[] parameterTypes, Object[] paramsValue) {this.className = className;this.methodName = methodName;this.parameterTypes = parameterTypes;this.paramsValue = paramsValue;}SET和get。。。
与上面对应的ZK上面服务的发现,就是找ZK图上第3部分
public class ServiceDiscoverImpl implements ServiceDiscover {
// zk连接地址private final String zkServers = "127.0.0.1";
// 会话超时时间private final int connectionTimeout = 5000;
// zkClientprivate ZkClient zkClient;
// 根目录协议名称private String rootNamePath = "/mayikt_rpc";public ServiceDiscoverImpl() {zkClient = new ZkClient(zkServers, connectionTimeout);}@Overridepublic List<String> getDiscover(String serviceName) {String serviceNameNodePath = rootNamePath + "/" + serviceName + "/providers";List<String> children = zkClient.getChildren(serviceNameNodePath);return children;}
}
负载均衡暂且使用轮训机制吧,实际使用中用策略模式实现不同的负载均衡的。
public class LoadBalancing implements RouteStrategy<String> {private int index = 0;@Overridepublic synchronized String select(List<String> repos) {int size = repos.size();if (index >= size) {index = 0;}String value = repos.get(index++);return value;}
}
客户端调用部分,同样ZK获取信息之后调用Netty建立与服务器端的链接后监听返回结果(此处使用的是代理模式):
public class RpcClientProxy {private ServiceDiscover serviceDiscover;public RpcClientProxy() {serviceDiscover = new ServiceDiscoverImpl();}public <T> T create(Class<T> interfaceClass) {return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 从zk上获取注册地址String serviceName = interfaceClass.getName();List<String> discover = serviceDiscover.getDiscover(serviceName);// 使用默认负载均衡器RouteStrategy<String> loadBalancing = new LoadBalancing();// mayikt://192.168.212.1:8080 获取ip和端口号String selectAddres = URLDecoder.decode(loadBalancing.select(discover));String[] split = selectAddres.split(":");String host = split[1].replace("//", "");String port = split[2].replace("/", "");// 建立Netty连接 发送数据RpcRequest rpcRequest = new RpcRequest(serviceName, method.getName(), method.getParameterTypes(), args);DubboClientHandler dubboClientHandler = new DubboClientHandler();//创建nioEventLoopGroupNioEventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, Integer.parseInt(port))).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());ch.pipeline().addLast(dubboClientHandler);}});try {// 发起同步连接ChannelFuture sync = bootstrap.connect().sync();sync.channel().writeAndFlush(rpcRequest);sync.channel().closeFuture().sync();} catch (Exception e) {} finally {group.shutdownGracefully();}return dubboClientHandler.getResponse();}});}
}
下面来捋一下上面代码执行的流程:
我们启动生产服务和消费服务的时候启动了netty通过netty进行服务的调用
如图订单服务向会员服务发起调用方法getUser方法请求后,通过Netty会员服务监听到了调用请求(channelRead),根据请求参数msg获取到了对应的方法和参数等信息。通过反射调用对应的方法执行后把结果返回Netty。订单监听到后获取到对应的返回结果。
Windows:zookeeper百度云链接
链接:https://pan.baidu.com/s/1nSnIcs2J6NHXch_FfIZQDA
提取码:z9ay
文章来源:
蚂蚁课堂