
A hand-written simple RPC framework based on Spring Boot (3)


Continuing from the previous chapter, after implementing server registration and invocation, the next step is to implement the functionality on the client side, which mainly includes load balancing, rate limiting, request sending, and service discovery. We will proceed to implement these functionalities in the order of an RPC call flow.

A Single Request:

Before implementing the client, it is important to understand what needs to be included in a single request.

Firstly, the request should contain the current service name, method name, and their corresponding parameters and parameter types. This information is necessary for the server to perform the appropriate reflection-based invocation.

Secondly, the request should include the attributes specified in the @RpcConsumer annotation (project, group, and version). These allow the server to identify the correct service to invoke.

Lastly, the request should include a unique identifier for tracing purposes, enabling tracking of the request within the system.

With these considerations, the basic parameters required for a request are fulfilled.

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class RpcRequest implements Serializable {

private static final long serialVersionUID = 8509587559718339795L;
/**
* traceId
*/
private String traceId;
/**
* interface name
*/
private String serviceName;
/**
* method name
*/
private String methodName;
/**
* parameters
*/
private Object[] parameters;
/**
* parameter types
*/
private Class<?>[] paramTypes;
/**
* version
*/
private String version;
/**
* project
*/
private String project;
/**
* group
*/
private String group;

/**
* Generate the full service key used to distinguish different services.
* The segments are joined with "*", so the key can be split on "*" to
* recover the individual parts.
*/
public String fetchRpcServiceName() {
return this.getProject() + "*" + this.getGroup() + "*" + this.getServiceName() + "*" + this.getVersion();
}

}
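
As a minimal sketch of how such a key can be built and split again, the helper below mirrors the project*group*serviceName*version layout. ServiceKeySketch and its method names are hypothetical and not part of the framework. One detail worth noting is that String.split interprets its argument as a regular expression, so the * separator has to be quoted.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the framework: mirrors the key layout
// project*group*serviceName*version used by fetchRpcServiceName().
final class ServiceKeySketch {
    private static final String SEPARATOR = "*";

    static String build(String project, String group, String serviceName, String version) {
        return String.join(SEPARATOR, project, group, serviceName, version);
    }

    // String.split takes a regular expression, so the literal '*' must be quoted.
    static String[] parse(String key) {
        String[] parts = key.split(Pattern.quote(SEPARATOR), -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 segments but got " + parts.length + ": " + key);
        }
        return parts;
    }
}
```

Splitting only works as long as none of the four fields contains the separator itself, which is an assumption of this sketch.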

Service Proxy

The first step is to scan, during Spring startup, every bean for fields annotated with @RpcConsumer and to generate a proxy for each such field. When a method on the field is subsequently invoked, the call is routed through the proxy’s invoke method, which triggers the request.

By generating a proxy for the class, we can intercept the method invocations and add the necessary logic for request initiation. This allows us to transparently handle the communication with the remote service without modifying the original class implementation.

The proxy can be implemented using various techniques, such as JDK dynamic proxies or CGLIB proxies, depending on the specific requirements and framework being used.

Once the proxies are generated, any method calls on the annotated classes will be intercepted and forwarded to the proxy. The proxy will then handle the request by initiating the RPC call to the corresponding service.

This approach simplifies the integration of remote service invocations within the application codebase, as it abstracts away the details of communication and provides a seamless experience for invoking remote methods.

@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {

private final RpcServiceRegistryAdapter adapter;

private final RpcSendingServiceAdapter sendingServiceAdapter;

public RpcBeanPostProcessor() {
this.adapter = SingletonFactory.getInstance(RpcServiceRegistryAdapterImpl.class);
this.sendingServiceAdapter = ExtensionLoader.getExtensionLoader(RpcSendingServiceAdapter.class)
.getExtension(RpcRequestSendingEnum.NETTY.getName());
}

/**
* register service
*
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
LogUtil.info("start process register service: {}", bean);
// register service
if (bean.getClass().isAnnotationPresent(RpcProvider.class)) {
RpcProvider annotation = bean.getClass().getAnnotation(RpcProvider.class);
// build rpc service config
RpcServiceConfig serviceConfig = RpcServiceConfig.builder()
.service(bean)
.project(annotation.project())
.version(annotation.version())
.group(annotation.group())
.build();
LogUtil.info("register service: {}", serviceConfig);
adapter.registryService(serviceConfig);
}
return bean;
}

/**
* proxy and injection of consumers
*
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> toBeProcessedBean = bean.getClass();
Field[] declaredFields = toBeProcessedBean.getDeclaredFields();
for (Field declaredField : declaredFields) {
if (declaredField.isAnnotationPresent(RpcConsumer.class)) {
RpcConsumer annotation = declaredField.getAnnotation(RpcConsumer.class);
// build rpc service config
RpcServiceConfig serviceConfig = RpcServiceConfig.builder()
.project(annotation.project())
.version(annotation.version())
.group(annotation.group())
.build();
// create the proxy bean Factory and the proxy bean
RpcServiceProxy proxy = new RpcServiceProxy(sendingServiceAdapter, serviceConfig);
Object rpcProxy = proxy.getProxy(declaredField.getType());
declaredField.setAccessible(true);
try {
LogUtil.info("create service proxy: {}", bean);
declaredField.set(bean, rpcProxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}
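
Stripped of Spring, the consumer half of this post-processor comes down to finding annotated fields and replacing their value with a JDK dynamic proxy. The following self-contained sketch illustrates that idea only; the RemoteRef annotation, GreetingService, GreetingClient and ProxyInjectionSketch are hypothetical stand-ins, and the handler is supplied by the caller instead of sending a real request.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for @RpcConsumer; RUNTIME retention is needed
// so that the annotation is visible through reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface RemoteRef {
}

// Hypothetical remote interface.
interface GreetingService {
    String greet(String name);
}

// Hypothetical consumer bean with an annotated field.
class GreetingClient {
    @RemoteRef
    private GreetingService greetingService;

    String run(String name) {
        return greetingService.greet(name);
    }
}

final class ProxyInjectionSketch {
    // Replace every @RemoteRef field of the bean with a JDK dynamic proxy.
    static void inject(Object bean, InvocationHandler handler) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(RemoteRef.class)) {
                continue;
            }
            Class<?> type = field.getType();
            // JDK dynamic proxies can only implement interfaces.
            if (!type.isInterface()) {
                throw new IllegalStateException(field.getName() + " must be declared as an interface");
            }
            Object proxy = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
            field.setAccessible(true);
            try {
                field.set(bean, proxy);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot inject proxy into " + field.getName(), e);
            }
        }
    }
}
```

Because getDeclaredFields only returns the fields declared on the class itself, annotated fields inherited from a superclass would be missed by this sketch.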

The next step is to assemble and send the request inside the invoke method of the proxy class, then retrieve the response value from the Future and return it to the caller.

Inside the invoke method of the proxy class, you will receive the method name, parameters, and other necessary information.

public class RpcServiceProxy implements InvocationHandler {

private final RpcSendingServiceAdapter sendingServiceAdapter;

private final RpcServiceConfig config;

public RpcServiceProxy(RpcSendingServiceAdapter sendingServiceAdapter, RpcServiceConfig config) {
this.sendingServiceAdapter = sendingServiceAdapter;
this.config = config;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) {
LogUtil.info("invoked method: [{}]", method.getName());
RpcRequest rpcRequest = buildRequest(method,args);

RpcResponse<Object> rpcResponse = null;
CompletableFuture<RpcResponse<Object>> completableFuture =
(CompletableFuture<RpcResponse<Object>>)sendingServiceAdapter.sendRpcRequest(rpcRequest);
try {
rpcResponse = completableFuture.get();
return rpcResponse.getData();
} catch (Exception e) {
LogUtil.error("occur exception:", e);
}
return null;
}

/**
* get the proxy object
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T)Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[] {clazz}, this);
}

private RpcRequest buildRequest(Method method,Object[] args){
RpcRequest rpcRequest = RpcRequest.builder()
.methodName(method.getName())
.parameters(args)
.serviceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.traceId(UUID.randomUUID().toString())
.project(config.getProject())
.version(config.getVersion())
.group(config.getGroup())
.build();
return rpcRequest;
}
}
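
Note that invoke above waits on the future without a time limit and returns null when the call fails. For a method with a primitive return type, a null result from an InvocationHandler leads to a NullPointerException at the call site. A possible alternative, sketched here under the assumption that a fixed timeout is acceptable, is to bound the wait and rethrow failures. AwaitSketch and its await method are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a response with an upper bound and
// surfaces failures to the caller instead of returning null.
final class AwaitSketch {
    static <T> T await(CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // nobody is waiting any more, so mark the future as cancelled
            future.cancel(true);
            throw new IllegalStateException("rpc call timed out after " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            // restore the interrupt flag before leaving
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for rpc response", e);
        } catch (ExecutionException e) {
            // unwrap the failure reported by the sender or the server
            throw new IllegalStateException("rpc call failed", e.getCause());
        }
    }
}
```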

Sending requests:

The core task of the client is to send a request. A request can be sent in various ways; here it is implemented only with Netty’s NIO transport. The following sequence diagram shows the complete flow.

[Figure: sequence diagram of a complete request]

Create a send service

First implement the send method, which should contain the functionality to find the address and send the request.

public class RpcSendingServiceAdapterImpl implements RpcSendingServiceAdapter {

/**
* EventLoopGroup is a multithreaded event loop that handles I/O operation.
*/
private final EventLoopGroup eventLoopGroup;

/**
* Bootstrap helps configure and start the Netty client
*/
private final Bootstrap bootstrap;

/**
* Service discovery
*/
private final RpcServiceFindingAdapter findingAdapter;

/**
* Channel manager,mapping channel and address
*/
private final AddressChannelManager addressChannelManager;

/**
* Waiting process request queue
*/
private final WaitingProcessRequestQueue waitingProcessRequestQueue;

public RpcSendingServiceAdapterImpl() {
this.findingAdapter = ExtensionLoader.getExtensionLoader(RpcServiceFindingAdapter.class)
.getExtension(ServiceDiscoveryEnum.ZK.getName());
this.addressChannelManager = SingletonFactory.getInstance(AddressChannelManager.class);
this.waitingProcessRequestQueue = SingletonFactory.getInstance(WaitingProcessRequestQueue.class);
// initialize
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
// The timeout period for the connection.
// If this time is exceeded or if the connection cannot be
// established, the connection fails.
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
// Bootstrap keeps a single handler, so the LoggingHandler is added
// inside the initializer instead of through a second handler() call
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
// If no data is written to the server within 5 seconds, a
// heartbeat request is sent
p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
p.addLast(new NettyRpcClientHandler());
}
});
}

@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
CompletableFuture<RpcResponse<Object>> result = new CompletableFuture<>();
InetSocketAddress address = findServiceAddress(rpcRequest);
Channel channel = fetchAndConnectChannel(address);
if (channel.isActive()) {
addToProcessQueue(rpcRequest.getTraceId(), result);
RpcData rpcData = prepareRpcData(rpcRequest);
sendRpcData(channel, rpcData, result);
} else {
LogUtil.error("Send request[{}] failed", rpcRequest);
throw new IllegalStateException("channel to " + address + " is not active");
}
return result;
}
private InetSocketAddress findServiceAddress(RpcRequest rpcRequest) {
return findingAdapter.findServiceAddress(rpcRequest);
}

private void addToProcessQueue(String traceId, CompletableFuture<RpcResponse<Object>> result) {
waitingProcessRequestQueue.put(traceId, result);
}

private RpcData prepareRpcData(RpcRequest rpcRequest) {
return RpcData.builder()
.data(rpcRequest)
.serializeMethodCodec(SerializationTypeEnum.HESSIAN.getCode())
.compressType(CompressTypeEnum.GZIP.getCode())
.messageType(RpcConstants.REQUEST_TYPE)
.build();
}
private void sendRpcData(Channel channel, RpcData rpcData, CompletableFuture<RpcResponse<Object>> result) {
channel.writeAndFlush(rpcData).addListener((ChannelFutureListener)future -> {
if (future.isSuccess()) {
LogUtil.info("client send message: [{}]", rpcData);
} else {
future.channel().close();
result.completeExceptionally(future.cause());
LogUtil.error("Send failed:", future.cause());
}
});
}

private Channel fetchAndConnectChannel(InetSocketAddress address) {
Channel channel = addressChannelManager.get(address);
if (channel == null) {
// connect to service to get new address and rebuild the channel
channel = connect(address);
addressChannelManager.set(address, channel);
}
return channel;
}

private Channel connect(InetSocketAddress address) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(address).addListener((ChannelFutureListener)future -> {
if (future.isSuccess()) {
// set channel to future
LogUtil.info("The client has connected [{}] successful!", address.toString());
completableFuture.complete(future.channel());
} else {
LogUtil.error("The client failed to connect to the server [{}]", address.toString(), future.cause());
// fail the future so the waiting caller is released; an exception
// thrown inside the listener would never reach the caller
completableFuture.completeExceptionally(future.cause());
}
});
try {
return completableFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("interrupted while connecting to " + address, e);
} catch (ExecutionException e) {
throw new IllegalStateException("failed to connect to " + address, e.getCause());
}
}

public Channel getChannel(InetSocketAddress inetSocketAddress) {
Channel channel = addressChannelManager.get(inetSocketAddress);
if (channel == null) {
channel = connect(inetSocketAddress);
addressChannelManager.set(inetSocketAddress, channel);
}
return channel;
}
}

The core method in this class is sendRpcRequest, which looks up the service address, establishes the connection, creates a Future for the pending response, and sends the request.
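
The implementation of WaitingProcessRequestQueue is not shown in this article. A minimal sketch of the idea, assuming it is essentially a map from traceId to a pending future, could look as follows. PendingRequestsSketch is a hypothetical name, and responses are plain strings to keep the example self-contained.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for WaitingProcessRequestQueue.
final class PendingRequestsSketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the sender before the request is written to the channel.
    CompletableFuture<String> register(String traceId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(traceId, future);
        return future;
    }

    // Called by the inbound handler when a response arrives.
    // Returns false when no caller is waiting for this traceId.
    boolean complete(String traceId, String response) {
        CompletableFuture<String> future = pending.remove(traceId);
        return future != null && future.complete(response);
    }

    // Called when the write fails, so that the entry does not leak.
    void fail(String traceId, Throwable cause) {
        CompletableFuture<String> future = pending.remove(traceId);
        if (future != null) {
            future.completeExceptionally(cause);
        }
    }

    int size() {
        return pending.size();
    }
}
```

Removing the entry on both the success and the failure path keeps the map from growing when requests fail or responses never arrive.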

Discovery Services

The process of discovering a service can include:

  1. Pulling the list of service addresses from the registry.

  2. Selecting a specific service address through a load balancing algorithm.

Get the address

The first step is implemented below. A cache can be used here for further optimisation; in this project the ZooKeeper client keeps the addresses in a ConcurrentHashMap rather than a dedicated cache (see CuratorClient for the detailed code):

public class RpcServiceFindingAdapterImpl implements RpcServiceFindingAdapter {

private final LoadBalanceService loadBalanceService;

public RpcServiceFindingAdapterImpl() {
this.loadBalanceService = ExtensionLoader.getExtensionLoader(LoadBalanceService.class).getExtension(LOAD_BALANCE);
}

@Override
public InetSocketAddress findServiceAddress(RpcRequest rpcRequest) {
String serviceName = rpcRequest.fetchRpcServiceName();
CuratorFramework zkClient = CuratorClient.getZkClient();
List<String> serviceAddresseList = CuratorClient.getChildrenNodes(zkClient, serviceName);
if (CollectionUtils.isEmpty(serviceAddresseList)) {
throw new RuntimeException("no service available, serviceName: " + serviceName);
}

String service = loadBalanceService.selectServiceAddress(serviceAddresseList, rpcRequest);
if (StringUtils.isBlank(service)) {
throw new RuntimeException("no service available, serviceName: " + serviceName);
}
String[] socketAddressArray = service.split(":");
String host = socketAddressArray[0];
int port = Integer.parseInt(socketAddressArray[1]);
return new InetSocketAddress(host, port);
}
}
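
The last lines of findServiceAddress assume that every node name has the form host:port. A slightly more defensive variant, shown here as a sketch with the hypothetical name AddressParserSketch, validates the format and reports a readable error for malformed entries.

```java
import java.net.InetSocketAddress;

// Hypothetical helper: parses a "host:port" node name such as "127.0.0.1:9000".
final class AddressParserSketch {
    static InetSocketAddress parse(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        // the separator must exist and must not be the first or last character
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got: " + hostPort);
        }
        String host = hostPort.substring(0, idx);
        int port;
        try {
            port = Integer.parseInt(hostPort.substring(idx + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid port in: " + hostPort, e);
        }
        // the constructor itself rejects ports outside 0..65535
        return new InetSocketAddress(host, port);
    }
}
```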

Load Balancing - Consistent Hash Algorithm
Definition:

Consistent hashing is an algorithm for data sharding and load balancing in distributed systems. By introducing a hash ring and virtual nodes, it minimises the amount of data that has to migrate when nodes are added or removed, which improves the stability and performance of the system. It is widely used in distributed caching, load balancing and other scenarios.

Implementation:
  1. Hash calculation

First, consistent hashing needs a hash value for each service node. In the following implementation, the input is passed through the SHA-256 algorithm, which produces a 32-byte (256-bit) digest. (The method is named md5Hash, but it calls SHA-256, and it hashes the first digest a second time.)

However, this digest is too long to use directly as a position on the ring, so it is shortened. In addition, mapping each node to several hash values makes the distribution more uniform: each node occupies multiple positions in the hash space, which reduces the impact of redistribution when nodes are added or removed.

calculateHash produces a hash of type Long by taking 8 bytes of the 256-bit digest, starting at byte offset idx * 8.

protected static byte[] md5Hash(String input) {
MessageDigest messageDigest = null;
try {
messageDigest = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = messageDigest.digest(input.getBytes(StandardCharsets.UTF_8));
messageDigest.update(hashBytes);
return messageDigest.digest();
} catch (NoSuchAlgorithmException e) {
LogUtil.error("No such algorithm exception: {}", e.getMessage());
throw new RuntimeException(e);
}

}

protected static Long calculateHash(byte[] digest, int idx) {
if (digest.length < (idx + 1) * 8) {
throw new IllegalArgumentException("Insufficient length of digest");
}

long hash = 0;
// take the 8 bytes of slice idx; mask each byte to its unsigned value
// and shift it left by i * 8 bits, so the first byte ends up lowest
for (int i = 0; i < 8; i++) {
hash |= (255L & (long)digest[i + idx * 8]) << (8 * i);
}
return hash;
}
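
The loop in calculateHash reads each 8-byte slice with the first byte as the least significant one, in other words in little-endian order. The sketch below checks that reading by comparing the manual loop with ByteBuffer. DigestSliceSketch is a hypothetical name, and its sha256 helper hashes the input once, unlike md5Hash above, since only the slicing is of interest here.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class DigestSliceSketch {
    // Single SHA-256 pass; always returns 32 bytes.
    static byte[] sha256(String input) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(input.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Same bit manipulation as calculateHash in the article.
    static long sliceManually(byte[] digest, int idx) {
        long hash = 0;
        for (int i = 0; i < 8; i++) {
            hash |= (0xFFL & digest[i + idx * 8]) << (8 * i);
        }
        return hash;
    }

    // Equivalent read: 8 bytes at offset idx * 8, little-endian.
    static long sliceWithByteBuffer(byte[] digest, int idx) {
        return ByteBuffer.wrap(digest).order(ByteOrder.LITTLE_ENDIAN).getLong(idx * 8);
    }
}
```

A 32-byte digest offers four such slices (idx 0 to 3), which is why the selector below takes four hash values from every digest.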
  2. Implement a virtual node selector.

    According to the definition of consistent hashing, a virtual node selector generates multiple virtual nodes for each service address, maps each virtual node to a hash value, and finally returns to the caller the node whose hash is the nearest one at or after the incoming hash value, wrapping around to the first node when there is none.

    private static class ConsistentHashLoadBalanceSelector {
    // hash to virtual node list
    private final TreeMap<Long, String> virtualInvokers;

    private ConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) {
    this.virtualInvokers = new TreeMap<>();
    // generate the virtual nodes for each service address
    // one address maps to multiple virtual nodes
    // md5Hash (SHA-256 despite its name) generates the hash values of the
    // virtual nodes
    LogUtil.info("init add serviceUrlList:{}", serviceUrlList);
    for (String serviceNode : serviceUrlList) {
    addVirtualNode(serviceNode, virtualNodeNumber);
    }

    }

    private void addVirtualNode(String serviceNode, int virtualNodeNumber) {
    for (int i = 0; i < virtualNodeNumber / 8; i++) {
    String virtualNodeName = serviceNode + "#" + i;
    byte[] md5Hash = md5Hash(virtualNodeName);
    // the digest has 32 bytes and each virtual node uses 8 of them,
    // so one digest yields 4 ring positions (virtualNodeNumber / 2 in total)
    for (int j = 0; j < 4; j++) {
    Long hash = calculateHash(md5Hash, j);
    virtualInvokers.put(hash, serviceNode);
    }
    }
    }

    public String select(String rpcServiceKey) {
    byte[] digest = md5Hash(rpcServiceKey);
    // use first 8 byte to get hash
    return selectForKey(calculateHash(digest, 0));
    }

    public String selectForKey(long hashCode) {
    Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();

    if (entry == null) {
    entry = virtualInvokers.firstEntry();
    }

    return entry.getValue();
    }

    }
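
To make the lookup in selectForKey concrete, here is a miniature ring with hand-picked positions instead of real hash values. RingLookupSketch is a hypothetical name; ceilingEntry is used as a shorter equivalent of tailMap(hashCode, true).firstEntry().

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical miniature ring with hand-picked positions.
final class RingLookupSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void place(long position, String node) {
        ring.put(position, node);
    }

    // First node at or after the position; wraps to the smallest
    // position when the given position lies beyond the last node.
    String lookup(long position) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(position);
        if (entry == null) {
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }
}
```

With nodes A, B and C placed at 10, 20 and 30, position 15 resolves to B, position 20 also resolves to B, and position 31 wraps around to A.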
  3. Implement the full load balancing method.

    The key combines the interface name with the identity hash of the list of available services and is used to cache the corresponding consistent hash selector. If a selector already exists for the key, a node is picked from it directly; otherwise a new selector is created.

public class ConsistentHashLoadBalanceService implements LoadBalanceService {

private final Map<String, ConsistentHashLoadBalanceSelector> serviceToSelectorMap = new ConcurrentHashMap<>();

private static class ConsistentHashLoadBalanceSelector {
// hash to virtual node list
private final TreeMap<Long, String> virtualInvokers;

private ConsistentHashLoadBalanceSelector(List<String> serviceUrlList, int virtualNodeNumber) {
this.virtualInvokers = new TreeMap<>();
// generate the virtual nodes for each service address
// one address maps to multiple virtual nodes
// md5Hash (SHA-256 despite its name) generates the hash values of the
// virtual nodes
LogUtil.info("init add serviceUrlList:{}", serviceUrlList);
for (String serviceNode : serviceUrlList) {
addVirtualNode(serviceNode, virtualNodeNumber);
}

}

private void addVirtualNode(String serviceNode, int virtualNodeNumber) {
for (int i = 0; i < virtualNodeNumber / 8; i++) {
String virtualNodeName = serviceNode + "#" + i;
byte[] md5Hash = md5Hash(virtualNodeName);
// the digest has 32 bytes and each virtual node uses 8 of them,
// so one digest yields 4 ring positions (virtualNodeNumber / 2 in total)
for (int j = 0; j < 4; j++) {
Long hash = calculateHash(md5Hash, j);
virtualInvokers.put(hash, serviceNode);
}
}
}

public String select(String rpcServiceKey) {
byte[] digest = md5Hash(rpcServiceKey);
// use first 8 byte to get hash
return selectForKey(calculateHash(digest, 0));
}

public String selectForKey(long hashCode) {
Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();

if (entry == null) {
entry = virtualInvokers.firstEntry();
}

return entry.getValue();
}

}

protected static byte[] md5Hash(String input) {
MessageDigest messageDigest = null;
try {
messageDigest = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = messageDigest.digest(input.getBytes(StandardCharsets.UTF_8));
messageDigest.update(hashBytes);
return messageDigest.digest();
} catch (NoSuchAlgorithmException e) {
LogUtil.error("No such algorithm exception: {}", e.getMessage());
throw new RuntimeException(e);
}

}

protected static Long calculateHash(byte[] digest, int idx) {
if (digest.length < (idx + 1) * 8) {
throw new IllegalArgumentException("Insufficient length of digest");
}

long hash = 0;
// take the 8 bytes of slice idx; mask each byte to its unsigned value
// and shift it left by i * 8 bits, so the first byte ends up lowest
for (int i = 0; i < 8; i++) {
hash |= (255L & (long)digest[i + idx * 8]) << (8 * i);
}
return hash;
}

/**
* Choose one from the list of existing service addresses list
*
* @param serviceUrlList Service address list
* @param rpcRequest
* @return
*/
@Override
public String selectServiceAddress(List<String> serviceUrlList, RpcRequest rpcRequest) {
int serviceListHash = System.identityHashCode(serviceUrlList);
String interfaceName = rpcRequest.getServiceName();
String selectorKey = interfaceName + serviceListHash;

ConsistentHashLoadBalanceSelector consistentHashLoadBalanceSelector = serviceToSelectorMap
.computeIfAbsent(selectorKey, key -> new ConsistentHashLoadBalanceSelector(serviceUrlList, VIRTUAL_NODES));

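// Arrays.toString renders the argument values and tolerates a null array;
// concatenating Arrays.stream(...) would append the Stream object's identity instead
return consistentHashLoadBalanceSelector.select(interfaceName + Arrays.toString(rpcRequest.getParameters()));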
}

}
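
The property claimed in the definition, that adding a node only moves a limited share of the keys, can be checked with a small experiment. The sketch below is an independent, simplified ring (RemapSketch is a hypothetical name, as are the replica count of 100 and the request keys); it hashes once with SHA-256 and uses the first 8 bytes of the digest. It collects the new owner of every key whose owner changes when one more node joins.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RemapSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    RemapSketch(List<String> nodes, int replicas) {
        for (String node : nodes) {
            for (int i = 0; i < replicas; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    static long hash(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(input.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest).getLong(); // first 8 bytes of the digest
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    String select(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // New owners of all keys whose owner changed after adding one node.
    static List<String> ownersOfMovedKeys(List<String> nodes, String added, int keyCount) {
        List<String> grown = new ArrayList<>(nodes);
        grown.add(added);
        RemapSketch before = new RemapSketch(nodes, 100);
        RemapSketch after = new RemapSketch(grown, 100);
        List<String> moved = new ArrayList<>();
        for (int i = 0; i < keyCount; i++) {
            String key = "request-" + i;
            String newOwner = after.select(key);
            if (!before.select(key).equals(newOwner)) {
                moved.add(newOwner);
            }
        }
        return moved;
    }
}
```

Every entry in the returned list should be the newly added node: a key either stays where it was or moves to the new node, never between the existing nodes.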

Send a request

Send request
  @Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
CompletableFuture<RpcResponse<Object>> result = new CompletableFuture<>();
InetSocketAddress address = findServiceAddress(rpcRequest);
Channel channel = fetchAndConnectChannel(address);
if (channel.isActive()) {
addToProcessQueue(rpcRequest.getTraceId(), result);
RpcData rpcData = prepareRpcData(rpcRequest);
sendRpcData(channel, rpcData, result);
} else {
LogUtil.error("Send request[{}] failed", rpcRequest);
throw new IllegalStateException("channel to " + address + " is not active");
}
return result;
}

private void addToProcessQueue(String traceId, CompletableFuture<RpcResponse<Object>> result) {
waitingProcessRequestQueue.put(traceId, result);
}

private RpcData prepareRpcData(RpcRequest rpcRequest) {
return RpcData.builder()
.data(rpcRequest)
.serializeMethodCodec(SerializationTypeEnum.HESSIAN.getCode())
.compressType(CompressTypeEnum.GZIP.getCode())
.messageType(RpcConstants.REQUEST_TYPE)
.build();
}
private void sendRpcData(Channel channel, RpcData rpcData, CompletableFuture<RpcResponse<Object>> result) {
channel.writeAndFlush(rpcData).addListener((ChannelFutureListener)future -> {
if (future.isSuccess()) {
LogUtil.info("client send message: [{}]", rpcData);
} else {
future.channel().close();
result.completeExceptionally(future.cause());
LogUtil.error("Send failed:", future.cause());
}
});
}

Connecting to the server through a channel
private Channel fetchAndConnectChannel(InetSocketAddress address) {
Channel channel = addressChannelManager.get(address);
if (channel == null) {
// connect to service to get new address and rebuild the channel
channel = connect(address);
addressChannelManager.set(address, channel);
}
return channel;
}

private Channel connect(InetSocketAddress address) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(address).addListener((ChannelFutureListener)future -> {
if (future.isSuccess()) {
// set channel to future
LogUtil.info("The client has connected [{}] successful!", address.toString());
completableFuture.complete(future.channel());
} else {
LogUtil.error("The client failed to connect to the server [{}]", address.toString(), future.cause());
// fail the future so the waiting caller is released; an exception
// thrown inside the listener would never reach the caller
completableFuture.completeExceptionally(future.cause());
}
});
try {
return completableFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("interrupted while connecting to " + address, e);
} catch (ExecutionException e) {
throw new IllegalStateException("failed to connect to " + address, e.getCause());
}
}
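
fetchAndConnectChannel checks the cache and then connects in two separate steps, and the code shown here does not replace a cached channel that has become inactive (AddressChannelManager may handle this internally; its code is not shown). A possible refinement, sketched here with a generic type in place of Netty's Channel, is to do the lookup and the reconnect atomically per address. ConnectionCacheSketch and its constructor parameters are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical generic cache; C stands in for a Netty Channel.
final class ConnectionCacheSketch<C> {
    private final Map<String, C> connections = new ConcurrentHashMap<>();
    private final Function<String, C> connector;
    private final Predicate<C> isActive;

    ConnectionCacheSketch(Function<String, C> connector, Predicate<C> isActive) {
        this.connector = connector;
        this.isActive = isActive;
    }

    // Reuse the cached connection while it is active, otherwise reconnect.
    // compute runs atomically per key, so two threads asking for the same
    // address do not open two connections.
    C get(String address) {
        return connections.compute(address, (key, existing) ->
            existing != null && isActive.test(existing) ? existing : connector.apply(key));
    }
}
```

Because compute blocks other updates of the same key while the function runs, a slow connect delays concurrent callers for that address; whether that trade-off is acceptable depends on the application.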

Handling the response on the consumer side

public class NettyRpcClientHandler extends SimpleChannelInboundHandler<RpcData> {

private final RpcSendingServiceAdapterImpl adapter;

private final WaitingProcessRequestQueue waitingProcessRequestQueue;

public NettyRpcClientHandler() {
this.adapter = SingletonFactory.getInstance(RpcSendingServiceAdapterImpl.class);
this.waitingProcessRequestQueue = SingletonFactory.getInstance(WaitingProcessRequestQueue.class);
}

/**
* heart beat handle
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// if the channel has been write-idle, send a heartbeat to keep it alive
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent)evt).state();
if (state == IdleState.WRITER_IDLE) {
LogUtil.info("write idle happen [{}]", ctx.channel().remoteAddress());
Channel channel = adapter.getChannel((InetSocketAddress)ctx.channel().remoteAddress());
RpcData rpcData = new RpcData();
rpcData.setSerializeMethodCodec(SerializationTypeEnum.HESSIAN.getCode());
rpcData.setCompressType(CompressTypeEnum.GZIP.getCode());
rpcData.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);
rpcData.setData(RpcConstants.PING);
channel.writeAndFlush(rpcData).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}

/**
* Called when an exception occurs in processing a client message
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LogUtil.error("client exceptionCaught");
cause.printStackTrace();
ctx.close();
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcData rpcData) throws Exception {
LogUtil.info("Client receive message: [{}]", rpcData);
RpcData rpcMessage = new RpcData();
setupRpcMessage(rpcMessage);

if (rpcData.isHeartBeatResponse()) {
LogUtil.info("heart [{}]", rpcData.getData());
} else if (rpcData.isResponse()) {
RpcResponse<Object> rpcResponse = (RpcResponse<Object>)rpcData.getData();
waitingProcessRequestQueue.complete(rpcResponse);
}
}

private void setupRpcMessage(RpcData rpcMessage) {
rpcMessage.setSerializeMethodCodec(SerializationTypeEnum.HESSIAN.getCode());
rpcMessage.setCompressType(CompressTypeEnum.GZIP.getCode());
}

}