文 / Kenyon,資深軟件架構師,15年軟件開發和技術管理經驗,從程序員做到企業技術高管,專注技術管理、架構設計、AI技術應用和落地。
由於公眾號推流的原因,請在關注頁右上角加星標,這樣才能及時收到新文章的推送。
摘要:本文完成了RPC框架的剩餘核心功能,包括基於Nacos的服務註冊中心、多種負載均衡策略(隨機、輪詢、最小連接數)及服務端核心實現,提供了完整的使用示例(服務定義、服務端/客户端實現)和單元測試,最終構建了一個功能完備的RPC框架。
引言
在前面的兩篇文章中,我們完成了RPC框架整體的架構設計,並實現了一些核心的組件。今天,我們將完成RPC框架剩餘的功能,這些功能包括服務註冊中心、服務端核心實現、負載均衡等,在文章最後我們將提供完整的使用示例和測試。
一、服務註冊中心(Registry Center)實現
為了支持多種不同的註冊中心,我們基於單一職責原則、里氏替換原則和接口隔離原則,來對服務註冊中心的接口和實現進行設計。
// 服務註冊中心接口
public interface RegistryCenter {
// 註冊服務
void register(String serviceName, InetSocketAddress address) throws Exception;
// 註銷服務
void unregister(String serviceName, InetSocketAddress address) throws Exception;
// 發現服務
List<InetSocketAddress> discover(String serviceName) throws Exception;
// 訂閲服務變化
void subscribe(String serviceName, ServiceChangeListener listener) throws Exception;
// 取消訂閲
void unsubscribe(String serviceName, ServiceChangeListener listener) throws Exception;
// 關閉連接
void close();
}
// 服務變化監聽器接口
public interface ServiceChangeListener {
void onServiceChange(String serviceName, List<InetSocketAddress> addresses);
}
// 使用Nacos實現的服務註冊中心
public class NacosRegistryCenter implements RegistryCenter {
private static final Logger logger = LoggerFactory.getLogger(NacosRegistryCenter.class);
private final NamingService namingService;
private final Map<String, List<ServiceChangeListener>> listeners = new ConcurrentHashMap<>();
private final String groupName;
//構造函數
public NacosRegistryCenter(String serverAddr) throws NacosException {
this(serverAddr, "DEFAULT_GROUP");
}
//構造函數
public NacosRegistryCenter(String serverAddr, String groupName) throws NacosException {
this.groupName = groupName;
Properties properties = new Properties();
properties.setProperty("serverAddr", serverAddr);
this.namingService = NacosFactory.createNamingService(properties);
}
@Override
public void register(String serviceName, InetSocketAddress address) throws Exception {
logger.debug("Registering service: {} at {} in group {}", serviceName, address, groupName);
Instance instance = new Instance();
instance.setIp(address.getAddress().getHostAddress());
instance.setPort(address.getPort());
// 設置權重
instance.setWeight(1.0);
// 設置為臨時實例,服務下線後自動刪除
instance.setEphemeral(true);
// 設置為健康狀態
instance.setHealthy(true);
namingService.registerInstance(serviceName, groupName, instance);
logger.debug("Successfully registered service: {} at {} in group {}", serviceName, address, groupName);
}
@Override
public void unregister(String serviceName, InetSocketAddress address) throws Exception {
// Nacos會在服務下線後自動移除臨時實例,這裏不需要手動註銷
logger.debug("Unregistering service: {} at {} (will be automatically removed)", serviceName, address);
}
@Override
public List<InetSocketAddress> discover(String serviceName) throws Exception {
logger.debug("Discovering service: {} in group {}", serviceName, groupName);
// 獲取所有健康的服務實例
List<Instance> instances = namingService.getAllInstances(serviceName, groupName);
logger.debug("Found {} instances for service: {}", instances.size(), serviceName);
for (Instance instance : instances) {
logger.debug("Instance: {}:{}", instance.getIp(), instance.getPort());
}
// 添加監聽器以監聽服務變化
namingService.subscribe(serviceName, groupName, new NacosEventListener(serviceName));
List<InetSocketAddress> addresses = new ArrayList<>();
for (Instance instance : instances) {
if (instance.isHealthy()) {
addresses.add(new InetSocketAddress(instance.getIp(), instance.getPort()));
}
}
logger.debug("Returning {} healthy addresses for service: {}", addresses.size(), serviceName);
return addresses;
}
@Override
public void subscribe(String serviceName, ServiceChangeListener listener) throws Exception {
listeners.computeIfAbsent(serviceName, k -> new ArrayList<>()).add(listener);
// 立即觸發一次回調,獲取當前服務列表
listener.onServiceChange(serviceName, discover(serviceName));
}
@Override
public void unsubscribe(String serviceName, ServiceChangeListener listener) throws Exception {
List<ServiceChangeListener> serviceListeners = listeners.get(serviceName);
if (serviceListeners != null) {
serviceListeners.remove(listener);
}
}
@Override
public void close() {
try {
if (namingService != null) {
namingService.shutDown();
}
} catch (Exception e) {
logger.error("Error closing Nacos naming service", e);
}
}
//Nacos事件監聽器
private class NacosEventListener implements EventListener {
private final String serviceName;
public NacosEventListener(String serviceName) {
this.serviceName = serviceName;
}
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> instances = namingEvent.getInstances();
List<InetSocketAddress> addresses = instances.stream()
.filter(Instance::isHealthy)
.map(instance -> new InetSocketAddress(instance.getIp(), instance.getPort()))
.collect(Collectors.toList());
// 觸發監聽器
List<ServiceChangeListener> serviceListeners = listeners.get(serviceName);
if (serviceListeners != null) {
for (ServiceChangeListener listener : serviceListeners) {
try {
listener.onServiceChange(serviceName, addresses);
} catch (Exception e) {
logger.error("Error notifying service change listener", e);
}
}
}
}
}
}
}
二、負載均衡策略(Load Balancer)實現
同樣,為了支持多種不同的負載均衡策略,我們也基於里氏替換原則和設計模式裏面的策略模式來定義組件的接口和實現類。
// 定義負載均衡接口
public interface LoadBalancer {
InetSocketAddress select(List<InetSocketAddress> addresses);
}
// 隨機負載均衡
public class RandomLoadBalancer implements LoadBalancer {
private final Random random = new Random();
@Override
public InetSocketAddress select(List<InetSocketAddress> addresses) {
if (addresses.isEmpty()) {
return null;
}
return addresses.get(random.nextInt(addresses.size()));
}
}
// 輪詢負載均衡
public class RoundRobinLoadBalancer implements LoadBalancer {
private final AtomicInteger index = new AtomicInteger(0);
@Override
public InetSocketAddress select(List<InetSocketAddress> addresses) {
if (addresses.isEmpty()) {
return null;
}
return addresses.get(Math.abs(index.getAndIncrement() % addresses.size()));
}
}
// 最小連接數負載均衡(簡化版)
public class LeastConnectionLoadBalancer implements LoadBalancer {
// 模擬連接數統計
private final Map<InetSocketAddress, AtomicInteger> connectionCount = new ConcurrentHashMap<>();
@Override
public InetSocketAddress select(List<InetSocketAddress> addresses) {
if (addresses.isEmpty()) {
return null;
}
return addresses.stream()
.min(Comparator.comparingInt(address -> connectionCount.getOrDefault(address, new AtomicInteger(0)).get()))
.orElse(null);
}
// 記錄連接數變化
public void incrementConnection(InetSocketAddress address) {
connectionCount.computeIfAbsent(address, k -> new AtomicInteger(0)).incrementAndGet();
}
public void decrementConnection(InetSocketAddress address) {
AtomicInteger count = connectionCount.get(address);
if (count != null) {
count.decrementAndGet();
}
}
}
//ServiceProxy.java服務代理類構造函數在使用負載均衡時的使用方式
public ServiceProxy(Class<?> serviceClass, RegistryCenter registryCenter) {
// 默認使用隨機負載均衡,也可以在構造函數中傳入其他負載均衡策略
this(serviceClass, registryCenter, new RandomLoadBalance());
}
三、服務端核心實現
// RPC服務端核心類
public class RpcServer {
private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
private final TransportServer transportServer;
private final RegistryCenter registryCenter;
private final ServiceRegistry serviceRegistry;
private final int port;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Set<String> registeredServices = new HashSet<>();
//構造函數
public RpcServer(int port, RegistryCenter registryCenter) {
this.port = port;
this.registryCenter = registryCenter;
this.serviceRegistry = new ServiceRegistry();
this.transportServer = new NettyTransportServer();
logger.debug("RpcServer created with registryCenter: {}", registryCenter);
}
//註冊服務
public void registerService(Class<?> serviceClass, Object serviceImpl) {
if (!serviceClass.isAssignableFrom(serviceImpl.getClass())) {
throw new IllegalArgumentException("Service implementation must implement the service interface");
}
String serviceName = serviceClass.getName();
serviceRegistry.registerService(serviceName, serviceImpl);
registeredServices.add(serviceName);
logger.debug("Registered service: {} with implementation: {}", serviceName, serviceImpl.getClass().getName());
}
//啓動服務
public void start() throws Exception {
logger.debug("Starting RpcServer...");
if (!started.compareAndSet(false, true)) {
logger.debug("Server already started, returning.");
return;
}
try {
// 啓動網絡傳輸服務
logger.debug("Starting transport server on port: {}", port);
transportServer.start(port, new RpcRequestHandler(serviceRegistry));
logger.debug("RPC server started on port: {}", port);
} catch (Exception e) {
logger.error("Failed to start transport server", e);
throw e;
}
// 註冊服務到服務中心
logger.debug("Registry center: {}", registryCenter);
logger.debug("Number of registered services: {}", registeredServices.size());
if (registryCenter != null) {
InetSocketAddress address = new InetSocketAddress("localhost", port);
logger.debug("Attempting to register {} services", registeredServices.size());
for (String serviceName : registeredServices) {
logger.debug("Registering service: {} at {}", serviceName, address);
registryCenter.register(serviceName, address);
}
} else {
logger.debug("Registry center is null, skipping service registration");
}
}
//停止服務
public void stop() throws Exception {
if (!started.compareAndSet(true, false)) {
return;
}
// 從服務中心註銷服務
if (registryCenter != null) {
InetSocketAddress address = new InetSocketAddress("localhost", port);
for (String serviceName : registeredServices) {
registryCenter.unregister(serviceName, address);
}
registryCenter.close();
}
// 停止網絡傳輸服務
transportServer.stop();
logger.debug("RPC server stopped");
}
//獲取服務端口
public int getPort() {
return port;
}
//服務是否已啓動
public boolean isStarted() {
return started.get();
}
}
四、RPC框架的使用示例
這裏的示例我們使用計算服務來展示RPC框架的使用。
1. 定義服務接口
我們先定義一個簡單的計算器服務接口,包含加法、減法、乘法和除法運算。
// 計算器服務接口
public interface CalculatorService {
//加法運算
int add(int a, int b);
//減法運算
int subtract(int a, int b);
//乘法運算
int multiply(int a, int b);
//除法運算
int divide(int a, int b) throws IllegalArgumentException;
}
2. 服務端實現
以下是計算器服務接口的具體實現,為了方便查看被調用的情況,我們添加了一些日誌記錄和異常處理。
// 計算器服務實現
@RpcService(CalculatorService.class)
public class CalculatorServiceImpl implements CalculatorService {
private static final Logger logger = LoggerFactory.getLogger(CalculatorServiceImpl.class);
@Override
public int add(int a, int b) {
int result = a + b;
logger.info("Calculated: {} + {} = {}", a, b, result);
return result;
}
@Override
public int subtract(int a, int b) {
int result = a - b;
logger.info("Calculated: {} - {} = {}", a, b, result);
return result;
}
@Override
public int multiply(int a, int b) {
int result = a * b;
logger.info("Calculated: {} * {} = {}", a, b, result);
return result;
}
@Override
public int divide(int a, int b) throws IllegalArgumentException {
if (b == 0) {
throw new IllegalArgumentException("除數不能為0");
}
int result = a / b;
logger.info("Calculated: {} / {} = {}", a, b, result);
return result;
}
}
// 服務端示例的啓動類
public class ServerExample {
public static void main(String[] args) {
// 默認使用Nacos作為註冊中心
RegistryCenter registryCenter = new NacosRegistryCenter("localhost:8848");
// 如果想使用ZooKeeper作為註冊中心的話就打開下面這行註釋即可
// RegistryCenter registryCenter = new ZookeeperRegistryCenter("localhost:2181");
int port = 8081;
RpcServer server = new RpcServer(port, registryCenter);
CalculatorService calculatorService = new CalculatorServiceImpl();
server.registerService(CalculatorService.class, calculatorService);
server.start();
}
}
3. 客户端實現
我們實現客户端代碼,使用代理模式調用遠程服務。
// 客户端啓動類
public class ClientExample {
public static void main(String[] args) {
// 使用Nacos作為註冊中心
RegistryCenter registryCenter = new NacosRegistryCenter("localhost:8848");
// 或者使用ZooKeeper作為註冊中心
// RegistryCenter registryCenter = new ZookeeperRegistryCenter("localhost:2181");
// 使用輪詢負載均衡策略
RoundRobinLoadBalance loadBalance = new RoundRobinLoadBalance();
CalculatorService calculatorService = (CalculatorService) Proxy.newProxyInstance(
CalculatorService.class.getClassLoader(),
new Class<?>[]{CalculatorService.class},
new ServiceProxy(CalculatorService.class, registryCenter, loadBalance)
);
// 調用遠程服務
int result = calculatorService.add(10, 5);
System.out.println("10 + 5 = " + result);
}
}
五、框架測試與優化
1. 單元測試示例
// 服務註冊中心測試
public class ServerTest {
private static final Logger logger = LoggerFactory.getLogger(ServerTest.class);
private RegistryCenter registryCenter;
private RpcServer server;
private Thread serverThread;
private int port = 8081; // 將端口作為實例變量
@Before
public void setUp() throws Exception {
// 檢查系統屬性中是否指定了端口
String portProperty = System.getProperty("server.port");
if (portProperty != null && !portProperty.isEmpty()) {
try {
port = Integer.parseInt(portProperty);
logger.info("Using port from system property: {}", port);
} catch (NumberFormatException e) {
logger.warn("Invalid port specified in system property, using default port 8081");
}
}
// 創建Nacos註冊中心實例
logger.info("Creating Nacos registry center...");
registryCenter = new NacosRegistryCenter("localhost:8848");
logger.info("Nacos registry center created: {}", registryCenter);
// 創建RPC服務器
logger.info("Creating RPC server with registry center...");
server = new RpcServer(port, registryCenter);
logger.info("RPC server created: {}", server);
// 創建服務實現類實例
CalculatorService calculatorService = new CalculatorServiceImpl();
// 註冊服務
logger.info("Registering service...");
server.registerService(CalculatorService.class, calculatorService);
logger.info("Service registered.");
// 在獨立線程中啓動服務器
serverThread = new Thread(() -> {
try {
logger.info("Starting server on port {}...", port);
server.start();
logger.info("Server started.");
} catch (Exception e) {
logger.error("Failed to start RPC server", e);
}
});
serverThread.start();
// 等待服務器啓動
Thread.sleep(2000);
logger.info("RPC server started successfully on port {}", port);
logger.info("Service registered: {}", CalculatorService.class.getName());
}
@Test
public void testServerRunning() throws InterruptedException {
// 保持服務器運行一段時間用於測試
logger.info("Server is running, keeping it alive for testing...");
Thread.sleep(30000); // 保持運行30秒用於測試
}
@After
public void tearDown() {
try {
// 關閉資源
if (server != null) {
server.stop();
}
if (registryCenter != null) {
registryCenter.close();
}
// 等待服務器線程結束
if (serverThread != null) {
serverThread.join(5000); // 最多等待5秒
}
logger.info("Server stopped.");
} catch (Exception e) {
logger.error("Error stopping server", e);
}
}
}
// 客户端測試
public class ClientTest {
private static final Logger logger = LoggerFactory.getLogger(ClientTest.class);
private RegistryCenter registryCenter;
private CalculatorService calculatorService;
@Before
public void setUp() throws Exception {
// 創建註冊中心
logger.info("Creating Nacos registry center...");
registryCenter = new NacosRegistryCenter("localhost:8848");
logger.info("Nacos registry center created: {}", registryCenter);
// 創建服務代理,使用輪詢負載均衡策略
logger.info("Creating service proxy with round robin load balance...");
RoundRobinLoadBalance loadBalance = new RoundRobinLoadBalance();
calculatorService = (CalculatorService) Proxy.newProxyInstance(
CalculatorService.class.getClassLoader(),
new Class<?>[]{CalculatorService.class},
new ServiceProxy(CalculatorService.class, registryCenter, loadBalance)
);
logger.info("RPC client initialized successfully");
}
@Test
public void testCalculatorServiceAdd() throws Exception {
logger.info("--- Testing Calculator Service Add ---");
// 測試加法
int result = calculatorService.add(10, 5);
assertEquals(15, result);
logger.info("10 + 5 = {}", result);
}
@Test
public void testCalculatorServiceSubtract() throws Exception {
logger.info("--- Testing Calculator Service Subtract ---");
// 測試減法
int result = calculatorService.subtract(10, 5);
assertEquals(5, result);
logger.info("10 - 5 = {}", result);
}
@Test
public void testCalculatorServiceMultiply() throws Exception {
logger.info("--- Testing Calculator Service Multiply ---");
// 測試乘法
int result = calculatorService.multiply(10, 5);
assertEquals(50, result);
logger.info("10 * 5 = {}", result);
}
@Test
public void testCalculatorServiceDivide() throws Exception {
logger.info("--- Testing Calculator Service Divide ---");
// 測試除法
int result = calculatorService.divide(10, 5);
assertEquals(2, result);
logger.info("10 / 5 = {}", result);
}
@Test
public void testMultipleCallsWithLoadBalance() throws Exception {
logger.info("--- Testing Calculator Service with Load Balance ---");
// 多次調用以測試負載均衡
for (int i = 0; i < 5; i++) {
// 測試加法
int result = calculatorService.add(10, 5);
assertEquals(15, result);
logger.info("Round {}: 10 + 5 = {}", i+1, result);
// 稍微延時以便觀察負載均衡效果
Thread.sleep(500);
}
logger.info("All RPC calls completed successfully");
}
@After
public void tearDown() {
// 關閉資源
if (registryCenter != null) {
registryCenter.close();
}
}
}
2. 性能優化建議
為了提高RPC框架的性能,後續可以考慮按照以下的建議來進行優化:
- 優化客户端的連接池管理方式,避免頻繁創建和關閉連接
- 增加異步調用模式的支持,提高框架的併發處理能力
- 合併請求,讓框架支持批量發送多個請求,從而減少網絡的開銷
- 對請求和響應的數據進行壓縮,減少網絡的傳輸量
- 緩存服務發現時返回的結果列表,減少客户端向註冊中心發起的訪問,同時也增加註冊中心實例變化時的監聽功能,當服務實例有變化時,能夠及時更新本地的數據。
- 合理配置線程池參數,提高系統吞吐量
六、系列總結
在這個"手搓RPC框架"系列的3篇文章裏面,我們基於常見的架構設計原則、方法和模式,從0到1實現了一個功能完整的RPC框架。其中,我們重點應用了以下的這些架構設計的原則和方法:
1. 核心架構原則應用
- SOLID原則:每個組件只負責一個明確的功能,組件之間通過接口通信,實現高內聚低耦合。而且類、模塊等軟件實體基本都是面向接口編程,從而實現對擴展開放,對修改關閉,同時子類也都能平滑地替換父類而不破壞程序正確性。
- 高內聚低耦合:組件內部功能緊密相關,組件間通過接口通信,減少了組件之間的依賴關係,提高了系統的可維護性和可擴展性。
- KISS原則:實現簡潔明瞭,避免過度設計,保持代碼的可讀性和可維護性。
- 依賴倒置原則:通過依賴注入實現解耦,減少了組件之間的直接依賴關係,提高了系統的靈活性和可測試性。
- 策略模式:負載均衡等功能支持多種策略切換,用户可以根據實際場景選擇合適的策略,而無需修改框架代碼。
2. 框架特點
- 通過面向接口編程的方式,使得框架能夠支持自定義序列化、傳輸、負載均衡等組件,從而實現高度的可擴展性和可定製性。
- 簡單的API接口,方便服務註冊和調用。用户只需要定義服務接口,實現服務端的業務邏輯,即可完成服務的註冊和調用。
- 服務自動發現、故障轉移等機制,能夠自動發現服務端實例,同時在實例故障時能夠自動切換到其他可用的實例,提高系統的可用性和容錯性。
- 支持連接池、異步調用等優化手段,提高系統的併發處理能力和吞吐量。
3. 未來擴展方向
- 支持更多序列化協議(如Protobuf、Kryo等)
- 實現服務治理功能(限流、降級、熔斷等)
- 增加監控和追蹤能力
- 支持分佈式事務
- 實現集羣部署和動態擴縮容
通過這個系列文章,我們即學習了RPC框架的設計和實現,同時也通過實現RPC框架的過程掌握了該如何將架構設計原則應用到實際項目中,構建一個高質量、可維護的軟件系統。
項目我已經放到了GitHub上,歡迎star和fork。
互動話題:你對這個RPC框架的實現有什麼建議或改進意見?你在實際工作中使用過哪些優秀的RPC框架?歡迎在評論區分享你的觀點。
關於作者
Kenyon,資深軟件架構師,15年的軟件開發和技術管理經驗,從程序員做到企業技術高管。多年企業數字化轉型和軟件架構設計經驗,善於幫助企業構建高質量、可維護的軟件系統,目前專注技術管理、架構設計、AI技術應用和落地;全網統一名稱"六邊形架構",歡迎關注交流。
原創不易,轉載請聯繫授權,如果覺得有幫助,請點贊、收藏、轉發三連支持!