Line data Source code
1 : /* Copyright (C) 2013 The PARI group.
2 :
3 : This file is part of the PARI/GP package.
4 :
5 : PARI/GP is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 2 of the License, or (at your option) any later
8 : version. It is distributed in the hope that it will be useful, but WITHOUT
9 : ANY WARRANTY WHATSOEVER.
10 :
11 : Check the License for details. You should have received a copy of it, along
12 : with the package; see the file 'COPYING'. If not, write to the Free Software
13 : Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
14 : #include <pthread.h>
15 : #include "pari.h"
16 : #include "paripriv.h"
17 : #include "mt.h"
18 : #if defined(_WIN32)
19 : # include "../systems/mingw/mingw.h"
20 : #endif
21 :
22 : #define DEBUGLEVEL DEBUGLEVEL_mt
23 :
24 : struct mt_queue
25 : {
26 : long no;
27 : pari_sp avma;
28 : struct pari_mainstack *mainstack;
29 : GEN input, output;
30 : GEN worker;
31 : long workid;
32 : pthread_cond_t cond;
33 : pthread_mutex_t mut;
34 : pthread_cond_t *pcond;
35 : pthread_mutex_t *pmut;
36 : };
37 :
38 : struct mt_pstate
39 : {
40 : pthread_t *th;
41 : struct pari_thread *pth;
42 : struct mt_queue *mq;
43 : long n, nbint, last;
44 : long pending;
45 : pthread_cond_t pcond;
46 : pthread_mutex_t pmut;
47 : };
48 :
49 : static THREAD long mt_thread_no = -1;
50 : static struct mt_pstate *pari_mt;
51 :
52 : #define LOCK(x) pthread_mutex_lock(x); do
53 : #define UNLOCK(x) while(0); pthread_mutex_unlock(x)
54 :
55 : void
56 193435682 : mt_sigint_block(void)
57 : {
58 193435682 : if (mt_thread_no>=0)
59 27851229 : pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
60 195114326 : }
61 :
62 : void
63 193897906 : mt_sigint_unblock(void)
64 : {
65 193897906 : if (mt_thread_no>=0)
66 28287307 : pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
67 195119281 : }
68 :
69 : void
70 1724 : mt_err_recover(long er)
71 : {
72 1724 : if (mt_thread_no>=0)
73 : {
74 10 : struct mt_pstate *mt = pari_mt;
75 10 : struct mt_queue *mq = mt->mq+mt_thread_no;
76 10 : GEN err = pari_err_last();
77 10 : err = err_get_num(err)==e_STACK ? err_e_STACK: bin_copy(copy_bin(err));
78 10 : LOCK(mq->pmut)
79 : {
80 10 : mq->output = err;
81 10 : pthread_cond_signal(mq->pcond);
82 10 : } UNLOCK(mq->pmut);
83 10 : pthread_exit((void*)1);
84 : }
85 1714 : else mtsingle_err_recover(er);
86 1714 : }
87 :
88 : void
89 0 : mt_break_recover(void)
90 : {
91 0 : if (mt_thread_no<0) mtsingle_err_recover(0);
92 0 : }
93 :
94 : void
95 0 : mt_sigint(void)
96 : {
97 0 : if (pari_mt) pthread_cond_broadcast(&pari_mt->pcond);
98 0 : }
99 :
100 : int
101 215533 : mt_is_parallel(void)
102 : {
103 215533 : return !!pari_mt;
104 : }
105 :
106 : int
107 30868027 : mt_is_thread(void)
108 : {
109 30868027 : return mt_thread_no>=0 ? 1: mtsingle_is_thread();
110 : }
111 :
112 : long
113 357291 : mt_nbthreads(void)
114 : {
115 357291 : return pari_mt ? 1: pari_mt_nbthreads;
116 : }
117 :
118 : void
119 343926 : mt_thread_init(void) { mt_thread_no = 0; }
120 :
121 : void
122 13 : mt_export_add(const char *str, GEN val)
123 : {
124 13 : if (pari_mt)
125 0 : pari_err(e_MISC,"export() not allowed during parallel sections");
126 13 : export_add(str, val);
127 13 : }
128 :
129 : void
130 8 : mt_export_del(const char *str)
131 : {
132 8 : if (pari_mt)
133 0 : pari_err(e_MISC,"unexport() not allowed during parallel sections");
134 8 : export_del(str);
135 8 : }
136 :
137 1 : void mt_broadcast(GEN code) {(void) code;}
138 :
139 254 : void pari_mt_init(void)
140 : {
141 254 : pari_mt = NULL;
142 : #ifdef _SC_NPROCESSORS_CONF
143 254 : if (!pari_mt_nbthreads) pari_mt_nbthreads = sysconf(_SC_NPROCESSORS_CONF);
144 : #elif defined(_WIN32)
145 : if (!pari_mt_nbthreads) pari_mt_nbthreads = win32_nbthreads();
146 : #else
147 : pari_mt_nbthreads = 1;
148 : #endif
149 254 : }
150 :
151 254 : void pari_mt_close(void) { }
152 :
153 : static void
154 344849 : mt_queue_cleanup(void *arg)
155 : {
156 : (void) arg;
157 344849 : pari_thread_close();
158 343397 : }
159 :
160 : static void
161 344533 : mt_queue_unlock(void *arg)
162 344533 : { pthread_mutex_unlock((pthread_mutex_t*) arg); }
163 :
164 : static void*
165 345783 : mt_queue_run(void *arg)
166 : {
167 345783 : GEN args = pari_thread_start((struct pari_thread*) arg);
168 343682 : pari_sp av = avma;
169 343682 : struct mt_queue *mq = (struct mt_queue *) args;
170 343682 : mt_thread_no = mq->no;
171 343682 : pthread_cleanup_push(mt_queue_cleanup,NULL);
172 343785 : LOCK(mq->pmut)
173 : {
174 345957 : mq->mainstack = pari_mainstack;
175 345957 : mq->avma = av;
176 345957 : pthread_cond_signal(mq->pcond);
177 345957 : } UNLOCK(mq->pmut);
178 : for(;;)
179 431660 : {
180 : GEN work, done;
181 777572 : LOCK(&mq->mut)
182 : {
183 777518 : pthread_cleanup_push(mt_queue_unlock, &mq->mut);
184 1184965 : while(!mq->input)
185 753615 : pthread_cond_wait(&mq->cond, &mq->mut);
186 431350 : pthread_cleanup_pop(0);
187 431409 : } UNLOCK(&mq->mut);
188 431497 : pari_mainstack = mq->mainstack;
189 431497 : set_avma(mq->avma);
190 430969 : work = mq->input;
191 430969 : pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
192 431614 : done = closure_callgenvec(mq->worker,work);
193 430282 : pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
194 431449 : LOCK(mq->pmut)
195 : {
196 431688 : mq->mainstack = pari_mainstack;
197 431688 : mq->avma = av;
198 431688 : mq->input = NULL;
199 431688 : mq->output = done;
200 431688 : pthread_cond_signal(mq->pcond);
201 431688 : } UNLOCK(mq->pmut);
202 : }
203 : pthread_cleanup_pop(1);
204 : #ifdef __GNUC__
205 : return NULL; /* LCOV_EXCL_LINE */
206 : #endif
207 : }
208 :
209 : static long
210 592069 : mt_queue_check(struct mt_pstate *mt)
211 : {
212 : long i;
213 6937361 : for(i=0; i<mt->n; i++)
214 : {
215 6776983 : struct mt_queue *mq = mt->mq+i;
216 6776983 : if (mq->output) return i;
217 : }
218 160378 : return -1;
219 : }
220 :
221 : static GEN
222 721801 : mtpthread_queue_get(struct mt_state *junk, long *workid, long *pending)
223 : {
224 721801 : struct mt_pstate *mt = pari_mt;
225 : struct mt_queue *mq;
226 721801 : GEN done = NULL;
227 : long last;
228 : (void) junk;
229 721801 : if (mt->nbint<mt->n)
230 : {
231 290108 : mt->last = mt->nbint;
232 290108 : *pending = mt->pending;
233 290108 : return NULL;
234 : }
235 431693 : BLOCK_SIGINT_START
236 431693 : LOCK(&mt->pmut)
237 : {
238 592069 : while ((last = mt_queue_check(mt)) < 0)
239 : {
240 160378 : pthread_cond_wait(&mt->pcond, &mt->pmut);
241 160378 : if (PARI_SIGINT_pending)
242 : {
243 2 : int sig = PARI_SIGINT_pending;
244 2 : PARI_SIGINT_pending = 0;
245 2 : pthread_mutex_unlock(&mt->pmut);
246 2 : PARI_SIGINT_block = 0;
247 2 : raise(sig);
248 0 : PARI_SIGINT_block = 1;
249 0 : pthread_mutex_lock(&mt->pmut);
250 : }
251 : }
252 431691 : } UNLOCK(&mt->pmut);
253 431691 : BLOCK_SIGINT_END
254 431691 : mq = mt->mq+last;
255 431691 : done = gcopy(mq->output);
256 431691 : mq->output = NULL;
257 431691 : if (workid) *workid = mq->workid;
258 431691 : if (typ(done) == t_ERROR)
259 : {
260 5 : if (err_get_num(done)==e_STACK)
261 0 : pari_err(e_STACKTHREAD);
262 : else
263 5 : pari_err(0,done);
264 : }
265 431686 : mt->last = last;
266 431686 : mt->pending--;
267 431686 : *pending = mt->pending;
268 431686 : return done;
269 : }
270 :
271 : static void
272 721801 : mtpthread_queue_submit(struct mt_state *junk, long workid, GEN work)
273 : {
274 721801 : struct mt_pstate *mt = pari_mt;
275 721801 : struct mt_queue *mq = mt->mq+mt->last;
276 : (void) junk;
277 721801 : if (!work) { mt->nbint=mt->n; return; }
278 431779 : BLOCK_SIGINT_START
279 431779 : if (mt->nbint<mt->n)
280 : {
281 345260 : mt->nbint++;
282 345260 : LOCK(mq->pmut)
283 : {
284 420852 : while(!mq->avma)
285 75592 : pthread_cond_wait(mq->pcond, mq->pmut);
286 345260 : } UNLOCK(mq->pmut);
287 : }
288 431779 : LOCK(&mq->mut)
289 : {
290 431779 : mq->output = NULL;
291 431779 : mq->workid = workid;
292 431779 : BLOCK_SIGINT_START
293 : {
294 431779 : pari_sp av = avma;
295 431779 : struct pari_mainstack *st = pari_mainstack;
296 431779 : pari_mainstack = mq->mainstack;
297 431779 : set_avma(mq->avma);
298 431779 : mq->input = gcopy(work);
299 431779 : mq->avma = avma;
300 431779 : mq->mainstack = pari_mainstack;
301 431779 : pari_mainstack = st;
302 431779 : set_avma(av);
303 : }
304 431779 : BLOCK_SIGINT_END
305 431779 : pthread_cond_signal(&mq->cond);
306 431779 : } UNLOCK(&mq->mut);
307 431779 : mt->pending++;
308 431779 : BLOCK_SIGINT_END
309 : }
310 :
311 : void
312 55549 : mt_queue_reset(void)
313 : {
314 55549 : struct mt_pstate *mt = pari_mt;
315 : long i;
316 55549 : BLOCK_SIGINT_START
317 401506 : for (i=0; i<mt->n; i++)
318 345957 : pthread_cancel(mt->th[i]);
319 401506 : for (i=0; i<mt->n; i++)
320 345957 : pthread_join(mt->th[i],NULL);
321 55549 : pari_mt = NULL;
322 55549 : BLOCK_SIGINT_END
323 55549 : if (DEBUGLEVEL) pari_warn(warner,"stopping %ld threads", mt->n);
324 401506 : for (i=0;i<mt->n;i++)
325 : {
326 345957 : struct mt_queue *mq = mt->mq+i;
327 345957 : pthread_cond_destroy(&mq->cond);
328 345957 : pthread_mutex_destroy(&mq->mut);
329 345957 : pari_thread_free(&mt->pth[i]);
330 : }
331 55549 : pari_free(mt->mq);
332 55549 : pari_free(mt->pth);
333 55549 : pari_free(mt->th);
334 55549 : pari_free(mt);
335 55549 : }
336 :
337 : static long
338 55549 : closure_has_clone(GEN fun)
339 : {
340 55549 : if (isclone(fun)) return 1;
341 55543 : if (lg(fun) >= 8)
342 : {
343 54984 : GEN f = closure_get_frame(fun);
344 54984 : long i, l = lg(f);
345 199983 : for (i = 1; i < l; i++)
346 146490 : if (isclone(gel(f,i))) return 1;
347 : }
348 54052 : return 0;
349 : }
350 :
351 : void
352 127143 : mt_queue_start_lim(struct pari_mt *pt, GEN worker, long lim)
353 : {
354 127143 : if (lim==0) lim = pari_mt_nbthreads;
355 127124 : else lim = minss(pari_mt_nbthreads, lim);
356 127144 : if (mt_thread_no >= 0)
357 41757 : mtsequential_queue_start(pt, worker);
358 85387 : else if (pari_mt || lim <= 1)
359 29838 : mtsingle_queue_start(pt, worker);
360 : else
361 : {
362 : struct mt_pstate *mt =
363 55549 : (struct mt_pstate*) pari_malloc(sizeof(struct mt_pstate));
364 55549 : long mtparisize = GP_DATA->threadsize? GP_DATA->threadsize: pari_mainstack->rsize;
365 55549 : long mtparisizemax = GP_DATA->threadsizemax;
366 : long i;
367 55549 : if (closure_has_clone(worker))
368 1497 : worker = gcopy(worker); /* to avoid clone_lock race */
369 55549 : mt->mq = (struct mt_queue *) pari_malloc(sizeof(*mt->mq)*lim);
370 55549 : mt->th = (pthread_t *) pari_malloc(sizeof(*mt->th)*lim);
371 55549 : mt->pth = (struct pari_thread *) pari_malloc(sizeof(*mt->pth)*lim);
372 55549 : mt->pending = 0;
373 55549 : mt->n = lim;
374 55549 : mt->nbint = 0;
375 55549 : mt->last = 0;
376 55549 : pthread_cond_init(&mt->pcond,NULL);
377 55549 : pthread_mutex_init(&mt->pmut,NULL);
378 401506 : for (i=0;i<lim;i++)
379 : {
380 345957 : struct mt_queue *mq = mt->mq+i;
381 345957 : mq->no = i;
382 345957 : mq->avma = 0;
383 345957 : mq->mainstack = NULL;
384 345957 : mq->worker = worker;
385 345957 : mq->input = NULL;
386 345957 : mq->output = NULL;
387 345957 : mq->pcond = &mt->pcond;
388 345957 : mq->pmut = &mt->pmut;
389 345957 : pthread_cond_init(&mq->cond,NULL);
390 345957 : pthread_mutex_init(&mq->mut,NULL);
391 345957 : if (mtparisizemax)
392 0 : pari_thread_valloc(&mt->pth[i],mtparisize,mtparisizemax,(GEN)mq);
393 : else
394 345957 : pari_thread_alloc(&mt->pth[i],mtparisize,(GEN)mq);
395 : }
396 55549 : if (DEBUGLEVEL) pari_warn(warner,"starting %ld threads", lim);
397 55549 : BLOCK_SIGINT_START
398 401506 : for (i=0;i<lim;i++)
399 345957 : pthread_create(&mt->th[i],NULL, &mt_queue_run, (void*)&mt->pth[i]);
400 55549 : pari_mt = mt;
401 55549 : BLOCK_SIGINT_END
402 55549 : pt->get=&mtpthread_queue_get;
403 55549 : pt->submit=&mtpthread_queue_submit;
404 55549 : pt->end=&mt_queue_reset;
405 : }
406 127143 : }
|