A Simple Hand-Written RPC Framework Based on Spring Boot (1)

Code: pjpjsocute/rpc-service: personal rcp attempt (github.com)

Technology stack: Spring Boot, ZooKeeper, Netty, and Java SPI.

RPC Definition

Remote Procedure Call (RPC) is a communication mechanism that allows different services to communicate and interact with each other over the network.

Through RPC, a service can send a request to another service and obtain a response as if it were making a local call, without the developer having to handle the underlying network communication manually. An RPC framework encapsulates the network transport and provides features such as remote service interface definitions and the serialisation and deserialisation of data.
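
To make the "just like a local call" idea concrete, here is a minimal sketch using a JDK dynamic proxy. It is not the framework's code: `GreetingService`, `InMemoryServer`, and `RpcProxyDemo` are hypothetical names, and the network is replaced by a direct in-process call so the example stays self-contained. In a real framework the proxy would serialise the request and send it over the wire instead.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service interface shared by the consumer and the provider.
interface GreetingService {
    String greet(String name);
}

// Stand-in for the provider side: looks up an implementation by interface name.
final class InMemoryServer {
    private final Map<String, Class<?>> interfaces = new ConcurrentHashMap<>();
    private final Map<String, Object> implementations = new ConcurrentHashMap<>();

    <T> void register(Class<T> iface, T impl) {
        interfaces.put(iface.getName(), iface);
        implementations.put(iface.getName(), impl);
    }

    // A real server would receive these values after deserialising a network request.
    Object handle(String interfaceName, String methodName, Class<?>[] paramTypes, Object[] args)
            throws Exception {
        Object target = implementations.get(interfaceName);
        if (target == null) {
            throw new IllegalStateException("service not found: " + interfaceName);
        }
        Method method = interfaces.get(interfaceName).getMethod(methodName, paramTypes);
        return method.invoke(target, args);
    }
}

final class RpcProxyDemo {
    // Builds a client-side stub: every method call is turned into plain request data.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, InMemoryServer server) {
        InvocationHandler handler = (proxy, method, args) ->
                server.handle(iface.getName(), method.getName(), method.getParameterTypes(), args);
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        InMemoryServer server = new InMemoryServer();
        server.register(GreetingService.class, name -> "Hello, " + name);

        GreetingService client = createProxy(GreetingService.class, server);
        System.out.println(client.greet("RPC")); // prints "Hello, RPC"
    }
}
```

The caller only sees `GreetingService`; everything between the method call and the implementation is hidden behind the proxy, which is the part an RPC framework fills in with networking and serialisation.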

RPC vs. HTTP:

HTTP is an application-layer protocol, originally designed to transfer hypertext between a client and a server. It follows a request-response model: the client sends an HTTP request, and the server processes it and returns the corresponding HTTP response. RPC is closer to an architectural idea than to a specific protocol: it can be implemented on top of HTTP or directly on top of TCP.

RPC process

A simple RPC architecture is shown in the diagram:

image-20230522161808172

How to achieve:

image-20230522163034434

Server-side implementation based on Netty and ZooKeeper

According to the diagram above, first we need to implement service registration.

Service registration:

Most current RPC frameworks support annotations for registration, and the same approach is used here.

Define registration annotations

// Each annotation lives in its own source file and needs these imports:
// import java.lang.annotation.*;
// import org.springframework.context.annotation.Import;  (SimpleRpcApplication only)

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface RpcProvider {

    /**
     * Service project, default value is "default"
     */
    String project() default "default";

    /**
     * Service version, default value is "1.0"
     */
    String version() default "1.0";

    /**
     * Service group, default value is "default"
     */
    String group() default "default";

}

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
@Inherited
public @interface RpcConsumer {

    /**
     * Service project, default value is "default"
     */
    String project() default "default";

    /**
     * Service version, default value is "1.0"
     */
    String version() default "1.0";

    /**
     * Service group, default value is "default"
     */
    String group() default "default";
}


@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Import(CustomBeanScannerRegistrar.class)
@Documented
public @interface SimpleRpcApplication {

    /**
     * Packages to scan for RPC annotations
     */
    String[] basePackage();
}

The @RpcProvider annotation defines the service version, the group (to distinguish between interfaces with the same name within the same project), and the project name under which the service is exposed.

Similarly, @RpcConsumer is the annotation used on the consuming side, and @SimpleRpcApplication defines the packages to be scanned.
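
As a rough illustration of how a provider class might be marked and how the annotation values can be read back at runtime, consider the sketch below. It repeats a Spring-free copy of `@RpcProvider` so that it compiles on its own; `OrderService`, `OrderServiceImpl`, and `AnnotationDemo` are hypothetical names that do not appear in the original project.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Copy of the article's provider annotation, repeated so the sketch is self-contained.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
@interface RpcProvider {
    String project() default "default";
    String version() default "1.0";
    String group() default "default";
}

// Hypothetical service interface, used only for illustration.
interface OrderService {
    String findOrder(long id);
}

// Only the attributes that differ from the defaults need to be spelled out.
@RpcProvider(project = "shop", version = "2.0")
class OrderServiceImpl implements OrderService {
    @Override
    public String findOrder(long id) {
        return "order-" + id;
    }
}

final class AnnotationDemo {
    // Returns "project/group/version", or null when the class is not a provider.
    static String describe(Class<?> type) {
        RpcProvider provider = type.getAnnotation(RpcProvider.class);
        if (provider == null) {
            return null;
        }
        return provider.project() + "/" + provider.group() + "/" + provider.version();
    }

    public static void main(String[] args) {
        System.out.println(describe(OrderServiceImpl.class)); // prints "shop/default/2.0"
        System.out.println(describe(String.class));           // prints "null"
    }
}
```

Because the annotation uses `RetentionPolicy.RUNTIME`, its values remain available through reflection, which is what allows the framework to read them while Spring is creating beans.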

Registering services at startup

First, the classes annotated with @RpcProvider need to be registered.

Get the packages to be scanned, then register the annotated beans with Spring:

import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.stereotype.Component;
import org.springframework.util.ClassUtils;
// LogUtil, RpcBeanScanner, RpcProvider and SimpleRpcApplication come from the project itself.

public class CustomBeanScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {

    private ResourceLoader resourceLoader;

    private static final String API_SCAN_PARAM = "basePackage";

    private static final String SPRING_BEAN_BASE_PACKAGE = "org.example.ray";

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        // get the scan annotation and the bean packages to be scanned
        String[] scanBasePackages = fetchScanBasePackage(importingClassMetadata);
        // cast to Object so the array is logged as a single argument
        LogUtil.info("scanning packages: [{}]", (Object) scanBasePackages);

        // scan the packages and register the beans
        // RpcBeanScanner rpcConsumerBeanScanner = new RpcBeanScanner(registry, RpcConsumer.class);
        RpcBeanScanner rpcProviderBeanScanner = new RpcBeanScanner(registry, RpcProvider.class);
        // NOTE: in this excerpt springBeanScanner is configured but never asked to scan,
        // and SPRING_BEAN_BASE_PACKAGE is unused; see the repository for the full version.
        RpcBeanScanner springBeanScanner = new RpcBeanScanner(registry, Component.class);
        if (resourceLoader != null) {
            springBeanScanner.setResourceLoader(resourceLoader);
            rpcProviderBeanScanner.setResourceLoader(resourceLoader);
        }
        int rpcServiceCount = rpcProviderBeanScanner.scan(scanBasePackages);
        LogUtil.info("rpcProviderBeanScanner scanned count: [{}]", rpcServiceCount);
        LogUtil.info("scanning RpcProvider annotated beans end");
    }

    @Override
    public void setResourceLoader(ResourceLoader resourceLoader) {
        this.resourceLoader = resourceLoader;
    }

    private String[] fetchScanBasePackage(AnnotationMetadata importingClassMetadata) {
        AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(
            importingClassMetadata.getAnnotationAttributes(SimpleRpcApplication.class.getName()));
        String[] scanBasePackages = new String[0];
        if (annotationAttributes != null) {
            scanBasePackages = annotationAttributes.getStringArray(API_SCAN_PARAM);
        }
        // no package specified: fall back to the package of the annotated application class.
        // Deriving it from the class name avoids casting to StandardAnnotationMetadata,
        // which fails when Spring supplies a different AnnotationMetadata implementation.
        if (scanBasePackages.length == 0) {
            scanBasePackages = new String[]{ClassUtils.getPackageName(importingClassMetadata.getClassName())};
        }
        return scanBasePackages;
    }

}

Register the service and its associated configuration before the bean is initialised, to ensure that the service is registered when Spring starts:

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {

    private final RpcServiceRegistryAdapter adapter;

    private final RpcSendingServiceAdapter sendingServiceAdapter;

    public RpcBeanPostProcessor() {
        this.adapter = SingletonFactory.getInstance(RpcServiceRegistryAdapterImpl.class);
        // the sending adapter is loaded through the SPI extension mechanism
        this.sendingServiceAdapter = ExtensionLoader.getExtensionLoader(RpcSendingServiceAdapter.class)
            .getExtension(RpcRequestSendingEnum.NETTY.getName());
    }

    /**
     * Registers the bean as an RPC service if its class carries @RpcProvider.
     *
     * @param bean     the bean instance being created
     * @param beanName the name of the bean
     * @return the unchanged bean
     * @throws BeansException in case of errors
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        LogUtil.info("start process register service: {}", bean);
        // register service
        if (bean.getClass().isAnnotationPresent(RpcProvider.class)) {
            RpcProvider annotation = bean.getClass().getAnnotation(RpcProvider.class);
            // build rpc service config from the annotation attributes
            RpcServiceConfig serviceConfig = RpcServiceConfig.builder()
                .service(bean)
                .project(annotation.project())
                .version(annotation.version())
                .group(annotation.group())
                .build();
            LogUtil.info("register service: {}", serviceConfig);
            adapter.registryService(serviceConfig);
        }
        return bean;
    }
}

A concrete way to implement service registration

To register a service, you need at least the service provider's address (IP), the service name, and the attributes from @RpcProvider, so you can start by defining a RpcServiceConfig.

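import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RpcServiceConfig {
    /**
     * service version
     */
    // @Builder.Default keeps the initialiser when the object is created through the builder
    @Builder.Default
    private String version = "";

    /**
     * target service
     */
    private Object service;

    /**
     * the project the service belongs to
     */
    @Builder.Default
    private String project = "";

    /**
     * group
     */
    @Builder.Default
    private String group = "";

    /**
     * Generates the service name used to distinguish different services.
     * The parts are joined with '*', so the name can be split on '*' to recover them.
     *
     * @return project*group*interfaceName*version
     */
    public String fetchRpcServiceName() {
        return this.getProject() + "*" + this.getGroup() + "*" + this.getServiceName() + "*" + this.getVersion();
    }

    /**
     * Gets the interface name. Assumes the service class directly implements
     * at least one interface and that the first one is the RPC interface.
     *
     * @return canonical name of the first implemented interface
     */
    public String getServiceName() {
        return this.service.getClass().getInterfaces()[0].getCanonicalName();
    }

}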
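
The naming scheme used by `fetchRpcServiceName` can be illustrated with a small standalone sketch. `ServiceNameDemo` and the interface name `org.example.HelloService` are hypothetical; only the `project*group*interface*version` layout is taken from the code above. One detail worth noting when splitting the name again is that `*` is a regular-expression metacharacter, so it has to be quoted.

```java
import java.util.regex.Pattern;

final class ServiceNameDemo {
    private static final String SEPARATOR = "*";

    // Joins the four parts in the same order as fetchRpcServiceName.
    static String build(String project, String group, String interfaceName, String version) {
        return String.join(SEPARATOR, project, group, interfaceName, version);
    }

    // Splits a service name back into {project, group, interfaceName, version}.
    static String[] parse(String rpcServiceName) {
        // Pattern.quote is required because String.split treats its argument as a regex.
        String[] parts = rpcServiceName.split(Pattern.quote(SEPARATOR), -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException("unexpected service name: " + rpcServiceName);
        }
        return parts;
    }

    public static void main(String[] args) {
        String name = build("default", "default", "org.example.HelloService", "1.0");
        System.out.println(name);           // prints "default*default*org.example.HelloService*1.0"
        System.out.println(parse(name)[2]); // prints "org.example.HelloService"
    }
}
```

This scheme only works as long as none of the parts contains `*` itself, which holds for Java type names and is presumably the reason that character was chosen.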

The registry adapter provides two methods: one registers a service, and the other returns the corresponding bean for a given service name.

public interface RpcServiceRegistryAdapter {

    /**
     * @param rpcServiceConfig rpc service related attributes
     */
    void registryService(RpcServiceConfig rpcServiceConfig);

    /**
     * @param rpcServiceName rpc service name, as produced by RpcServiceConfig#fetchRpcServiceName
     * @return service object
     */
    Object getService(String rpcServiceName);

}

The registration process can be divided into three steps: generate the address, register the service in ZooKeeper, then register it in the local cache. A ConcurrentHashMap is used to cache the service. (The ZooKeeper API call that performs the actual registration is not covered here because it is not very relevant to RPC; you can refer to the source code directly.)

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.curator.framework.CuratorFramework;

public class RpcServiceRegistryAdapterImpl implements RpcServiceRegistryAdapter {

    /**
     * cache map: rpc service name -> service bean
     */
    private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();

    @Override
    public void registryService(RpcServiceConfig rpcServiceConfig) {
        try {
            // first get the local address
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            // add service to zk
            LogUtil.info("add service to zk, service name: {}, host: {}", rpcServiceConfig.fetchRpcServiceName(), hostAddress);
            registerServiceToZk(rpcServiceConfig.fetchRpcServiceName(),
                new InetSocketAddress(hostAddress, PropertiesFileUtil.readPortFromProperties()));
            // add service to map cache
            registerServiceToMap(rpcServiceConfig);
        } catch (UnknownHostException e) {
            LogUtil.error("occur exception when getHostAddress", e);
            throw new RuntimeException(e);
        }

    }

    @Override
    public Object getService(String rpcServiceName) {
        Object service = serviceMap.get(rpcServiceName);
        if (null == service) {
            throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND.getCode(), "service not found");
        }
        return service;
    }


    private void registerServiceToZk(String rpcServiceName, InetSocketAddress inetSocketAddress) {
        // relies on InetSocketAddress.toString() starting with "/" to separate name and address
        String servicePath = CuratorClient.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
        CuratorFramework zkClient = CuratorClient.getZkClient();
        CuratorClient.createPersistentNode(zkClient, servicePath);
    }

    private void registerServiceToMap(RpcServiceConfig rpcServiceConfig) {
        // keep the first registration for a given name; putIfAbsent does the check and insert atomically
        serviceMap.putIfAbsent(rpcServiceConfig.fetchRpcServiceName(), rpcServiceConfig.getService());
    }
}
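
The ZooKeeper path built in `registerServiceToZk` follows the layout root / service name / address. The sketch below shows one way to compose and parse such a path without depending on the format of `InetSocketAddress.toString()`, which for an address created from an IP literal typically renders as `/ip:port`. `ZkPathDemo` is a hypothetical helper, and the root `/rpc` is an assumption, since the article does not show the value of `CuratorClient.ZK_REGISTER_ROOT_PATH`.

```java
import java.net.InetSocketAddress;

final class ZkPathDemo {
    // Assumed root path; the real value is defined in CuratorClient and not shown in the article.
    static final String ROOT = "/rpc";

    // Builds root/serviceName/host:port with explicit separators.
    static String servicePath(String rpcServiceName, InetSocketAddress address) {
        return ROOT + "/" + rpcServiceName + "/" + address.getHostString() + ":" + address.getPort();
    }

    // Recovers the provider address from a child node name such as "127.0.0.1:9998".
    static InetSocketAddress parseAddress(String childNode) {
        int idx = childNode.lastIndexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("expected host:port but got: " + childNode);
        }
        String host = childNode.substring(0, idx);
        int port = Integer.parseInt(childNode.substring(idx + 1));
        // createUnresolved avoids a DNS lookup while parsing
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        String name = "default*default*org.example.HelloService*1.0";
        String path = servicePath(name, new InetSocketAddress("127.0.0.1", 9998));
        System.out.println(path); // prints "/rpc/default*default*org.example.HelloService*1.0/127.0.0.1:9998"

        InetSocketAddress parsed = parseAddress("127.0.0.1:9998");
        System.out.println(parsed.getHostString() + " " + parsed.getPort()); // prints "127.0.0.1 9998"
    }
}
```

With this layout, a consumer can list the children of the service-name node to discover every provider address registered for that service.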

At this point, the registration process for a service is complete.