2012-03-06 31 views
7

पूर्ण प्रकटीकरण, मैं एक छात्र हूं और यह एक असाइनमेंट है। मैं लगभग एक सप्ताह से अधिक समय तक गैर-स्टॉप (पिछले समय व्यतीत करने के अलावा) पर काम कर रहा हूं और मैं यह नहीं समझ सकता कि मैं क्या गलत कर रहा हूं। मेरा सर्वर केवल कुछ "कुछ" रिकॉव किए जाने के बाद epoll_wait पर लटकता रहता है ("कुछ" क्योंकि मुझे कई जीबी डेटा की उम्मीद है और मुझे केवल कुछ दर्जन एमबी मिल रहा है)। मुझे नहीं लगता कि मेरा क्लाइंट कैसे काम करता है, इसमें कुछ भी गलत है, क्योंकि यह मेरे चयन और बहु-थ्रेडेड सर्वरों के साथ ठीक काम कर रहा है। कृपया एक त्वरित नज़र डालें और मुझे बताएं कि क्या आपकी समस्या का कारण होने पर आपको कुछ भी बाहर निकलता है।कुछ खोना या क्या मैं एपोल को समझ नहीं पा रहा हूं?

क्लाइंट/सर्वर का मूल विचार सर्वर (10k +) के साथ सर्वर पर बमबारी करना और कई बार डेटा की एक निर्दिष्ट राशि स्थानांतरित करना है। इस एपोल सर्वर को 2000 के साथ परेशानी हो रही है, जब मेरे बहु-थ्रेडेड सर्वर ने 10k लक्ष्य के शर्मीले को संभाला।

मैं आपसे मेरे लिए अपना काम करने के लिए नहीं कह रहा हूं (मैं लगभग पूरा कर चुका हूं) मुझे बस यह पता लगाने में मदद की ज़रूरत है कि मैं यहां क्या कर रहा हूं। अग्रिम धन्यवाद किसी भी मदद के लिए आप की पेशकश कर सकते :)

1 #include "common.h" 
    2 #include <sys/epoll.h> 
    3 
    4 uint16_t ready[MAX_CONNS]; 
    5 uint16_t next; 
    6 pthread_mutex_t mutex; 
    7 
    8 void *worker_thread(void *param) { 
    9  int my_sock, pos; 
10  struct conn_params *conn_ps = (struct conn_params *)param; 
11 
12  while (1) { 
13   pthread_mutex_lock(&mutex); 
14 
15   while (1) { 
16    if (next == MAX_CONNS) { 
17     printf("balls\n"); 
18     next = 4; 
19    } 
20 
21    if (ready[next] != 0) { 
22     pos = next; 
23     my_sock = ready[pos]; 
24     next++; 
25     break; 
26    } 
27   } 
28 
29   pthread_mutex_unlock(&mutex); 
30   /* handle recv/send */ 
31   if (echo_recv(&conn_ps[my_sock], MULTIPLE) == 0) { /* closed conn */ 
32    shutdown(my_sock, SHUT_RDWR); 
33    close(my_sock); 
34    serv_stats.active_connections--; 
35   } 
36   ready[pos] = 0; 
37 /*  print_conn_stats(&conn_ps[my_sock]);*/ 
38  } 
39 } 
40 
41 void *add_client_thread(void *param) { 
42  struct epoll_accept_thread *eat = (struct epoll_accept_thread *)param; 
43  struct sockaddr client; 
44  struct epoll_event event; 
45  socklen_t client_len; 
46  int new_sock, ret; 
47  char hostbuf[NI_MAXHOST], servbuf[NI_MAXSERV]; 
48 
49  bzero(&client, sizeof(struct sockaddr)); 
50  event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; 
51 
52  while ((new_sock = accept(eat->listen_sock, &client, &client_len)) != -1) { 
53   set_nonblock(new_sock); 
54   event.data.fd = new_sock; 
55   if (epoll_ctl(eat->fd_epoll, EPOLL_CTL_ADD, new_sock, &event) == -1) { 
56    perror("epoll_ctl"); 
57    printf("%u\n", new_sock); 
58    continue; 
59   } 
60 
61   bzero(&(eat->conn_ps[new_sock]), sizeof(struct conn_params)); 
62   eat->conn_ps[new_sock].sock = new_sock; 
63   if ((ret = getnameinfo(&client, client_len, hostbuf, NI_MAXHOST, servbuf, NI_MAXSERV, NI_NUMERICHOST)) != 0) { 
64    gai_strerror(ret); 
65   } 
66 
67   update_server_stats(); 
68   printf("added client\n"); 
69  } 
70 
71  if (errno != EAGAIN) { 
72   perror("Couldn't accept connection"); 
73  } 
74 
75  pthread_exit(NULL); 
76 } 
77 
78 int main(int argc, char **argv) { 
79  char opt, *port = NULL; 
80  struct addrinfo hints, *results, *p; 
81  int listen_sock = new_tcp_sock(), nfds, i, ret; 
82  int fd_epoll, next_avail = 4; 
83  struct conn_params conn_ps[MAX_CONNS]; 
84  struct epoll_event evs[MAX_CONNS]; 
85  struct epoll_event event; 
86  struct epoll_accept_thread eat; 
87  pthread_t thread; 
88 
89  while ((opt = getopt(argc, argv, ":l:")) != -1) { 
90   switch (opt) { 
91    case 'l': /* port to listen on */ 
92     port = optarg; 
93     break; 
94    case '?': /* unknown option */ 
95     fprintf(stderr, "The option -%c is not supported.\n", opt); 
96     exit(1); 
97    case ':': /* required arg not supplied for option */ 
98     fprintf(stderr, "The option -%c requires an argument.\n", opt); 
99     exit(1); 
100   } 
101  } /* command line arg processing done */ 
102 
103  if (port == NULL) { 
104   fprintf(stderr, "You must provide the port to listen on (-l).\n"); 
105   exit(1); 
106  } 
107 
108  signal(SIGINT, handle_interrupt); 
109 
110  bzero(&hints, sizeof(struct addrinfo)); 
111  hints.ai_family = AF_INET; 
112  hints.ai_socktype = SOCK_STREAM; 
113  hints.ai_flags = AI_PASSIVE; 
114 
115  set_nonblock(listen_sock); 
116  set_reuseaddr(listen_sock); 
117 
118  if ((ret = getaddrinfo(NULL, port, &hints, &results) != 0)) { 
119   gai_strerror(ret); 
120   exit(1); 
121  } 
122 
123  for (p = results; p != NULL; p = p->ai_next) { /* attempt to connect to the host */ 
124   if (bind(listen_sock, p->ai_addr, p->ai_addrlen) == -1) { 
125    perror("Bind failed"); 
126   } else { 
127    break; 
128   } 
129  } 
130 
131  if (p == NULL) { /* we were unable to connect to anything */ 
132   fprintf(stderr, "Unable to bind to the specified port. Exiting...\n"); 
133   exit(1); 
134  } 
135 
136  freeaddrinfo(results); 
137 
138  if (listen(listen_sock, 5) == -1) { 
139   perror("Listen failed"); 
140   exit(1); 
141  } 
142 
143  /* everything is set up. method-specific code goes below */ 
144 
145  start_server_stats(); 
146  next = 4; 
147 
148  if ((fd_epoll = epoll_create(MAX_CONNS)) == -1) { 
149   perror("epoll_create"); 
150   exit(1); 
151  } 
152 
153  event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; 
154  event.data.fd = listen_sock; 
155  if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, listen_sock, &event) == -1) { 
156   perror("epoll_ctl"); 
157   exit(1); 
158  } 
159 
160  signal(SIGPIPE, SIG_IGN); 
161  bzero(ready, MAX_CONNS * sizeof(uint16_t)); 
162  pthread_mutex_init(&mutex, NULL); 
163 
164  for (i = 0; i < 5; i++) { /* five workers should be enough */ 
165   pthread_create(&thread, NULL, worker_thread, (void *)&conn_ps); 
166  } 
167 
168  while (1) { 
169   if ((nfds = epoll_wait(fd_epoll, evs, MAX_CONNS, -1)) > 0 && errno == EINTR) { 
170    continue; 
171   } 
172   for (i = 0; i < nfds; i++) { /* loop through all FDs */ 
173    if (evs[i].events & (EPOLLERR | EPOLLHUP)) { /* if there's an error or a hangup */ 
174     /*fprintf(stderr, "Error! Danger, Will Robinson! Danger!");*/ 
175     close(evs[i].data.fd); 
176     continue; 
177    } else if (evs[i].data.fd == listen_sock) { /* we have a new connection coming in */ 
178     eat.listen_sock = listen_sock; 
179     eat.fd_epoll = fd_epoll; 
180     eat.conn_ps = conn_ps; 
181     pthread_create(&thread, NULL, add_client_thread, (void *)&eat); 
182    } else { /* inbound data */ 
183     while (ready[next_avail] != 0) { 
184      next_avail++; 
185 
186      if (next_avail == MAX_CONNS) { 
187       next_avail = 4; 
188      } 
189     } 
190     ready[next_avail] = evs[i].data.fd; 
191    } /* end inbound data */ 
192   } /* end iterating through FDs */ 
193  } /* end epoll_wait loop */ 
194 
195  perror("epoll_wait"); 
196 
197  return 0; 
198 } 

और यहाँ echo_recv समारोह है, के रूप में मैं किसी के साथ-साथ कि देखना चाहता हूँ जा रहा है मान:

14 int echo_recv(struct conn_params *conn_p, int single) { 
15  char client_buf[CLIENT_BUF_SIZE], buffer[BUF_SIZE]; 
16  int nread, nwrite, nsent = 0, i; 
17 
18  while ((nread = recv(conn_p->sock, client_buf, CLIENT_BUF_SIZE, 0)) > 0) { 
19   /* create buffer of MULTIPLIER(int) times what was received */ 
20   for (i = 0; i < MULTIPLIER && nread*i < BUF_SIZE; i++) { 
21    memcpy(buffer+(nread*i), client_buf, nread); 
22   } 
23 
24   /* send the created buffer */ 
25   while ((nwrite = send(conn_p->sock, buffer+nsent, (nread*MULTIPLIER)-nsent, 0)) > 0) { 
26    nsent += nwrite; 
27   } 
28 
29   conn_p->total_recvd += nread; /* update our stats for this conn */ 
30   conn_p->total_sent += nsent; /* update our status for this conn */ 
31   serv_stats.total_recvd += nread; 
32   serv_stats.total_sent += nsent; 
33   nsent = 0; 
34 
35   if (single) { 
36    return 1; 
37   } 
38  } 
39 
40  if (nread == -1 && (errno & EAGAIN)) { 
41   return 1; 
42  } 
43 
44  if (nread == -1) { 
45   perror("wtf?"); 
46  } 
47 
48  shutdown(conn_p->sock, SHUT_RDWR); 
49  close(conn_p->sock); 
50 
51  return 0; /* recv failed */ 
52 } 
+2

दो बिंदु: पहला यह है कि 'errno' एक बिटफील्ड नहीं है, इसलिए' errno & EAGAIN' सही नहीं है, 'errno == EAGAIN' का उपयोग करें। दूसरा यह है कि आप सॉकेट डिस्क्रिप्टर के साथ सरणी अनुक्रमणित कर रहे हैं, और वे किसी भी संख्या में हो सकते हैं जो 'int 'में फिट बैठता है, क्या आप वाकई सरणी के आकार से कम हैं? –

+0

@ जोआचिम पिलेबोर्ग: बस जिज्ञासा, क्या आप एक ऐसे सिस्टम का नाम दे सकते हैं जहां एफडी-एस सबसे कम उपलब्ध संख्याओं को असाइन नहीं किया गया है? –

+0

@ करोलो हॉर्वथ नहीं, लेकिन जैसा कि मुझे पता है कि वहां कोई गारंटी नहीं है कि यह होना चाहिए, या संख्याओं को लगातार होना चाहिए। साथ ही, जब तक आप मानक इन/आउट/गलती बंद नहीं करते हैं, तब तक आपको "मान" 0 से 2 के साथ सॉकेट नहीं मिलेगा। –

उत्तर

2

यहाँ कुछ विचार कर रहे हैं:

  1. आपको वास्तव में यह देखना चाहिए कि साझा सरणी ready का उपयोग कैसे किया जाता है। अपने कार्यकर्ता धागे में, आप इसे पढ़ने के लिए एक म्यूटेक्स प्राप्त करते हैं, हालांकि कई बार जब आप लॉक के के बाहर संशोधित करते हैं, इसके अतिरिक्त, आप अपने लॉकिंग लूप (मुख्य थ्रेड) में इस लॉक को प्राप्त नहीं करते हैं, तो आप बस लिखते हैं सरणी - यह सादा गलत है।
  2. आप सभी कार्यकर्ता सूत्र, कैसे आप उन्हें मारने का प्रस्ताव करते हैं के लिए धागा आईडी नहीं रखते - आप करने के लिए एक अलग थ्रेड बनाने
  3. (या उन्हें पूर्ण होने की प्रतीक्षा सामान्य रूप से आप pthread_join कॉल करने की आवश्यकता होगी) कनेक्शन स्वीकार करें, लेकिन फिर आप इस धागे में साझा epoll_accept_thread संरचना को संशोधित करें - और इसके चारों ओर कोई लॉकिंग नहीं है।

मैं पहले सभी सिंक्रनाइज़ेशन मुद्दों को ठीक कर दूंगा, और फिर यह अन्य मुद्दों को प्रकट कर सकता है।

+0

यदि प्रक्रिया हमेशा के लिए चलती है, तो कार्यकर्ता धागे को मारना आवश्यक नहीं हो सकता है। मैंने अपने जैसे कुछ कार्यक्रम लिखे हैं, जहां धागे कभी बाहर नहीं निकलते हैं, जब तक कि पूर्ण आतंक में न हो क्योंकि कुछ गलत हो गया। मैंने पूरी प्रक्रिया को मार डाला था, न केवल एक धागा। –

+0

@ एक्स-इस्टेंस, यह मामला हो सकता है - लेकिन यदि आप यह सुनिश्चित करना चाहते हैं कि आप सभी थ्रेडों को बंद करने से पहले अपने कार्यों को पूरी तरह से पूरा करने के लिए प्रतीक्षा करें, तो आगे बढ़ने का तरीका है। – Nim

+0

अभी, मैं हमेशा प्रक्रिया को हमेशा के लिए चल रहा हूं (या इसे फिर से परीक्षण करने की आवश्यकता में मार डालो), इसलिए मुझे नहीं लगता था कि थ्रेड आईडी का ट्रैक रखना महत्वपूर्ण था। मुख्य धागे में 'तैयार' कैसे पहुंचाया जा रहा है, मैं उस स्थान को खोजने की जांच कर रहा हूं जो कब्जा नहीं किया गया है, और यह एकमात्र धागा है जो उन स्लॉट को भर रहा है, इसलिए मैंने एक म्यूटेक्स लगाया जिसकी आवश्यकता नहीं थी । यद्यपि कार्यकर्ता धागे में mutex फिक्स्ड। साथ ही, मैं मुख्य रूप से 'epoll_accept_thread' में अतिरिक्त असाइनमेंट हटा देता हूं। उन परिवर्तनों ने कोई अलग परिणाम नहीं दिया है :( –

1

मैं ऊपर एक टिप्पणी में इस पोस्ट करने के लिए चाहता था, लेकिन यह बहुत लंबे समय तक की तुलना में यह अनुमति होगी मिल गया:

एक सरल epoll आधारित सर्वर है कि है पूरी तरह से अतुल्यकालिक (चरणों)

  1. सेट अप को लागू करने का प्रयास करें अपने सॉकेट स्वीकार करें ...
  2. एपोल में जोड़ें।
  3. प्रतीक्षा पाश दर्ज करें:
    1. चेक कि क्या घटना सॉकेट स्वीकार पर है, या सामान्य सॉकेट
    2. तो सॉकेट स्वीकार करते हैं, कनेक्शन स्वीकार, epoll को जोड़ने के लिए, तो 3
    3. लिए वापस जाने के पढ़ने के लिए सामान्य सॉकेट पर घटना , एक्स बाइट्स को पढ़ें, बफर लिखने के लिए सहेजें, और सॉकेट के लिए एपॉल पर लिखने की घटना को सक्षम करें, 3
    4. पर जाएं, यदि लिखने के लिए सामान्य सॉकेट पर ईवेंट, बफर से नेटवर्क तक बाइट लिखें, लिखें बफर खाली होने पर लिखें ईवेंट अक्षम करें, 3 पर वापस जाओ।
    5. एक त्रुटि होती है epoll
  4. कोई चौथा कदम है ... कार्यक्रम होना चाहिए पाश हमेशा के लिए से सॉकेट को हटा दें।

यह किसी भी जटिलता को हटा देना चाहिए जो आपने थ्रेडिंग से जोड़ा है जो मुद्दों का कारण बन सकता है। यह select() के रूप में उसी प्रकार के डोमेन में वापस आ जाता है सिवाय इसके कि यह आमतौर पर बहुत तेज़ होता है। किसी ईवेंट लाइब्रेरी का उपयोग करने का पूरा विचार यह जानना है कि जब आप गैर-अवरुद्ध करने के लिए सॉकेट सेट करने के बजाय पढ़/लिख सकते हैं, और इसे पढ़ने/लिखने की कोशिश कर रहे हैं।

तुम भी write() जो कारण SIGPIPE प्राप्त करने के लिए विफल हो सकता है से वापसी मान की जाँच करने लगते हैं कभी नहीं (मुझे पता है तुम संकेत नजरअंदाज कर दिया है, लेकिन आप अभी भी एक EAGAIN/EINTR errno मिल जाएगा)।

दूसरी बात मैं देख रहा हूँ कि आप अपने धागा कि सॉकेट तैयार होने के लिए इंतजार कर रहा है के अंदर एक व्यस्त पाश कर रहे हैं। जब आप इस मामले में select() या epoll का उपयोग करते हैं, तो यह है कि आपको सूचित किया गया है कि कुछ नया है, इसलिए आपको व्यस्त लूप नहीं करना है ...

मुझे बिल्कुल यकीन नहीं है कि आप क्या कर रहे हैं पूरा करें, लेकिन आपका कोड बेहद अक्षम है।

तुम कर सकते हो क्या, बस एक साधारण अतुल्यकालिक उदाहरण चरणों का उपयोग कर ऊपर कई कार्यकर्ता धागे कि सभी listener/accept सॉकेट पर read घटनाओं के लिए सुनने (epoll प्रयोग करके) और धागे से प्रत्येक विभिन्न संभाल है शुरू कर रहा है लागू करने के बाद कनेक्शन (अभी भी मैंने जो पोस्ट किया है उसका उपयोग कर)।

+0

हाँ, मैं जानता हूँ कि मेरे कोड बुरी तरह अक्षम :( ईमानदारी से, मैं सिर्फ इससे पहले कि मैं पुनर्रचना। मैं इस एक के साथ थोड़ी देर के लिए मेज पर मेरे सिर को कोसने किया गया है शुरू कर दिया है कि यह काम करना चाहता था है। –