前言
圍繞Socket 的基礎概念、I/O 模型,逐步實現阻塞 I/O 客户端 - 服務器、多進程 / 多線程服務端處理,以及基於 select、poll、epoll 的 I/O 多路複用服務端,側重與如何實現。
什麼是socket
在C++中,Socket編程是一種用於在網絡上進行通信的技術,它允許不同主機上的應用程序通過TCP/IP協議族進行數據交換。Socket作為應用層與TCP/IP協議族通信的中間軟件抽象層,提供了一組接口,隱藏了協議的複雜性,使得開發者只需要關注簡單的接口即可實現網絡通信。
I/O
I/O (Input/Output) - 輸入輸出:泛指計算機與外部世界的數據交換,socket是網絡I/O;
進行I/O時分為兩步:一是檢測是否符合條件,二是進行操作。阻塞條件下不滿足條件就會一直檢測直到符合條件;非阻塞檢測一次就會返回。
I/O多路複用就是隻檢測,找到符合條件的文件描述符,然後只處理這些文件描述符。現在常見的I/O多路複用有三種select、poll、epoll,在檢測時select、poll是通過輪詢進行檢測,epoll是通過回調進行實現。還有就是select需要每次把關注的fd集合拷貝到內核,而epoll只需要第一次把fd拷貝到紅黑樹中;
阻塞 I/O(Blocking I/O):滿足發送/接收條件才輸入輸出
- 當進程執行I/O操作時,如果條件不滿足,進程會被操作系統掛起(睡眠),直到條件滿足才被喚醒繼續執行。
應用程序 內核 網絡
| | |
| send() | |
|--------------->| |
| | wait buffer space |
| |<----------------> |
| | copy data |
| | start send |
| |------------------>|
| return size | |
|<---------------| |
應用程序 內核 網絡
| | |
| recv() | |
|--------------->| |
| |check recv buffer|
| | if null->wait |
| |<--------------->|
| | else recv |
| |<----------------|
| | copy data to |
| return size | user space |
|<---------------| |
非阻塞 I/O(Non-blocking I/O):
- 當進程執行I/O操作時,如果條件不滿足,系統調用立即返回一個錯誤,而不是讓進程進入睡眠狀態。進程可以繼續執行其他任務,稍後再重試。
- 非阻塞 I/O 的關鍵特徵
- 立即返回:無論I/O是否完成,立即返回控制權
- 錯誤碼指示:用特殊錯誤碼錶示"需要等待"
- 主動輪詢:需要程序主動檢查I/O狀態
- 並行處理:可以在等待I/O時做其他事情
| 模型 | 特點 | 適用場景 |
|---|---|---|
| 阻塞 I/O | 條件不滿足時進程掛起 | 簡單應用,連接數少 |
| 非阻塞 I/O | 立即返回,需要輪詢 | 需要實時響應的應用 |
| I/O 多路複用 | 一個線程管理多個 I/O | 高併發服務器 |
TCP-UDP
| 特性 | TCP | UDP |
|---|---|---|
| 連接 | 面向連接 | 無連接 |
| 可靠性 | 可靠,自動重傳 | 不可靠,可能丟包 |
| 順序性 | 保證數據順序 | 不保證順序 |
| 流量控制 | 有滑動窗口 | 無 |
| 頭部開銷 | 20-60字節 | 8字節 |
連接:tcp三次握手,四次揮手
- SYN (Synchronize):同步序號,用於建立連接
- ACK (Acknowledgment):確認標誌
- FIN (Finish):結束標誌,用於關閉連接
- RST (Reset):重置連接
- PSH (Push):推送數據
- URG (Urgent):緊急指針有效
連接-三次握手
Client Server
| |
| 1. SYN (seq=x) |
|--------------------------------------->|
| |
| 2. SYN-ACK (seq=y, ack=x+1) |
|<---------------------------------------|
| |
| 3. ACK (ack=y+1) |
|--------------------------------------->|
| |
| 連接建立,開始數據傳輸 |
|<======================================>|
斷開連接-四次揮手
Client Server
| |
| 1. FIN (seq=u) |
|--------------------------------------->|
| |
| 2. ACK (ack=u+1) |
|<---------------------------------------|
| |
| 服務器處理剩餘數據 |
|<======================================>|
| |
| 3. FIN (seq=v, ack=u+1) |
|<---------------------------------------|
| |
| 4. ACK (ack=v+1) |
|--------------------------------------->|
| 雙方關閉連接 |
send-recv
tcp使用send-recv收發消息;
- 發送消息- send
ssize_t send(int sockfd, // 目標socket文件描述符
const void *buf, // 要發送的數據緩衝區
size_t len, // 要發送的數據長度
int flags); // 發送標誌(通常為0)
// 0 - 默認行為,阻塞發送
send(sockfd, buf, len, 0);
// MSG_DONTWAIT - 非阻塞發送
send(sockfd, buf, len, MSG_DONTWAIT);
// MSG_NOSIGNAL - 不生成SIGPIPE信號
send(sockfd, buf, len, MSG_NOSIGNAL);
// MSG_OOB - 發送帶外數據(緊急數據)
send(sockfd, buf, len, MSG_OOB);
// MSG_MORE - 提示有更多數據要發送(用於UDP)
send(sockfd, buf, len, MSG_MORE);
// 可以組合使用
send(sockfd, buf, len, MSG_DONTWAIT | MSG_NOSIGNAL);
- 接收消息- recv
ssize_t recv(int sockfd, // socket文件描述符
void *buf, // 接收數據緩衝區
size_t len, // 緩衝區長度
int flags); // 接收標誌(通常為0)
// 常用的flags值:
// 0 - 默認行為,阻塞接收
recv(sockfd, buf, len, 0);
// MSG_DONTWAIT - 非阻塞接收
recv(sockfd, buf, len, MSG_DONTWAIT);
// MSG_PEEK - 查看數據但不從緩衝區移除
recv(sockfd, buf, len, MSG_PEEK);
// MSG_OOB - 接收帶外數據(緊急數據)
recv(sockfd, buf, len, MSG_OOB);
// MSG_WAITALL - 等待接收所有請求的字節
recv(sockfd, buf, len, MSG_WAITALL);
// MSG_TRUNC - 即使數據被截斷也返回數據包長度(原始長度)
recv(sockfd, buf, len, MSG_TRUNC);
// 可以組合使用(某些組合可能無效)
recv(sockfd, buf, len, MSG_DONTWAIT | MSG_PEEK);
TCP實例
簡單實例
實現一個簡單的單線程、單個連接的阻塞I/O的客户端-服務器程序
server
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define PORT 8080
#define BUFFER_SIZE 1024
int main() {
int server_fd, client_fd;
struct sockaddr_in address, client_addr;
socklen_t client_len = sizeof(client_addr);
char buffer[BUFFER_SIZE];
// 創建socket
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == 0) {
perror("socket failed");
return 1;
}
int opt = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
perror("setsockopt failed");
close(server_fd);
return 1;
}
// 設置地址
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 綁定
if (bind(server_fd, (struct sockaddr*)&address, sizeof(address)) < 0) {
perror("bind failed");
return 1;
}
// 監聽
if (listen(server_fd, 3) < 0) {
perror("listen");
return 1;
}
printf("Server started on port %d\n", PORT);
// 接受連接
client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd < 0) {
perror("accept");
return 1;
}
printf("Client connected\n");
while (1) {
// 讀取數據
int bytes_read = read(client_fd, buffer, BUFFER_SIZE - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf("Received: %s\n", buffer);
// 發送響應
const char* response = "Hello from server!\n";
write(client_fd, response, strlen(response));
}
else if (bytes_read == 0) {
printf("Client disconnected\n");
break;
}
else {
perror("Read failed");
break;
}
}
// 關閉連接
close(client_fd);
return 0;
}
- CMakeLists.txt
cmake_minimum_required(VERSION 3.0)
project(socketServer)
# include_directories(${CMAKE_SOURCE_DIR}/CMAKE_SOURCE_DIRinclude)
include_directories(include)
add_compile_options(-g -std=c++11 -o2 -Wall)
set(CMAKE_BUILD_TYPE Debug)
# 設置可執行文件輸出目錄(在 build/bin/)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/lib)
# 設置庫文件輸出目錄(在 build/lib/)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/lib)
add_executable(server ./src/server.cpp)
target_link_libraries(server PRIVATE pthread)
client
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define SERVER_IP "127.0.0.1"
#define PORT 8080
#define BUFFER_SIZE 1024
int main() {
int sock = 0;
struct sockaddr_in serv_addr;
char buffer[BUFFER_SIZE] = {0};
// 創建socket
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
std::cerr << "Socket creation error: " << strerror(errno) << std::endl;
return EXIT_FAILURE;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
// 轉換IP地址
if (inet_pton(AF_INET, SERVER_IP, &serv_addr.sin_addr) <= 0) {
std::cerr << "Invalid address/Address not supported: " << strerror(errno) << std::endl;
return EXIT_FAILURE;
}
// 連接服務器
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
std::cerr << "Connection failed: " << strerror(errno) << std::endl;
return EXIT_FAILURE;
}
std::cout << "Connected to server at " << SERVER_IP << ":" << PORT << std::endl;
std::cout << "Type your messages (type 'exit' to quit):" << std::endl;
while (true) {
// 獲取用户輸入
std::string input;
//不使用do-while 如果直接按enter,程序會向下走,send發送為空,但是服務端接收不到,就會在read中等待
//客户端也會在read中等待
do{
std::cout << "> ";
std::getline(std::cin, input);
}while(input.empty());
// 檢查退出命令
if (input == "exit") {
break;
}
// 發送消息到服務器
if (send(sock, input.c_str(), input.length(), 0) < 0) {
std::cerr << "Send failed: " << strerror(errno) << std::endl;
break;
}
std::cout << "Message sent to server" << std::endl;
// 接收服務器響應
memset(buffer, 0, BUFFER_SIZE);
int valread = read(sock, buffer, BUFFER_SIZE - 1);
if (valread < 0) {
std::cerr << "Read error: " << strerror(errno) << std::endl;
break;
} else if (valread == 0) {
std::cout << "Server closed the connection" << std::endl;
break;
} else {
buffer[valread] = '\0';
std::cout << "Server response:\n" << buffer << std::endl;
}
}
close(sock);
std::cout << "Connection closed" << std::endl;
return 1;
}
非阻塞方式
// 獲取當前文件狀態標誌
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) {
perror("fcntl F_GETFL");
return -1;
}
// 添加非阻塞標誌
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
perror("fcntl F_SETFL");
return -1;
}
int on = 1; // 1表示啓用非阻塞
if (ioctl(fd, FIONBIO, &on) < 0) {
perror("ioctl FIONBIO");
return -1;
}
服務端接收多個客户端
- 多進程、多線程、select、poll、epoll
- 少量連接:多進程/多線程
- 中等併發:select/poll
- 高併發:epoll
多進程 fork()
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <signal.h>
#define PORT 8080
#define BUFFER_SIZE 1024
int main() {
int server_fd, client_fd;
struct sockaddr_in address, client_addr;
socklen_t client_len = sizeof(client_addr);
char buffer[BUFFER_SIZE];
// 創建socket
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == 0) {
perror("socket failed");
return 1;
}
int opt = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
perror("setsockopt failed");
close(server_fd);
return 1;
}
// 設置地址
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 綁定
if (bind(server_fd, (struct sockaddr*)&address, sizeof(address)) < 0) {
perror("bind failed");
return 1;
}
// 監聽
if (listen(server_fd, 3) < 0) {
perror("listen");
return 1;
}
signal(SIGCHLD, SIG_IGN);
printf("Server started on port %d\n", PORT);
// 只需要修改主循環部分
while (true) {
// 接受連接
client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd < 0) {
perror("accept faild!");
continue; // 繼續等待下一個連接
}
printf("Client connected\n");
// 創建子進程處理客户端
pid_t pid = fork();
if (pid < 0) {
perror("fork failed");
close(client_fd);
continue;
}
if (pid == 0) { // 子進程
close(server_fd); // 子進程不需要監聽socket
// 處理客户端連接(使用你現有的while循環)
while (true) {
int bytes_read = read(client_fd, buffer, BUFFER_SIZE - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf("PID %d Received: %s\n", getpid(), buffer);
const char* response = "server received!\n";
write(client_fd, response, strlen(response));
}
else if (bytes_read == 0) {
printf("PID %d Client disconnected\n", getpid());
break;
}
else {
perror("Read failed");
break;
}
}
close(client_fd);
exit(0); // 子進程退出
}
else { // 父進程
close(client_fd); // 父進程關閉客户端socket,繼續監聽
}
}
// 關閉連接
close(client_fd);
return 0;
}
多線程 std::thread
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <thread>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <iostream>
#define PORT 8080
#define BUFFER_SIZE 1024
// 客户端處理線程函數
void handle_client(int client_fd) {
char buffer[BUFFER_SIZE];
std::cout << "Thread " << std::this_thread::get_id() << ": Client connected" << std::endl;
while (1) {
memset(buffer, 0, BUFFER_SIZE);
int bytes_read = read(client_fd, buffer, BUFFER_SIZE - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
const char * response = "receive over!";
std::cout<< "received:" << buffer << std::endl;
write(client_fd, response, strlen(response));
}
else if (bytes_read == 0) {
printf("Thread %lu: Client disconnected\n", pthread_self());
break;
}
else {
perror("Read failed");
break;
}
}
close(client_fd);
return;
}
int main() {
int server_fd, client_fd;
struct sockaddr_in address, client_addr;
socklen_t client_len = sizeof(client_addr);
// 創建socket
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == 0) {
perror("socket failed");
return 1;
}
int opt = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
perror("setsockopt failed");
close(server_fd);
return 1;
}
// 設置地址
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 綁定
if (bind(server_fd, (struct sockaddr*)&address, sizeof(address)) < 0) {
perror("bind failed");
return 1;
}
// 監聽
if (listen(server_fd, 3) < 0) {
perror("listen");
return 1;
}
signal(SIGCHLD, SIG_IGN);
printf("Server started on port %d\n", PORT);
while (true) {
// 接受連接
client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd < 0) {
perror("accept faild!");
continue; // 繼續等待下一個連接
}
char client_ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &client_addr.sin_addr, client_ip, sizeof(client_ip));
int client_port = ntohs(client_addr.sin_port);
std::cout << "New client connected from " << client_ip << ":" << client_port << std::endl;
try {
std::thread client_thread(handle_client, client_fd);
client_thread.detach(); // 分離線程,讓它在後台運行
}
catch (const std::exception& e) {
std::cerr << "Failed to create thread: " << e.what() << std::endl;
close(client_fd);
}
}
close(client_fd);
std::cout << "Server stopped" << std::endl;
return 0;
}
select多路複用
使用select 實現,並做簡單的封裝。
雖然使用多線程和多進程都可以實現對多個客户端監聽,每一個進程/線程都會在send/recv處阻塞,等待發送/接收;
select、poll、epoll可以對文件描述符進行監控,當有事件觸發時才會去調用。
select實現I/O多路複用就是對fd_set(位圖)的操作,如果要監控可讀操作,就聲明一個read的位圖,把已經連接到服務器的clientfd記錄下來(不如clientfd=10,就是把位圖中第十個值置為1);通過select對位圖進行監控,當有滿足read的文件描述符時值不變,不滿足的清空,所以要把連接的客户端存起來避免丟失。所以返回的fd_set就是滿足可讀的已連接客户端,對這些客户端進行操作就行了。
這裏面存在的問題就是系統默認fd_set大小為1024,所以最多連接1020個客户端(0-標準輸入流、1-標準輸出流、2-標準錯誤流、3-服務端文件描述符),當客户端連接過多時會丟失或者出現錯誤。
還有就是fd_set會從用户態拷貝到內核,再拷貝出來,性能上不是很好。
select 是一種 I/O 多路複用技術,允許程序同時監視多個文件描述符(file descriptors),等待一個或多個描述符變為"就緒"狀態(可讀、可寫或發生異常)
I/O多路複用就是使用一個線程管理多個io是否就緒。
-
先了解一下幾個概念
-
fd_set是什麼
fd_set是一個存放文件描述符的數組,大小為1024位; 結構體實際上就是定義一個 長整型的數組 long int fds_bits[FD_SETSIZE / NFDBITS];
1024 = int * 數組個數 * 8 位,能夠儲存1024個文件描述符;也就是實際能夠儲存1024個socket的文件描述符。
每個進程默認打開3個文件描述符,0-標準輸入流、1-標準輸出流、2-標準錯誤流
/* The fd_set member is required to be an array of longs. */
typedef long int __fd_mask;
/* Some versions of <linux/posix_types.h> define this macros. */
#undef __NFDBITS
/* It's easier to assume 8-bit bytes than to get CHAR_BIT. */
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
#define __FD_ELT(d) ((d) / __NFDBITS)
#define __FD_MASK(d) ((__fd_mask) (1UL << ((d) % __NFDBITS)))
/* fd_set for select and pselect. */
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;
- 文件描述符的分配規則
尋找最小的未使用的文件描述符,所以連接數較少時可以使用select進行實現。
- 文件描述符如何映射到socket
好像是:進程控制塊-進程描述表->文件描述符表->文件對象;具體怎麼映射的沒了解
int select(int maxfdp, //最大文件描述符值加1
fd_set *readfds, //指向可讀文件描述符集合的指針 可讀時接收
fd_set *writefds, //指向可寫文件描述符集合的指針 可寫時發送
fd_set *exceptfds, //指向異常文件描述符集合的指針 常見:帶外數據到達(TCP緊急數據)
struct timeval *timeout); //超時時間結構體指針
- 實現
#include <iostream>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <vector>
#include <algorithm>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 10
class TCPServer {
private:
int server_socket;
fd_set readfds;
int client_socket[MAX_CLIENTS];
int max_sd;
public:
TCPServer() {
server_socket = -1;
// 初始化客户端socket數組
for (int i = 0; i < MAX_CLIENTS; i++) {
client_socket[i] = 0;
}
}
void init() {
int opt = 1;
struct sockaddr_in address;
// 創建主socket
if ((server_socket = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
// 設置socket選項,允許地址重用
if (setsockopt(server_socket, SOL_
perror("setsockopt");
exit(EXIT_FAILURE);
}
// 配置地址
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 綁定socket
if (bind(server_socket, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
std::cout << "Listener on port " << PORT << std::endl;
// 開始監聽
if (listen(server_socket, MAX_CLIENTS) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
}
void run() {
int activity, new_socket, sd;
int max_sd;
struct sockaddr_in address;
socklen_t addrlen = sizeof(address);
char buffer[BUFFER_SIZE];
std::cout << "Waiting for connections..." << std::endl;
while (true) {
// 清空文件描述符集合
FD_ZERO(&readfds);
// 添加主socket到集合
FD_SET(server_socket, &readfds);
max_sd = server_socket;
// 添加客户端socket到集合
for (int i = 0; i < MAX_CLIENTS; ++i) {
sd = client_socket[i];
if (sd > 0) {
FD_SET(sd, &readfds);
}
max_sd = max_sd > sd ? max_sd : sd;
}
// 等待活動
struct timeval timeout;
timeout.tv_sec = 0; // 秒
timeout.tv_usec = 500;
activity = select(max_sd + 1, &readfds, NULL, NULL, &timeout);
if (activity < 0 && errno != EINTR) {
std::cerr << "select error" << std::endl;
continue;
}else if (activity == 0) {
// 超時處理
continue;
}
// 如果有新連接
if (FD_ISSET(server_socket, &readfds)) {
if ((new_socket = accept(server_socket,
(struct sockaddr *)&address,
&addrlen)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
}
std::cout << "New connection, socket fd: " << new_socket
<< ", IP: " << inet_ntoa(address.sin_addr)
<< ", Port: " << ntohs(address.sin_port) << std::endl;
// 添加到客户端數組
for (int i = 0; i < MAX_CLIENTS; i++) {
if (client_socket[i] == 0) {
client_socket[i] = new_socket;
std::cout << "Adding to list of sockets as " << i <<" client id: " << new_sock et << std::endl;
break;
}
}
}
// 檢查客户端socket的IO操作
for (int i = 0; i < MAX_CLIENTS; i++) {
sd = client_socket[i];
memset(buffer,0,BUFFER_SIZE);
if (FD_ISSET(sd, &readfds)) {
// 檢查是否斷開連接
int valread = read(sd, buffer, BUFFER_SIZE);
if (valread == 0) {
// 客户端斷開連接
getpeername(sd, (struct sockaddr*)&address, &addrlen);
std::cout << "Host disconnected, IP: " << inet_ntoa(address.sin_addr)
<< ", Port: " << ntohs(address.sin_port) << std::endl;
close(sd);
client_socket[i] = 0;
} else {
// 處理接收到的數據
buffer[valread] = '\0';
std::cout << "Received: " << buffer << std::endl;
// 回聲給客户端
send(sd, buffer, strlen(buffer), 0);
}
}
}
}
}
~TCPServer() {
close(server_socket);
for (int i = 0; i < MAX_CLIENTS; ++i) {
if (client_socket[i] > 0) {
std::cout << "close client :"<< client_socket[i] << std::endl;
close(client_socket[i]);
}
}
}
};
int main() {
TCPServer server;
server.init();
server.run();
return 0;
}
poll多路複用
poll不再使用位圖對文件描述符存儲,而是通過pollfd進行控制,可以聲明pollfd的數組(所以就沒有了1024的限制)。
- poll改進
沒有最大文件描述符限制:poll 使用數組,可以處理任意數量的文件描述符
更高效:不需要每次調用都重新設置文件描述符集合
更清晰的事件分離:每個文件描述符都有獨立的事件輸入和輸出字段
- poll事件常量
| 事件常量 | 值(通常) | 説明 | 對象 |
|---|---|---|---|
POLLIN |
0x001 | 數據可讀(普通數據) | events/revents |
POLLPRI |
0x002 | 緊急數據可讀(帶外數據) | events/revents |
POLLOUT |
0x004 | 數據可寫,不阻塞 | events/revents |
POLLRDNORM |
0x040 | 普通數據可讀 | events/revents |
POLLRDBAND |
0x080 | 優先級帶數據可讀 | events/revents |
POLLWRNORM |
0x100 | 普通數據可寫 | events/revents |
POLLWRBAND |
0x200 | 優先級帶數據可寫 | events/revents |
POLLERR |
0x008 | 錯誤情況 | revents |
POLLHUP |
0x010 | 已掛起 | revents |
POLLNVAL |
0x020 | 無效輪詢請求 | revents |
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#define PORT 8080
#define MAX_CLIENTS 1000
#define BUFFER_SIZE 1024
#define TIMEOUT -1 // 無限等待
class TCPServer{
private:
int server_fd;
char buffer[BUFFER_SIZE];
struct pollfd fds[MAX_CLIENTS + 1]; // +1 給服務器套接字
int nfds;
public:
TCPServer(){
server_fd = -1;
nfds = 0;
// 初始化所有 pollfd 結構
for (int i = 0; i < MAX_CLIENTS + 1; i++) {
fds[i].fd = -1; // 表示未使用
fds[i].events = 0;
fds[i].revents = 0;
}
};
void init(){
// 創建監聽套接字
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
// 設置 SO_REUSEADDR 選項
int opt = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
perror("setsockopt failed");
exit(EXIT_FAILURE);
}
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 綁定地址
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 開始監聽 listen(server_fd, 10) 等待連接的最大個數
if (listen(server_fd, 10) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}
printf("Server listening on port %d\n", PORT);
// 添加服務器套接字到 pollfd 數組
fds[0].fd = server_fd;
fds[0].events = POLLIN; // 監視可讀事件(新連接)
nfds = 1; // 當前監視的文件描述符數量
};
void run(){
int current_size = 0;
while (1) {
// 調用 poll,等待事件發生
int ret = poll(fds, nfds, TIMEOUT);
if (ret < 0) {
perror("poll failed");
break;
} else if (ret == 0) {
// 超時(本例中不會發生,因為TIMEOUT=-1)
continue;
}
current_size = nfds; // 保存當前大小,因為 nfds 可能在循環中改變
// 檢查所有文件描述符
for (int i = 0; i < current_size; i++) {
if (fds[i].fd < 0) continue; // 跳過未使用的文件描述符
// 檢查是否有事件發生
if (fds[i].revents == 0) continue;
// 檢查是否是錯誤事件
if (fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) {
printf("Error on fd %d, closing connection\n", fds[i].fd);
close(fds[i].fd);
fds[i].fd = -1;
fds[i].revents = 0;
continue;
}
// 如果是服務器套接字,處理新連接
if (fds[i].fd == server_fd) {
if (fds[i].revents & POLLIN)
handle_client_connect();
}
// 處理客户端套接字
else {
handle_client_event(i);
}
}
}
};
void handle_client_connect(){
// 接受新連接
struct sockaddr_in address;
socklen_t addrlen = sizeof(address);
int new_socket = accept(server_fd, (struct sockaddr *)&address, &addrlen);
if (new_socket < 0) {
perror("accept failed");
return ;
}
printf("New connection, socket fd is %d, IP: %s, port: %d\n",new_socket,
inet_ntoa(address.sin_addr), ntohs(address.sin_port));
// 查找空位添加新客户端
int added = 0 ;
for (int j = 1; j < MAX_CLIENTS + 1; ++j) {
if (fds[j].fd == -1) {
fds[j].fd = new_socket;
fds[j].events = POLLIN | POLLRDHUP; // 監視可讀和連接關閉
fds[j].revents = 0;
added = 1;
nfds = j >= nfds ? j+1 : nfds;
break;
}
}
if (!added) {
printf("Too many clients, rejecting connection\n");
close(new_socket);
}
};
void handle_client_event(int fd_index){
// 檢查連接是否關閉
if (fds[fd_index].revents & POLLRDHUP) {
printf("Client %d disconnected\n", fds[fd_index].fd);
close(fds[fd_index].fd);
fds[fd_index].fd = -1;
fds[fd_index].revents = 0;
return;
}
// 檢查是否有數據可讀
if (fds[fd_index].revents & POLLIN) {
memset(buffer,0,BUFFER_SIZE);
int valread = read(fds[fd_index].fd, buffer, BUFFER_SIZE);
if (valread == 0) {
// 客户端正常關閉連接
printf("Client %d closed connection\n", fds[fd_index].fd);
close(fds[fd_index].fd);
fds[fd_index].fd = -1;
} else if (valread < 0) {
// 讀取錯誤
perror("read failed");
close(fds[fd_index].fd);
fds[fd_index].fd = -1;
} else {
// 回顯數據
buffer[valread] = '\0';
std::cout << "Received from client : " << fds[fd_index].fd <<" " << buffer << std: :endl;
send(fds[fd_index].fd,buffer,strlen(buffer),0);
}
}
}
~TCPServer(){
// 關閉所有連接
for (int i = 0; i < nfds; i++) {
if (fds[i].fd >= 0) {
close(fds[i].fd);
}
}
if (server_fd >= 0) {
close(server_fd);
}
printf("Server shutdown complete\n");
};
};
int main() {
TCPServer server;
server.init();
server.run();
return 0;
}
- 事件
#define POLLIN 0x001 /* There is data to read. */
#define POLLPRI 0x002 /* There is urgent data to read. */
#define POLLOUT 0x004 /* Writing now will not block. */
#if defined __USE_XOPEN || defined __USE_XOPEN2K8
/* These values are defined in XPG4.2. */
# define POLLRDNORM 0x040 /* Normal data may be read. */
# define POLLRDBAND 0x080 /* Priority data may be read. */
# define POLLWRNORM 0x100 /* Writing now will not block. */
# define POLLWRBAND 0x200 /* Priority data may be written. */
#endif
#ifdef __USE_GNU
/* These are extensions for Linux. */
# define POLLMSG 0x400
# define POLLREMOVE 0x1000
# define POLLRDHUP 0x2000
#endif
/* Event types always implicitly polled for. These bits need not be set in
`events', but they will appear in `revents' to indicate the status of
the file descriptor. */
#define POLLERR 0x008 /* Error condition. */
#define POLLHUP 0x010 /* Hung up. */
#define POLLNVAL 0x020 /* Invalid polling request. */
select/poll 缺點
- 每次調用時要重複地從用户態讀入參數。
- 每次調用時要重複地掃描文件描述符。
- 每次在調用開始時,要把當前進程放入各個文件描述符的等待隊列。在調用結束後,又把進程從各個等待隊列中刪除。
poll核心時間可分為可讀、可寫、異常事件。
epoll()多路複用
-
水平觸發 :fd事件沒有被處理或者沒有處理全部,下次還會報告這個fd
-
邊緣觸發:程序在處理文件描述符的就緒事件時,必須確保將其處理完畢,否則 epoll_wait 將不會重複通知該文件描述符的就緒狀態。
-
基於紅黑樹+鏈表+回調函數
用户空間 內核空間
│ │
│ epoll_create() │
├───────────────────────────────> │ 創建eventpoll結構
│ │ • 初始化紅黑樹(rbr)
│ │ • 初始化就緒鏈表(rdllist)
│ │
│ epoll_ctl(EPOLL_CTL_ADD, fd) │
├───────────────────────────────> │ • 分配epitem結構
│ │ • 插入紅黑樹(O(log n))
│ │ • 設置文件回調函數
│ │
│ epoll_wait() │
├───────────────────────────────> │ • 檢查rdllist是否為空
│ │ • 空則阻塞進程
│ │
│ 數據到達 │ • 網卡中斷/定時器觸發
│ │ • 調用ep_poll_callback()
│ │ • 將epitem加入rdllist
│ │ • 喚醒等待進程
│ │
│ <───────────────────────────────┤ • 複製events到用户空間
│ │ • 清空rdllist(LT模式)
│ │
│ 處理事件 │
│───────────────────────────────> │
│ │
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <vector>
#include <cerrno>
#include <arpa/inet.h>
#define MAX_EVENTS 64
#define BUFFER_SIZE 1024
class EpollServer {
private:
int server_fd;
int epoll_fd;
struct sockaddr_in server_addr;
public:
EpollServer(int port) {
// 創建socket SOCK_STREAM (TCP) SOCK_DERAM (UDP)
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
// 設置socket選項
int opt = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT,
&opt, sizeof(opt))) {
perror("setsockopt failed");
exit(EXIT_FAILURE);
}
// 綁定地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 創建epoll實例
epoll_fd = epoll_create1(0);
if (epoll_fd < 0) {
perror("epoll_create1 failed");
exit(EXIT_FAILURE);
}
// 添加server_fd到epoll
struct epoll_event event;
event.data.fd = server_fd;
event.events = EPOLLIN | EPOLLET; // 邊緣觸發模式
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event);
// 監聽socket
if (listen(server_fd, SOMAXCONN) < 0) {
perror("listen failed");
return;
}
std::cout << "Server listening on port "
<< ntohs(server_addr.sin_port) << std::endl;
}
~EpollServer() {
close(server_fd);
close(epoll_fd);
}
// 設置非阻塞模式
void set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
void run() {
// 事件循環
struct epoll_event events[MAX_EVENTS];
while (true) {
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds < 0) {
perror("epoll_wait failed");
break;
}
for (int i = 0; i < nfds; i++) {
if (events[i].data.fd == server_fd) {
// 處理新連接
handle_new_connection();
} else {
// 處理客户端數據
if (events[i].events & EPOLLIN) {
handle_client_data(events[i].data.fd);
}
if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
// 處理連接關閉或錯誤
close_client(events[i].data.fd);
}
}
}
}
}
private:
void handle_new_connection() {
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
// 接受所有待處理的連接
do {
int client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &addr_len);
if (client_fd < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 沒有更多連接
break;
} else {
perror("accept failed");
break;
}
}
else{
std::cout << "New connection from "
<< inet_ntoa(client_addr.sin_addr)
<< ":" << ntohs(client_addr.sin_port)
<< std::endl;
// 設置非阻塞
set_nonblocking(client_fd);
// 添加到epoll
struct epoll_event event;
event.data.fd = client_fd;
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event) < 0) {
perror("epoll_ctl add client failed");
close(client_fd);
}else{
std::cout << "Successfully added client fd " << client_fd << " to epoll" << std::endl;
}
break;
}
}while (true);
}
void handle_client_data(int client_fd) {
char buffer[BUFFER_SIZE];
// 邊緣觸發模式需要讀取所有可用數據
while (true) {
memset(buffer,0,BUFFER_SIZE);
ssize_t bytes_read = read(client_fd, buffer, BUFFER_SIZE - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
std::cout << "Received from client " << client_fd << ": " << buffer;
// 回顯數據
send(client_fd, buffer, bytes_read, 0);
// 如果讀取的數據少於緩衝區大小,説明數據已讀完
if (bytes_read < BUFFER_SIZE - 1) {
break;
}
} else if (bytes_read == 0) {
// 客户端關閉連接
close_client(client_fd);
break;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 數據已讀完
break;
} else {
perror("read failed");
close_client(client_fd);
break;
}
}
}
}
void close_client(int client_fd) {
std::cout << "Client " << client_fd << " disconnected" << std::endl;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
close(client_fd);
}
};
int main() {
EpollServer server(8080);
server.run();
return 0;
}
UDP實例
udp使用sendto-recvfrom收發消息;
- server
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#define BUFFER_SIZE 1024
class UDPServer {
private:
int server_fd;
struct sockaddr_in server_addr;
public:
UDPServer(int port) {
// 創建UDP socket
server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
// 設置地址重用
int opt = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
perror("setsockopt failed");
close(server_fd);
exit(EXIT_FAILURE);
}
// 綁定地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY; // 監聽所有接口
server_addr.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
close(server_fd);
exit(EXIT_FAILURE);
}
std::cout << "UDP Server listening on port " << port << std::endl;
}
~UDPServer() {
if (server_fd >= 0) {
close(server_fd);
}
}
void run() {
char buffer[BUFFER_SIZE];
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
while (true) {
// 清空緩衝區
memset(buffer, 0, BUFFER_SIZE);
memset(&client_addr, 0, sizeof(client_addr));
// 接收數據
ssize_t recv_len = recvfrom(server_fd, buffer, BUFFER_SIZE - 1, 0,
(struct sockaddr*)&client_addr, &addr_len);
if (recv_len < 0) {
perror("recvfrom failed");
continue;
}
// 獲取客户端信息
char client_ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &client_addr.sin_addr,
client_ip, INET_ADDRSTRLEN);
std::cout << "Received from " << client_ip << ":"
<< ntohs(client_addr.sin_port)
<< " (" << recv_len << " bytes): "
<< buffer << std::endl;
// 準備回顯消息
std::string echo_msg = "Echo: ";
echo_msg += buffer;
// 發送回顯(發送到剛才接收的地址)
sendto(server_fd, echo_msg.c_str(), echo_msg.length(), 0,
(struct sockaddr*)&client_addr, addr_len);
}
}
void run_with_select() {
fd_set read_fds;
struct timeval timeout;
char buffer[BUFFER_SIZE];
std::cout << "UDP Server with select() running..." << std::endl;
while (true) {
// 設置文件描述符集合
FD_ZERO(&read_fds);
FD_SET(server_fd, &read_fds);
// 設置超時(5秒)
timeout.tv_sec = 5;
timeout.tv_usec = 0;
// 使用select等待數據
int ready = select(server_fd + 1, &read_fds, NULL, NULL, &timeout);
if (ready < 0) {
perror("select failed");
break;
} else if (ready == 0) {
// 超時
std::cout << "Timeout, waiting for data..." << std::endl;
continue;
}
// 有數據可讀
if (FD_ISSET(server_fd, &read_fds)) {
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
ssize_t recv_len = recvfrom(server_fd, buffer, BUFFER_SIZE - 1, 0,
(struct sockaddr*)&client_addr, &addr_len);
if (recv_len > 0) {
buffer[recv_len] = '\0';
std::cout << "Received: " << buffer << std::endl;
// 回顯
sendto(server_fd, buffer, recv_len, 0,
(struct sockaddr*)&client_addr, addr_len);
}
}
}
}
};
int main() {
UDPServer server(8080);
server.run(); // 或 server.run_with_select();
return 0;
}
- client
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string>
class UDPClient {
private:
int client_fd;
struct sockaddr_in server_addr;
public:
UDPClient(const std::string& server_ip, int port) {
// 創建UDP socket
client_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (client_fd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
// 設置服務器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
// 轉換IP地址
if (inet_pton(AF_INET, server_ip.c_str(), &server_addr.sin_addr) <= 0) {
std::cerr << "Invalid address: " << server_ip << std::endl;
close(client_fd);
exit(EXIT_FAILURE);
}
std::cout << "UDP Client ready to send to "
<< server_ip << ":" << port << std::endl;
}
~UDPClient() {
if (client_fd >= 0) {
close(client_fd);
}
}
// 發送單條消息
bool send_message(const std::string& message) {
ssize_t sent = sendto(client_fd, message.c_str(), message.length(), 0,
(struct sockaddr*)&server_addr, sizeof(server_addr));
if (sent < 0) {
perror("sendto failed");
return false;
}
std::cout << "Sent " << sent << " bytes: " << message << std::endl;
return true;
}
// 發送並接收回復(阻塞)
bool send_and_receive(const std::string& message, int timeout_sec = 5) {
// 發送消息
if (!send_message(message)) {
return false;
}
// 設置接收超時
struct timeval timeout;
timeout.tv_sec = timeout_sec;
timeout.tv_usec = 0;
if (setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO,
&timeout, sizeof(timeout)) < 0) {
perror("setsockopt timeout failed");
}
// 接收回復
char buffer[1024];
struct sockaddr_in from_addr;
socklen_t addr_len = sizeof(from_addr);
ssize_t recv_len = recvfrom(client_fd, buffer, sizeof(buffer) - 1, 0,
(struct sockaddr*)&from_addr, &addr_len);
if (recv_len < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
std::cout << "Timeout: No response received" << std::endl;
} else {
perror("recvfrom failed");
}
return false;
}
buffer[recv_len] = '\0';
char from_ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &from_addr.sin_addr, from_ip, INET_ADDRSTRLEN);
std::cout << "Received from " << from_ip << ":"
<< ntohs(from_addr.sin_port)
<< " (" << recv_len << " bytes): "
<< buffer << std::endl;
return true;
}
// 廣播消息
bool broadcast(const std::string& message, int port) {
// 開啓廣播選項
int broadcast_enable = 1;
if (setsockopt(client_fd, SOL_SOCKET, SO_BROADCAST,
&broadcast_enable, sizeof(broadcast_enable)) < 0) {
perror("setsockopt broadcast failed");
return false;
}
// 設置廣播地址
struct sockaddr_in broadcast_addr;
memset(&broadcast_addr, 0, sizeof(broadcast_addr));
broadcast_addr.sin_family = AF_INET;
broadcast_addr.sin_port = htons(port);
broadcast_addr.sin_addr.s_addr = htonl(INADDR_BROADCAST); // 255.255.255.255
// 發送廣播
ssize_t sent = sendto(client_fd, message.c_str(), message.length(), 0,
(struct sockaddr*)&broadcast_addr, sizeof(broadcast_addr));
if (sent < 0) {
perror("broadcast sendto failed");
return false;
}
std::cout << "Broadcast " << sent << " bytes" << std::endl;
return true;
}
};
int main() {
UDPClient client("127.0.0.1", 8080);
// 發送測試消息
client.send_and_receive("Hello UDP Server!");
client.send_and_receive("Another message");
// 發送多條消息
for (int i = 0; i < 5; i++) {
std::string msg = "Message " + std::to_string(i);
client.send_and_receive(msg);
}
return 0;
}
組播
組播地址範圍:224.0.0.0 - 239.255.255.255
- 224.0.0.0 - 224.0.0.255:本地網絡控制塊
- 224.0.1.0 - 238.255.255.255:全球範圍組播地址
- 239.0.0.0 - 239.255.255.255:本地管理組播地址
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
class MulticastServer {
private:
int sock_fd;
struct sockaddr_in multicast_addr;
public:
MulticastServer(const std::string& multicast_ip, int port) {
// 創建UDP socket
sock_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (sock_fd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
// 設置組播地址
memset(&multicast_addr, 0, sizeof(multicast_addr));
multicast_addr.sin_family = AF_INET;
multicast_addr.sin_port = htons(port);
if (inet_pton(AF_INET, multicast_ip.c_str(),
&multicast_addr.sin_addr) <= 0) {
std::cerr << "Invalid multicast address" << std::endl;
close(sock_fd);
exit(EXIT_FAILURE);
}
// 設置TTL(生存時間)
unsigned char ttl = 1; // 只在本地網絡
if (setsockopt(sock_fd, IPPROTO_IP, IP_MULTICAST_TTL,
&ttl, sizeof(ttl)) < 0) {
perror("setsockopt TTL failed");
}
std::cout << "Multicast Server ready for "
<< multicast_ip << ":" << port << std::endl;
}
void send_message(const std::string& message) {
ssize_t sent = sendto(sock_fd, message.c_str(), message.length(), 0,
(struct sockaddr*)&multicast_addr,
sizeof(multicast_addr));
if (sent < 0) {
perror("multicast sendto failed");
} else {
std::cout << "Multicast sent " << sent << " bytes" << std::endl;
}
}
};
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
class MulticastClient {
private:
int sock_fd;
struct sockaddr_in local_addr;
struct ip_mreq mreq;
public:
MulticastClient(const std::string& multicast_ip, int port) {
// 創建UDP socket
sock_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (sock_fd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
// 允許地址重用
int reuse = 1;
if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR,
&reuse, sizeof(reuse)) < 0) {
perror("setsockopt reuseaddr failed");
}
// 綁定到任意地址和指定端口
memset(&local_addr, 0, sizeof(local_addr));
local_addr.sin_family = AF_INET;
local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
local_addr.sin_port = htons(port);
if (bind(sock_fd, (struct sockaddr*)&local_addr,
sizeof(local_addr)) < 0) {
perror("bind failed");
close(sock_fd);
exit(EXIT_FAILURE);
}
// 加入多播組
mreq.imr_multiaddr.s_addr = inet_addr(multicast_ip.c_str());
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
if (setsockopt(sock_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
&mreq, sizeof(mreq)) < 0) {
perror("setsockopt add membership failed");
close(sock_fd);
exit(EXIT_FAILURE);
}
std::cout << "Joined multicast group " << multicast_ip
<< " on port " << port << std::endl;
}
void receive_messages() {
char buffer[1024];
std::cout << "Listening for multicast messages..." << std::endl;
while (true) {
struct sockaddr_in from_addr;
socklen_t addr_len = sizeof(from_addr);
ssize_t recv_len = recvfrom(sock_fd, buffer, sizeof(buffer) - 1, 0,
(struct sockaddr*)&from_addr, &addr_len);
if (recv_len > 0) {
buffer[recv_len] = '\0';
char from_ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &from_addr.sin_addr,
from_ip, INET_ADDRSTRLEN);
std::cout << "Multicast from " << from_ip << ": "
<< buffer << std::endl;
}
}
}
};