#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <thread>
#include <vector>
// Size argument handed to epoll_create() when NetThread creates its epoll instance.
#define EPOLL_CREATE_NUM 100
// Maximum number of events requested from a single epoll_wait() call in NetThread::run().
#define MAX_WAIT_EVENTS 10
// Descriptors of the accepted client sockets: main() appends to it, NetThread
// iterates it to broadcast messages and erases entries of closed connections.
std::vector<int> fdVec;
// Mutex locked around accesses to fdVec (e.g. main()'s push_back and
// NetThread's broadcast loop).
std::mutex gol_mutex;
/**
 * Formats a printf-style message and writes it to stdout followed by '\n'.
 *
 * @param format printf-style format string.
 * @param ...    Arguments consumed by @p format.
 *
 * Messages that do not fit into the internal 2048-byte buffer are truncated.
 * Nothing is printed when formatting itself fails.
 */
void LogPrint(const char* format, ...)
{
    char content[2048];

    va_list args;
    va_start(args, format);
    const int ret = vsnprintf(content, sizeof(content), format, args);
    // Release the argument list on every path, including the error path below.
    va_end(args);

    if (ret < 0)
    {
        return;
    }
    // Print through "%s": the formatted text may itself contain '%' and must
    // never be interpreted as a format string a second time.
    printf("%s\n", content);
}
class NetThread
{
public:
NetThread()
{
m_epollFd = epoll_create(EPOLL_CREATE_NUM);
}
~NetThread()
{
m_thread.join();
}
void SetThread(std::thread& t)
{
m_thread.swap(t);
}
void SetSockRead(int fd)
{
if (fd == -1)
{
return;
}
m_mutex.lock();
struct epoll_event epollEvent;
epollEvent.data.fd = fd;
epollEvent.events = EPOLLIN;
epoll_ctl(m_epollFd, EPOLL_CTL_ADD, fd, &epollEvent);
m_mutex.unlock();
}
void run()
{
while (true)
{
int eventNum = epoll_wait(m_epollFd, (struct epoll_event*)&m_event, MAX_WAIT_EVENTS, 0);
if (eventNum == -1)
{
continue;
}
for (int i = 0; i < eventNum; ++i)
{
if (m_event[i].events & EPOLLIN)
{
ReadData(m_event[i].data.fd);
}
}
//检测可删除列表,可用于心跳超时检测和发送失败检测
if (delSet.size() != 0)
{
for (auto iter = fdVec.begin(); iter != fdVec.end(); )
{
if (delSet.find(*iter) != delSet.end())
{
struct epoll_event epollEvent;
epollEvent.data.fd = *iter;
epollEvent.events = EPOLLIN;
epoll_ctl(m_epollFd, EPOLL_CTL_DEL, *iter, &epollEvent);
close(*iter);
iter = fdVec.erase(iter);
continue;
}
++iter;
}
delSet.clear();
}
}
}
private:
void ReadData(int fd)
{
char buff[1024];
memset(buff, 0, sizeof(buff) - 1);
int ret = recv(fd, buff, sizeof(buff), 0);
if (ret != -1)
{
//接收缓冲为空,表示已断开连接
if (buff[0] == '\0')
{
LogPrint("fd(%d) 已断开连接...", fd);
delSet.insert(fd);
return;
}
LogPrint("fd = %d send MSG=%s", fd, buff);
gol_mutex.lock();
for (auto iter = fdVec.begin(); iter != fdVec.end(); ++iter)
{
ret = send(*iter, buff, sizeof(buff), 0);
if (ret == -1)
{
LogPrint("fd(%d) 已断开连接...", *iter);
delSet.insert(*iter);
}
}
gol_mutex.unlock();
}
}
private:
//epoll句柄
int m_epollFd;
//线程句柄
std::thread m_thread;
//线程锁
std::mutex m_mutex;
//可删除的socket列表
std::set<int> delSet;
//同时获取最大的事件数
struct epoll_event m_event[10];
};
/**
 * Entry point: listens on TCP port 10001 on all interfaces, starts the
 * NetThread worker and then accepts clients forever. Each accepted client
 * receives a greeting, is appended to fdVec and is registered with the worker.
 *
 * @return 1 when the listening socket cannot be set up; otherwise the accept
 *         loop never returns.
 */
int main()
{
    struct sockaddr_in addr = {}; // zero-initialised, including the padding
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(10001);

    const int main_fd = socket(PF_INET, SOCK_STREAM, 0);
    if (main_fd == -1)
    {
        LogPrint("create socket fail!");
        return 1;
    }

    // Allows a restarted server to bind while old connections linger in
    // TIME_WAIT; a failure here is not fatal.
    const int reuse = 1;
    if (setsockopt(main_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1)
    {
        LogPrint("setsockopt(SO_REUSEADDR) fail: %s", strerror(errno));
    }

    if (bind(main_fd, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == -1)
    {
        LogPrint("bind fail!");
        close(main_fd);
        return 1;
    }
    if (listen(main_fd, 10) == -1)
    {
        LogPrint("listen fail!");
        close(main_fd);
        return 1;
    }

    NetThread netThread;
    std::thread t(&NetThread::run, &netThread);
    netThread.SetThread(t);
    LogPrint("服务器已启动!");

    static const char greeting[] = "服务器已连接成功!";
    while (true)
    {
        const int new_sock = accept(main_fd, nullptr, nullptr);
        if (new_sock == -1)
        {
            if (errno == EINTR || errno == ECONNABORTED)
            {
                continue; // transient, simply retry
            }
            LogPrint("accept fail: %s", strerror(errno));
            // Back off so a persistent error (e.g. no free descriptors) does
            // not turn into a busy loop.
            sleep(1);
            continue;
        }

        // Send only the greeting text (not a padded 1024-byte buffer);
        // MSG_NOSIGNAL keeps a vanished client from raising SIGPIPE.
        if (send(new_sock, greeting, sizeof(greeting) - 1, MSG_NOSIGNAL) == -1)
        {
            LogPrint("fd(%d) 已断开连接...", new_sock);
            close(new_sock);
            continue;
        }
        LogPrint("fd = %d 已连接服务器...", new_sock);
        {
            std::lock_guard<std::mutex> lock(gol_mutex);
            fdVec.push_back(new_sock);
        }
        netThread.SetSockRead(new_sock);
    }
}
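// 网友评论 (reader-comments heading left over from the web page this code was copied from; not part of the program)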