簡單實用Nest+Nacos+Kafka+Docker實現微服務架構

  • 設計
  • 新建三個Nest服務
  • Nacos
  • 配置中心
  • 嘗試
  • 服務註冊
  • 嘗試
  • 改造user-service (TCP)
  • 改造gateway
  • 改造email-service(Kafka)
  • 其他


PS:並非實戰,一個練習。

設計

一個網關,一個用户服務,一個郵件推送服務。

用户服務部署兩份,郵件推送服務部署兩份。

用户服務和郵件推送服務啓動的時候,通過nacos拉取配置,如

新建三個Nest服務

nest new gateway
cd gateway
nest g app user-service
nest g app email-service

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nacos

Nacos

配置中心

新增lib:remote-config

nest g lib remote-config 根目錄新增.env

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nestjs_02

service.ts

import {
  Inject,
  Injectable,
  Logger,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { NacosConfigClient } from 'nacos';

@Injectable()
export class RemoteConfigService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(RemoteConfigService.name);
  private client: NacosConfigClient;

  @Inject()
  private configService: ConfigService;

  constructor() {}

  async onModuleInit() {
    try {
      this.client = new NacosConfigClient({
        serverAddr: this.configService.get<string>('NACOS_SERVER'),
        namespace: this.configService.get<string>('NACOS_NAMESPACE'),
        username: this.configService.get<string>('NACOS_SECRET_NAME'),
        password: this.configService.get<string>('NACOS_SECRET_PWD'),
      });

      this.logger.log('Nacos配置客户端初始化成功');
    } catch (error) {
      this.logger.error('Nacos配置客户端初始化失敗', error);
    }
  }

  async onModuleDestroy() {
    await this.client.close();
  }
  async getConfig(
    dataId: string,
    group = this.configService.get('NACOS_GROUP_NAME'),
  ) {
    if (this.configService.get(dataId)) {
      return await this.configService.get(dataId);
    }
    this.watchConfig(dataId, group);
    const config = this.parseConfig(
      await this.client.getConfig(dataId, group),
      'json',
    );
    return config;
  }

  /**
   * 監聽配置變化
   */
  watchConfig(
    dataId: string,
    group = this.configService.get('NACOS_GROUP_NAME'),
  ) {
    this.client.subscribe(
      {
        dataId,
        group,
      },
      (content, configType = 'json') => {
        const config = this.parseConfig(content, configType);
        this.configService.set(dataId, config);
      },
    );
  }

  /**
   * 解析配置內容
   */
  private parseConfig(content: string, type: string): any {
    try {
      if (type === 'json') {
        return JSON.parse(content);
      } else if (type === 'yaml' || type === 'yml') {
        // 簡單的YAML解析,實際項目中可以使用js-yaml等庫
        const config = {};
        content.split('\n').forEach((line) => {
          const parts = line.split(':').map((part) => part.trim());
          if (parts.length >= 2) {
            config[parts[0]] = parts.slice(1).join(':');
          }
        });
        return config;
      } else if (type === 'properties') {
        const config = {};
        content.split('\n').forEach((line) => {
          const parts = line.split('=').map((part) => part.trim());
          if (parts.length >= 2) {
            config[parts[0]] = parts.slice(1).join('=');
          }
        });
        return config;
      }
      return content;
    } catch (error) {
      this.logger.error('配置解析失敗', error);
      return content;
    }
  }
}

module.ts

import { Module } from '@nestjs/common';
import { RemoteConfigService } from './remote-config.service';
import { ConfigModule } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
      envFilePath: ['.env'],
    }),
  ],
  providers: [RemoteConfigService],
  exports: [RemoteConfigService],
})
export class RemoteConfigModule {}

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nestjs_03

嘗試

在nacos web管理頁面 新增配置

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nacos_04

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#後端_05

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nestjs_06

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#微服務_07

服務註冊

nest g lib nacos 新增一個ip.service.ts

// src/utils/ip.service.ts
import { Injectable } from '@nestjs/common';
import * as os from 'os';

@Injectable()
export class IpService {
  /**
   * 獲取本機非內部 IPv4 地址(優先返回第一個有效地址)
   * @returns 本機 IP 地址(如 192.168.1.100),無有效地址時返回 127.0.0.1
   */
  getLocalIp(): string {
    const interfaces = os.networkInterfaces(); // 獲取所有網絡接口
    let localIp = '127.0.0.1'; // 默認迴環地址

    // 遍歷所有網絡接口
    for (const devName in interfaces) {
      const iface = interfaces[devName];
      if (!iface) continue;

      // 遍歷接口下的所有地址
      for (const alias of iface) {
        // 篩選條件:IPv4、非內部地址(非 127.0.0.1 等迴環地址)、已啓動
        if (
          alias.family === 'IPv4' && // 只取 IPv4
          !alias.internal && // 排除內部地址(如 127.0.0.1)
          alias.address !== '127.0.0.1' && // 進一步排除迴環地址
          !alias.address.startsWith('169.254.') // 排除 APIPA 地址(本地鏈路地址)
        ) {
          localIp = alias.address;
          return localIp; // 找到第一個有效地址後返回
        }
      }
    }

    return localIp; // 未找到有效地址時返回迴環地址
  }
}

nacos.module.ts 動態模塊,傳入參數

import { DynamicModule, Module } from '@nestjs/common';
import { NacosOptions, NacosService } from './nacos.service';
import { IpService } from './ip.service';

@Module({})
export class NacosModule {
  static forRoot(options: NacosOptions): DynamicModule {
    return {
      module: NacosModule,
      providers: [
        IpService,
        {
          provide: 'CONFIG_OPTIONS',
          useValue: options,
        },
        NacosService,
      ],
      exports: [NacosService],
    };
  }
}

nacos.service.ts

import {
  Inject,
  Injectable,
  Logger,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { NacosNamingClient } from 'nacos';
import { IpService } from './ip.service';

interface Instance {
  instanceId: string;
  healthy: boolean;
  enabled: boolean;
  serviceName?: string;
  weight?: number;
  ephemeral?: boolean;
  clusterName?: string;
}

export interface NacosOptions {
  noRegister?: boolean;
  serviceName: string; //Service name
  instance?: Partial<Instance>; //Instance
  groupName?: string;
  subList?: { groupName?: string; dataId: string }[];
}

@Injectable()
export class NacosService implements OnModuleInit, OnModuleDestroy {
  @Inject()
  private configService: ConfigService;

  @Inject(IpService)
  private ipService: IpService;

  @Inject('CONFIG_OPTIONS')
  private options: NacosOptions;

  private readonly logger = new Logger(NacosService.name);
  private client: NacosNamingClient;
  async onModuleInit() {
    this.client = new NacosNamingClient({
      logger: {
        ...console,
        log: () => {},
        debug: () => {},
      },
      serverList: this.configService.get<string>('NACOS_SERVER'), // replace to real nacos serverList
      namespace: this.configService.get<string>('NACOS_NAMESPACE'),
      username: this.configService.get<string>('NACOS_SECRET_NAME'),
      password: this.configService.get<string>('NACOS_SECRET_PWD'),
    });
    await this.client.ready();
    if (!this.options.noRegister) {
      await this.destroy();
      await this.register();
    }
    this.logger.log('Nacos客户端準備就緒');
  }

  async getSub(serviceName: string) {
    const service = {
      dataId: serviceName,
      groupName: this.configService.get('NACOS_GROUP_NAME'),
    };
    const res = await this.client.getAllInstances(
      service.dataId,
      service.groupName,
    );
    this.configService.set(`nacos_${serviceName}`, res);
    return res.filter((item) => item.healthy);
  }

  async register() {
    await this.client.registerInstance(
      this.options.serviceName,
      // @ts-ignore
      {
        ...this.options.instance,
        ip: this.ipService.getLocalIp(),
        port: this.configService.get<number>('PORT'),
      },
      this.options.groupName,
    );
  }
  async destroy() {
    await this.client.deregisterInstance(
      this.options.serviceName,
      // @ts-ignore
      {
        ...this.options.instance,
        ip: this.ipService.getLocalIp(),
        port: this.configService.get<number>('PORT'),
      },
      this.options.groupName,
    );
  }

  async onModuleDestroy() {}
}
嘗試

在 user-service服務

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nacos_08


啓動服務

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#後端_09

這個時候就可以在其他服務獲取到user-service的請求地址,可以使用http請求來處理。

改造user-service (TCP)

將user-service改造成微服務,使用TCP

import { NestFactory } from '@nestjs/core';
import { UserServiceModule } from './user-service.module';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';

async function bootstrap() {
  const port = Number(process.env.PORT) || 3001;
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    UserServiceModule,
    {
      transport: Transport.TCP,
      options: {
        port,
      },
    },
  );
  await app.listen();
}
bootstrap();

修改user-service.controller.ts

import { Controller, Get } from '@nestjs/common';
import { UserServiceService } from './user-service.service';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class UserServiceController {
  constructor(private readonly userServiceService: UserServiceService) {}

  @MessagePattern('user')
  handleFindById() {
    return this.userServiceService.register();
  }
}

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nacos_10


啓動服務這裏因為是本地,需要修改下獲取IP的邏輯

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#後端_11

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#後端_12

改造gateway

引入nacos

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#後端_13


app.service.ts中 註冊微服務

import { NacosService } from '@app/nacos';
import { Inject, Injectable, OnApplicationBootstrap } from '@nestjs/common';
import {
  ClientProxyFactory,
  Transport,
  ClientProxy,
} from '@nestjs/microservices';

@Injectable()
export class AppService implements OnApplicationBootstrap {
  private client: ClientProxy;
  @Inject(NacosService)
  private readonly nacosService: NacosService;

  async onApplicationBootstrap() {
    const res = await this.nacosService.getSub('user-service');
    const instance = res[0];

    this.client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: {
        host: instance.ip,
        port: instance.port,
      },
    });
    await this.client.connect();
  }

  getHello(): string {
    return 'Hello World!';
  }
  async getInstance() {
    const res = await this.client.send('user', '');
    return !res;
  }
}

啓動服務

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nestjs_14

改造email-service(Kafka)

service.ts 這裏打印接收的數據

import { Controller, Get, Logger } from '@nestjs/common';
import { EmailServiceService } from './email-service.service';
import { MessagePattern, Payload } from '@nestjs/microservices';

@Controller()
export class EmailServiceController {
  constructor(private readonly emailServiceService: EmailServiceService) {}

  @MessagePattern('email')
  getHello(@Payload() data) {
    Logger.log('email-service', data);
  }
}

對於 main.ts 需要先創建一個臨時configApp,用來獲取kafka-config

import { NestFactory } from '@nestjs/core';
import { EmailServiceModule } from './email-service.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { RemoteConfigModule, RemoteConfigService } from '@app/remote-config';

async function bootstrap() {
  const configApp = await NestFactory.create(RemoteConfigModule);
  await configApp.init();
  const configService = configApp.get<RemoteConfigService>(RemoteConfigService);
  const res = await configService.getConfig('kafka-config');

  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    EmailServiceModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          clientId: 'email-service',
          brokers: res.brokers,
        },
        consumer: {
          groupId: 'email-service',
        },
      },
    },
  );
  await app.listen();
  await configApp.close();
}
bootstrap();

在gateway使用
app.servite.ts

import { NacosService } from '@app/nacos';
import { RemoteConfigService } from '@app/remote-config';
import {
  Inject,
  Injectable,
  Logger,
  OnApplicationBootstrap,
  OnModuleDestroy,
} from '@nestjs/common';
import {
  ClientProxyFactory,
  Transport,
  ClientProxy,
  ClientKafka,
} from '@nestjs/microservices';

@Injectable()
export class AppService implements OnApplicationBootstrap, OnModuleDestroy {
  private client: ClientProxy;
  private kafkaClient: ClientKafka;
  @Inject(NacosService)
  private readonly nacosService: NacosService;
  @Inject(RemoteConfigService)
  private readonly configService: RemoteConfigService;

  async onApplicationBootstrap() {
    const res = await this.nacosService.getSub('user-service');
    const instance = res[0];

    this.client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: {
        host: instance.ip,
        port: instance.port,
      },
    });
    await this.client.connect();
    Logger.log(`user-service 連接成功`);
    const config = await this.configService.getConfig('kafka-config');
    this.kafkaClient = new ClientKafka({
      client: {
        brokers: config.brokers,
      },
      producer: {
        allowAutoTopicCreation: true,
      },
    });

    await this.kafkaClient.connect();
    Logger.log(`email-service 連接成功`);
  }
  async onModuleDestroy() {
    await this.kafkaClient.close(); // 自動斷開連接(避免資源泄漏)
  }

  getHello(): string {
    return 'Hello World!';
  }
  async getInstance() {
    const res = await this.client.send('user', '');
    return !res;
  }
  sendEmail() {
    this.kafkaClient.emit('email', {
      email: '<EMAIL>',
      content: '測試發送郵件',
    });
  }
}

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#微服務_15


測試一下

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nestjs_16


如果需要返回郵件的發送狀態。如返回true

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nestjs_17


如果直接這樣修改。

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nestjs_18


會報錯。

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#kafka_19


send的話,接收reply,需要先訂閲下。

修改app.service.ts 在kafkaClient連接之前。

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#kafka_20


Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nacos_21


本地測試email-service開啓多個實例:

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#後端_22


當關掉一個後,接下來的請求會發送到另外一個上面

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#nestjs_23

同樣的也可以給email-service加上Nacos

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#後端_24

因為本地啓動多個相同服務的原因,臨時給nacos加了個隨機數,

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#微服務_25


Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#kafka_26

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#kafka_27

Spring Cloud 從入門到精通(二)集成 Nacos 構建微服務實現服務註冊 - freelife -_#微服務_28


可以判斷是否有健康的實例來做進一步處理。

其他

這是一個練習。
其他的,比如,服務下線後,nacos是能檢測到健康狀態的,那麼就要通過配置或者服務變化(nacos有訂閲),來通知使用者如gateway。

部署
如通過docker打包,注入環境變量port等等,應用會通過nacos註冊,網關使用的時候去拿。
這樣就實現了服務的註冊和發現?