/** * generate service name,use to distinguish different service,and * can be * split to get the service name */ public String fetchRpcServiceName() { returnthis.getProject() + "*" + this.getGroup() + "*" + this.getServiceName() + "*" + this.getVersion(); }
/** * get the proxy object */ @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) { return (T)Proxy.newProxyInstance(clazz.getClassLoader(), newClass<?>[] {clazz}, this); }
/** * Waiting process request queue */ privatefinal WaitingProcessRequestQueue waitingProcessRequestQueue;
publicRpcSendingServiceAdapterImpl() { this.findingAdapter = ExtensionLoader.getExtensionLoader(RpcServiceFindingAdapter.class) .getExtension(ServiceDiscoveryEnum.ZK.getName()); this.addressChannelManager = SingletonFactory.getInstance(AddressChannelManager.class); this.waitingProcessRequestQueue = SingletonFactory.getInstance(WaitingProcessRequestQueue.class); // initialize eventLoopGroup = newNioEventLoopGroup(); bootstrap = newBootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(newLoggingHandler(LogLevel.INFO)) // The timeout period for the connection. // If this time is exceeded or if the connection cannot be // established, the connection fails. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .handler(newChannelInitializer<SocketChannel>() { @Override protectedvoidinitChannel(SocketChannel ch) { ChannelPipelinep= ch.pipeline(); // If no data is sent to the server within 15 seconds, a // heartbeat request is sent p.addLast(newIdleStateHandler(0, 5, 0, TimeUnit.SECONDS)); p.addLast(newRpcMessageEncoder()); p.addLast(newRpcMessageDecoder()); p.addLast(newNettyRpcClientHandler()); } }); }
private Channel fetchAndConnectChannel(InetSocketAddress address) { Channelchannel= addressChannelManager.get(address); if (channel == null) { // connect to service to get new address and rebuild the channel channel = connect(address); addressChannelManager.set(address, channel); } return channel; }
private Channel connect(InetSocketAddress address) { CompletableFuture<Channel> completableFuture = newCompletableFuture<>(); bootstrap.connect(address).addListener((ChannelFutureListener)future -> { if (future.isSuccess()) { // set channel to future LogUtil.info("The client has connected [{}] successful!", address.toString()); completableFuture.complete(future.channel()); } else { LogUtil.error("The client failed to connect to the server [{}],future", address.toString(), future); thrownewIllegalStateException(); } }); Channelchannel=null; try { channel = completableFuture.get(); } catch (Exception e) { LogUtil.error("occur exception when connect to server:", e); } return channel; }
privatestaticclassConsistentHashLoadBalanceSelector { // hash to virtual node list privatefinal TreeMap<Long, String> virtualInvokers;
/**
 * Builds the hash ring for the given service addresses.
 *
 * @param serviceUrlList    the service addresses to place on the ring
 * @param virtualNodeNumber the virtual-node budget passed to
 *                          {@code addVirtualNode} for every address
 */
private ConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) {
    this.virtualInvokers = new TreeMap<>();
    // Generate the virtual nodes of every service address.
    // One address may map to multiple virtual nodes; the hash value of each
    // virtual node is derived from the md5 hash of its name.
    LogUtil.info("init add serviceUrlList:{}", serviceUrlList);
    for (String serviceNode : serviceUrlList) {
        addVirtualNode(serviceNode, virtualNodeNumber);
    }
}
privatevoidaddVirtualNode(String serviceNode, int virtualNodeNumber) { for (inti=0; i < virtualNodeNumber / 8; i++) { StringvirtualNodeName= serviceNode + "#" + i; byte[] md5Hash = md5Hash(virtualNodeName); // md5Hash have 32 bytes // use 8 byte for each virtual node for (intj=0; j < 4; j++) { Longhash= calculateHash(md5Hash, j); virtualInvokers.put(hash, serviceNode); } } }
public String select(String rpcServiceKey) { byte[] digest = md5Hash(rpcServiceKey); // use first 8 byte to get hash return selectForKey(calculateHash(digest, 0)); }
privatestaticclassConsistentHashLoadBalanceSelector { // hash to virtual node list privatefinal TreeMap<Long, String> virtualInvokers;
/**
 * Builds the hash ring for the given service addresses.
 *
 * @param serviceUrlList    the service addresses to place on the ring
 * @param virtualNodeNumber the virtual-node budget passed to
 *                          {@code addVirtualNode} for every address
 */
private ConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) {
    this.virtualInvokers = new TreeMap<>();
    // Generate the virtual nodes of every service address.
    // One address may map to multiple virtual nodes; the hash value of each
    // virtual node is derived from the md5 hash of its name.
    LogUtil.info("init add serviceUrlList:{}", serviceUrlList);
    for (String serviceNode : serviceUrlList) {
        addVirtualNode(serviceNode, virtualNodeNumber);
    }
}
privatevoidaddVirtualNode(String serviceNode, int virtualNodeNumber) { for (inti=0; i < virtualNodeNumber / 8; i++) { StringvirtualNodeName= serviceNode + "#" + i; byte[] md5Hash = md5Hash(virtualNodeName); // md5Hash have 32 bytes // use 8 byte for each virtual node for (intj=0; j < 4; j++) { Longhash= calculateHash(md5Hash, j); virtualInvokers.put(hash, serviceNode); } } }
public String select(String rpcServiceKey) { byte[] digest = md5Hash(rpcServiceKey); // use first 8 byte to get hash return selectForKey(calculateHash(digest, 0)); }
protectedstatic Long calculateHash(byte[] digest, int idx) { if (digest.length < (idx + 1) * 8) { thrownewIllegalArgumentException("Insufficient length of digest"); }
longhash=0; // 8 bytes digest,a byte is 8 bits like :1321 2432 // each loop choose a byte to calculate hash,and shift i*8 bits for (inti=0; i < 8; i++) { hash |= (255L & (long)digest[i + idx * 8]) << (8 * i); } return hash; }
/**
 * Choose one address from the list of existing service addresses.
 *
 * <p>The visible part builds a selector key from the request's service name
 * followed by the identity hash code of {@code serviceUrlList}. The rest of
 * the method is not visible here.
 *
 * @param serviceUrlList Service address list
 * @param rpcRequest     the request; its service name forms the first part
 *                       of the selector key
 * @return presumably the selected service address - the return statement is
 *         not visible here, TODO confirm
 */
@Override public String selectServiceAddress(List<String> serviceUrlList, RpcRequest rpcRequest) { intserviceListHash= System.identityHashCode(serviceUrlList); StringinterfaceName= rpcRequest.getServiceName(); StringselectorKey= interfaceName + serviceListHash;
private Channel fetchAndConnectChannel(InetSocketAddress address) { Channelchannel= addressChannelManager.get(address); if (channel == null) { // connect to service to get new address and rebuild the channel channel = connect(address); addressChannelManager.set(address, channel); } return channel; }
private Channel connect(InetSocketAddress address) { CompletableFuture<Channel> completableFuture = newCompletableFuture<>(); bootstrap.connect(address).addListener((ChannelFutureListener)future -> { if (future.isSuccess()) { // set channel to future LogUtil.info("The client has connected [{}] successful!", address.toString()); completableFuture.complete(future.channel()); } else { LogUtil.error("The client failed to connect to the server [{}],future", address.toString(), future); thrownewIllegalStateException(); } }); Channelchannel=null; try { channel = completableFuture.get(); } catch (Exception e) { LogUtil.error("occur exception when connect to server:", e); } return channel; }
/** * Called when an exception occurs in processing a client message */ @Override publicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LogUtil.error("server exceptionCaught"); cause.printStackTrace(); ctx.close(); }