c - Is it OK to share the same epoll file descriptor among threads?
Is it safe to share the same epoll fd (not socket fd) among several threads? If so, does each thread have to pass its own events array to epoll_wait(2), or can they all share one?
For example:
void *thread_func(void *thread_args) {
    // extract socket_fd, epoll_fd, &event and &events_array from thread_args
    // epoll_wait() using the epoll_fd and events_array received from main
    // all threads use the same epoll_fd and events array
}

void main(void) {
    // create and bind the socket
    // create the epoll fd
    // allocate memory for the events array
    // subscribe to the events EPOLLIN and EPOLLET
    // pack socket_fd, epoll_fd, &events and &events_array into the
    // thread_args struct
    // create multiple threads and pass thread_func and the
    // same thread_args to all threads
}
Or is this better:
void *thread_func(void *socket_fd) {
    // create its own epoll fd
    // allocate memory for the events array
    // subscribe to the events EPOLLIN and EPOLLET
    // epoll_wait using its own epoll_fd and events_array
    // all threads have separate epoll fds
    // events are populated in each thread's own array
}

void main(void) {
    // create and bind the socket
    // create multiple threads and pass thread_func and the socket_fd
    // to all threads
}
Also, is there an example of how to do this in C? All the examples I have seen run the event loop in main() and spawn a new thread to process a request whenever an event is detected. What I want is to create a specific number of threads at the start of the program and have each thread running the event loop and processing requests.
Is it safe to share the same epoll fd (not socket fd) among several threads?
Yes, it is safe - the epoll(7) interface is thread-safe - but you should be careful when doing so: you should at least use EPOLLET (edge-triggered mode, as opposed to the default level-triggered) to avoid spurious wake-ups in the other threads. This is because level-triggered mode would wake up every thread whenever a new event is available for processing. Since only one thread will end up dealing with it, this would wake up most threads unnecessarily.
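For illustration, here is a minimal sketch of such a registration (epfd and sockfd are placeholder names; the full example below does the same thing):

struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;  /* edge-triggered: a new event is reported once,
                                   instead of waking every waiting thread */
ev.data.fd = sockfd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) < 0) {
    perror("epoll_ctl(2)");
}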
If a shared epoll fd is used, does each thread have to pass its own events array, or a shared events array, to epoll_wait()?
Yes, you need a separate events array in each thread, or else you'll have race conditions and nasty things can happen. For example, you might have a thread that is still iterating through the events returned by epoll_wait(2) and processing requests when suddenly another thread calls epoll_wait(2) with the same array, and then the events get overwritten at the same time the first thread is reading them. Not good! You absolutely need a separate array in each thread.
Assuming you do have a separate array in each thread, either possibility - waiting on the same epoll fd, or having a separate epoll fd for each thread - will work equally well, but note that the semantics are different. With a globally shared epoll fd, every thread waits for a request from any client, because all clients are added to the same epoll fd. With a separate epoll fd per thread, each thread is responsible for a subset of the clients (namely, those clients that were accepted by that thread).
This may be irrelevant for your system, or it may make a huge difference. For example, it may happen that a thread is unfortunate enough to get a group of power users who make heavy and frequent requests, leaving that thread overworked, while other threads with less aggressive clients sit almost idle. Wouldn't that be unfair? On the other hand, maybe you'd like to have some threads dealing with a specific class of users only, and in that case it may make sense to have a different epoll fd on each thread. As usual, you need to consider both possibilities, evaluate the trade-offs, think about your specific problem, and make a decision.
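For completeness, here is a minimal sketch of that second layout, where each thread owns a private epoll fd (this follows the second option from the question; the names are hypothetical, error handling is omitted, and it assumes the listening socket was made non-blocking with fcntl(2), so that a thread losing the accept race gets EAGAIN instead of blocking):

/* Hypothetical per-thread worker: each thread owns a private epoll fd. */
void *worker_thr_private(void *arg) {
    int listenfd = *(int *) arg;
    int my_epoll = epoll_create1(0);     /* this thread's own epoll instance */
    struct epoll_event ev, events[64];

    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listenfd;
    epoll_ctl(my_epoll, EPOLL_CTL_ADD, listenfd, &ev);

    for (;;) {
        int n = epoll_wait(my_epoll, events, 64, -1);
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == listenfd) {
                int client = accept(listenfd, NULL, NULL);
                if (client < 0)
                    continue;            /* EAGAIN: another thread won the race */
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = client;
                /* this client now belongs to this thread only */
                epoll_ctl(my_epoll, EPOLL_CTL_ADD, client, &ev);
            } else {
                /* serve the request on events[i].data.fd */
            }
        }
    }
    return NULL;
}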
Below is an example using a globally shared epoll fd. I didn't plan on doing all of this, but one thing led to another, and, well, it was fun, and I think it may help to get you started. It's an echo server that listens on port 3000 and has a pool of 20 threads using epoll to concurrently accept new clients and serve requests.
#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#define SERVERPORT 3000
#define SERVERBACKLOG 10
#define THREADSNO 20
#define EVENTS_BUFF_SZ 256

static int serversock;
static int epoll_fd;
static pthread_t threads[THREADSNO];

int accept_new_client(void) {
    int clientsock;
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);

    if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) {
        return -1;
    }

    char ip_buff[INET_ADDRSTRLEN+1];
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
        close(clientsock);
        return -1;
    }

    printf("*** [%p] Client connected from %s:%" PRIu16 "\n", (void *) pthread_self(),
           ip_buff, ntohs(addr.sin_port));

    struct epoll_event epevent;
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.fd = clientsock;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) {
        perror("epoll_ctl(2) failed attempting to add new client");
        close(clientsock);
        return -1;
    }

    return 0;
}

int handle_request(int clientfd) {
    char readbuff[512];
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    ssize_t n;

    if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) {
        return -1;
    }
    if (n == 0) {
        return 0;
    }
    readbuff[n] = '\0';

    if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) {
        return -1;
    }

    char ip_buff[INET_ADDRSTRLEN+1];
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
        return -1;
    }

    printf("*** [%p] [%s:%" PRIu16 "] -> server: %s", (void *) pthread_self(),
           ip_buff, ntohs(addr.sin_port), readbuff);

    ssize_t sent;
    if ((sent = send(clientfd, readbuff, n, 0)) < 0) {
        return -1;
    }
    readbuff[sent] = '\0';

    printf("*** [%p] server -> [%s:%" PRIu16 "]: %s", (void *) pthread_self(),
           ip_buff, ntohs(addr.sin_port), readbuff);

    return 0;
}

void *worker_thr(void *args) {
    struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ);
    if (events == NULL) {
        perror("malloc(3) failed when attempting to allocate events buffer");
        pthread_exit(NULL);
    }

    int events_cnt;
    while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) {
        int i;
        for (i = 0; i < events_cnt; i++) {
            assert(events[i].events & EPOLLIN);
            if (events[i].data.fd == serversock) {
                if (accept_new_client() == -1) {
                    fprintf(stderr, "Error accepting new client: %s\n", strerror(errno));
                }
            } else {
                if (handle_request(events[i].data.fd) == -1) {
                    fprintf(stderr, "Error handling request: %s\n", strerror(errno));
                }
            }
        }
    }

    if (events_cnt == 0) {
        fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?");
    } else {
        perror("epoll_wait(2) error");
    }

    free(events);
    return NULL;
}

int main(void) {
    if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
        perror("socket(2) failed");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(SERVERPORT);
    serveraddr.sin_addr.s_addr = INADDR_ANY;

    if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
        perror("bind(2) failed");
        exit(EXIT_FAILURE);
    }

    if (listen(serversock, SERVERBACKLOG) < 0) {
        perror("listen(2) failed");
        exit(EXIT_FAILURE);
    }

    if ((epoll_fd = epoll_create(1)) < 0) {
        perror("epoll_create(2) failed");
        exit(EXIT_FAILURE);
    }

    struct epoll_event epevent;
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.fd = serversock;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) {
        perror("epoll_ctl(2) failed on main server socket");
        exit(EXIT_FAILURE);
    }

    int i;
    for (i = 0; i < THREADSNO; i++) {
        if (pthread_create(&threads[i], NULL, worker_thr, NULL) < 0) {
            perror("pthread_create(3) failed");
            exit(EXIT_FAILURE);
        }
    }

    /* the main thread contributes as a worker thread as well */
    worker_thr(NULL);

    return 0;
}
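To try it out: compile with something like gcc server.c -o server -pthread (the file name is up to you) and connect a few clients with nc localhost 3000. Each line you type should be echoed back, with the serving thread's identifier printed on the server side.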
A couple of notes:

- main() should return int, not void (as I show in the example above). Also, always deal with error return codes - it is very common to ignore them, and when things break it's hard to know what happened.
- The code assumes that no request is larger than 511 bytes (as seen by the buffer size in handle_request()). If a request is greater than this, it is possible that some data is left behind in the socket for a very long time, because epoll_wait(2) will not report the file descriptor again until a new event occurs on it (since we're using EPOLLET). In the worst case, the client may never send any new data, and instead wait for a reply forever; the usual fix is sketched after this list.
- The code that prints the thread identifier for each request assumes that pthread_t is an opaque pointer type. Indeed, pthread_t is a pointer type in Linux, but it may be an integer type on other platforms, so this is not portable. However, that is probably not much of a problem, since epoll is Linux-specific, so the code is not portable anyway.
- It assumes that no other requests from the same client arrive while a thread is still serving a request from that client. If a new request arrives in the meantime and another thread starts serving it, we have a race condition and the client will not necessarily receive the echo messages in the same order it sent them (however, since write(2) is atomic, while the replies may arrive out of order, they will not intersperse).
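Regarding the second note above, the usual fix with EPOLLET is to make each client socket non-blocking and keep reading until recv(2) fails with EAGAIN, so that no data is left behind. A rough sketch (not part of the example above; a production version would also have to handle short writes on send(2)):

/* Drain an edge-triggered, non-blocking socket completely, echoing
 * everything back. Assumes clientfd was set O_NONBLOCK on accept. */
int drain_and_echo(int clientfd) {
    char buff[512];
    for (;;) {
        ssize_t n = recv(clientfd, buff, sizeof(buff), 0);
        if (n > 0) {
            if (send(clientfd, buff, n, 0) < 0)
                return -1;
        } else if (n == 0) {
            return 0;                  /* peer closed the connection */
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;                  /* no more data: fully drained */
        } else {
            return -1;                 /* real error */
        }
    }
}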