based on springBoot, hand-written a simple RPC framework(3)
Continuing from the previous chapter, after implementing server registration and invocation, the next step is to implement the functionality on the client side, which mainly includes load balancing, rate limiting, request sending, and service discovery. We will proceed to implement these functionalities in the order of an RPC call flow.
A Single Request:
Before implementing the client, it is important to understand what needs to be included in a single request.
Firstly, the request should contain the current service name, method name, and their corresponding parameters and parameter types. This information is necessary for the server to perform the appropriate reflection-based invocation.
Secondly, the request should include the parameters specified within the @RpcConsumer annotation. This allows the server to identify the correct service to be invoked.
Lastly, the request should include a unique identifier for tracing purposes, enabling tracking of the request within the system.
With these considerations, the basic parameters required for a request are fulfilled.
/** * generate service name,use to distinguish different service,and * can be * split to get the service name */ public String fetchRpcServiceName() { returnthis.getProject() + "*" + this.getGroup() + "*" + this.getServiceName() + "*" + this.getVersion(); }
}
Service Proxy
The first step is to scan all classes with the @RpcConsumer annotation during the Spring startup process and generate proxies for them. When a method of such a class is subsequently invoked, it will be routed through the proxy’s method, triggering the request.
By generating a proxy for the class, we can intercept the method invocations and add the necessary logic for request initiation. This allows us to transparently handle the communication with the remote service without modifying the original class implementation.
The proxy can be implemented using various techniques, such as JDK dynamic proxies or CGLIB proxies, depending on the specific requirements and framework being used.
Once the proxies are generated, any method calls on the annotated classes will be intercepted and forwarded to the proxy. The proxy will then handle the request by initiating the RPC call to the corresponding service.
This approach simplifies the integration of remote service invocations within the application codebase, as it abstracts away the details of communication and provides a seamless experience for invoking remote methods.
/** * proxy and injection of consumers * * @param bean * @param beanName * @return * @throws BeansException */ @Override public Object postProcessAfterInitialization(Object bean, String beanName)throws BeansException { Class<?> toBeProcessedBean = bean.getClass(); Field[] declaredFields = toBeProcessedBean.getDeclaredFields(); for (Field declaredField : declaredFields) { if (declaredField.isAnnotationPresent(RpcConsumer.class)) { RpcConsumerannotation= declaredField.getAnnotation(RpcConsumer.class); // build rpc service config RpcServiceConfigserviceConfig= RpcServiceConfig.builder() .project(annotation.project()) .version(annotation.version()) .group(annotation.group()) .build(); // create the proxy bean Factory and the proxy bean RpcServiceProxyproxy=newRpcServiceProxy(sendingServiceAdapter, serviceConfig); ObjectrpcProxy= proxy.getProxy(declaredField.getType()); declaredField.setAccessible(true); try { LogUtil.info("create service proxy: {}", bean); declaredField.set(bean, rpcProxy); } catch (IllegalAccessException e) { e.printStackTrace(); } } } return bean; } }
Correct, the next step is to implement the assembly and invocation of the request within the invoke method of the proxy class. Additionally, you need to retrieve the response value from the Future and return it to the caller.
Inside the invoke method of the proxy class, you will receive the method name, parameters, and other necessary information.
/** * get the proxy object */ @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) { return (T)Proxy.newProxyInstance(clazz.getClassLoader(), newClass<?>[] {clazz}, this); }
The core method of the client is to send a request. There are various ways to send a request, here it is only implemented based on netty’s Nio. Here is a complete timing sequence.
Create a send service
First implement the send method, which should contain the functionality to find the address and send the request.
/** * Waiting process request queue */ privatefinal WaitingProcessRequestQueue waitingProcessRequestQueue;
publicRpcSendingServiceAdapterImpl() { this.findingAdapter = ExtensionLoader.getExtensionLoader(RpcServiceFindingAdapter.class) .getExtension(ServiceDiscoveryEnum.ZK.getName()); this.addressChannelManager = SingletonFactory.getInstance(AddressChannelManager.class); this.waitingProcessRequestQueue = SingletonFactory.getInstance(WaitingProcessRequestQueue.class); // initialize eventLoopGroup = newNioEventLoopGroup(); bootstrap = newBootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(newLoggingHandler(LogLevel.INFO)) // The timeout period for the connection. // If this time is exceeded or if the connection cannot be // established, the connection fails. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .handler(newChannelInitializer<SocketChannel>() { @Override protectedvoidinitChannel(SocketChannel ch) { ChannelPipelinep= ch.pipeline(); // If no data is sent to the server within 15 seconds, a // heartbeat request is sent p.addLast(newIdleStateHandler(0, 5, 0, TimeUnit.SECONDS)); p.addLast(newRpcMessageEncoder()); p.addLast(newRpcMessageDecoder()); p.addLast(newNettyRpcClientHandler()); } }); }
private Channel fetchAndConnectChannel(InetSocketAddress address) { Channelchannel= addressChannelManager.get(address); if (channel == null) { // connect to service to get new address and rebuild the channel channel = connect(address); addressChannelManager.set(address, channel); } return channel; }
private Channel connect(InetSocketAddress address) { CompletableFuture<Channel> completableFuture = newCompletableFuture<>(); bootstrap.connect(address).addListener((ChannelFutureListener)future -> { if (future.isSuccess()) { // set channel to future LogUtil.info("The client has connected [{}] successful!", address.toString()); completableFuture.complete(future.channel()); } else { LogUtil.error("The client failed to connect to the server [{}],future", address.toString(), future); thrownewIllegalStateException(); } }); Channelchannel=null; try { channel = completableFuture.get(); } catch (Exception e) { LogUtil.error("occur exception when connect to server:", e); } return channel; }
The core method in this class is sendRpcRequest, which is responsible for fetching the service, creating the link, creating a Future task and sending the request.
Discovery Services
The process of discovering a service can include:
pulling a list of service addresses from the registry
Obtaining the service specific type through a load balancing algorithm.
Get the address
The first step is implemented first (here a cache can be used for further optimisation, in this project zk uses a ConcurrentHashMap instead of a cache, see CuratorClient for detailed code):
The Consistent Hash Algorithm is an algorithm for data slicing and load balancing in distributed systems. It minimises the need for data migration when dynamically expanding and reducing the capacity of nodes by introducing the concepts of virtual nodes and hash rings, improving the stability and performance of the system. It is widely used in distributed caching, load balancing and other scenarios.
Implementation:
Hash calculation
First, according to the consistency hashing algorithm we need to have the hash value generated according to the corresponding service. In the following implementation, the input is first passed through the SHA-256 algorithm to generate a 32-byte (256-bit) hash
However, this hash is too long and not easy to handle, so we need to shorten it. Also, a node mapping multiple hashes can improve the uniformity of distribution of the consistent hashing algorithm, as each node will have multiple hashes in the hash space, which can help reduce the impact of hash space redistribution due to the addition or subtraction of nodes.
calculateHash will generate a new hash of type Long by taking 8 bytes backwards from the starting point j for the hash value of 256 already obtained.
protectedstatic Long calculateHash(byte[] digest, int idx) { if (digest.length < (idx + 1) * 8) { thrownewIllegalArgumentException("Insufficient length of digest"); }
longhash=0; // 8 bytes digest,a byte is 8 bits like :1321 2432 // each loop choose a byte to calculate hash,and shift i*8 bits for (inti=0; i < 8; i++) { hash |= (255L & (long)digest[i + idx * 8]) << (8 * i); } return hash; }
Implement a virtual node selector.
According to the definition of a consistent hashing algorithm, a virtual node selector needs to generate multiple virtual nodes from services and map each node to multiple hash values, and finally get the nearest node based on the incoming hash value to return to the caller.
privatestaticclassConsistentHashLoadBalanceSelector { // hash to virtual node list privatefinal TreeMap<Long, String> virtualInvokers;
privateConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) { this.virtualInvokers = newTreeMap<>(); // generate service address virtual node] // one address may map to multiple virtual nodes // use the md5 hash algorithm to generate the hash value of the // virtual node LogUtil.info("init add serviceUrlList:{}", serviceUrlList); for (String serviceNode : serviceUrlList) { addVirtualNode(serviceNode, virtualNodeNumber); }
}
privatevoidaddVirtualNode(String serviceNode, int virtualNodeNumber) { for (inti=0; i < virtualNodeNumber / 8; i++) { StringvirtualNodeName= serviceNode + "#" + i; byte[] md5Hash = md5Hash(virtualNodeName); // md5Hash have 32 bytes // use 8 byte for each virtual node for (intj=0; j < 4; j++) { Longhash= calculateHash(md5Hash, j); virtualInvokers.put(hash, serviceNode); } } }
public String select(String rpcServiceKey) { byte[] digest = md5Hash(rpcServiceKey); // use first 8 byte to get hash return selectForKey(calculateHash(digest, 0)); }
if (entry == null) { entry = virtualInvokers.firstEntry(); }
return entry.getValue(); }
}
Implementing the full load balancing method
Use the hash of the interface name and the list of available services as the key, cache the corresponding consistent hash selector, and get a load node directly from the existing hash selector if it exists, or create a new one if it does not.
privatestaticclassConsistentHashLoadBalanceSelector { // hash to virtual node list privatefinal TreeMap<Long, String> virtualInvokers;
privateConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) { this.virtualInvokers = newTreeMap<>(); // generate service address virtual node] // one address may map to multiple virtual nodes // use the md5 hash algorithm to generate the hash value of the // virtual node LogUtil.info("init add serviceUrlList:{}", serviceUrlList); for (String serviceNode : serviceUrlList) { addVirtualNode(serviceNode, virtualNodeNumber); }
}
privatevoidaddVirtualNode(String serviceNode, int virtualNodeNumber) { for (inti=0; i < virtualNodeNumber / 8; i++) { StringvirtualNodeName= serviceNode + "#" + i; byte[] md5Hash = md5Hash(virtualNodeName); // md5Hash have 32 bytes // use 8 byte for each virtual node for (intj=0; j < 4; j++) { Longhash= calculateHash(md5Hash, j); virtualInvokers.put(hash, serviceNode); } } }
public String select(String rpcServiceKey) { byte[] digest = md5Hash(rpcServiceKey); // use first 8 byte to get hash return selectForKey(calculateHash(digest, 0)); }
protectedstatic Long calculateHash(byte[] digest, int idx) { if (digest.length < (idx + 1) * 8) { thrownewIllegalArgumentException("Insufficient length of digest"); }
longhash=0; // 8 bytes digest,a byte is 8 bits like :1321 2432 // each loop choose a byte to calculate hash,and shift i*8 bits for (inti=0; i < 8; i++) { hash |= (255L & (long)digest[i + idx * 8]) << (8 * i); } return hash; }
/** * Choose one from the list of existing service addresses list * * @param serviceUrlList Service address list * @param rpcRequest * @return */ @Override public String selectServiceAddress(List<String> serviceUrlList, RpcRequest rpcRequest) { intserviceListHash= System.identityHashCode(serviceUrlList); StringinterfaceName= rpcRequest.getServiceName(); StringselectorKey= interfaceName + serviceListHash;
private Channel fetchAndConnectChannel(InetSocketAddress address) { Channelchannel= addressChannelManager.get(address); if (channel == null) { // connect to service to get new address and rebuild the channel channel = connect(address); addressChannelManager.set(address, channel); } return channel; }
private Channel connect(InetSocketAddress address) { CompletableFuture<Channel> completableFuture = newCompletableFuture<>(); bootstrap.connect(address).addListener((ChannelFutureListener)future -> { if (future.isSuccess()) { // set channel to future LogUtil.info("The client has connected [{}] successful!", address.toString()); completableFuture.complete(future.channel()); } else { LogUtil.error("The client failed to connect to the server [{}],future", address.toString(), future); thrownewIllegalStateException(); } }); Channelchannel=null; try { channel = completableFuture.get(); } catch (Exception e) { LogUtil.error("occur exception when connect to server:", e); } return channel; }
/** * Called when an exception occurs in processing a client message */ @Override publicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LogUtil.error("server exceptionCaught"); cause.printStackTrace(); ctx.close(); }