Merge #10915 'jobwait(): fix race if job exits quickly'

This commit is contained in:
Justin M. Keyes
2019-09-03 08:40:54 -07:00
committed by GitHub
5 changed files with 36 additions and 40 deletions

View File

@ -5411,7 +5411,8 @@ jobstop({id}) *jobstop()*
See |job-control|. See |job-control|.
jobwait({ids}[, {timeout}]) *jobwait()* jobwait({ids}[, {timeout}]) *jobwait()*
Wait for a set of jobs to complete. Wait for a set of jobs and their |on_exit| handlers to
complete.
{ids} is a list of |job-id|s to wait for. {ids} is a list of |job-id|s to wait for.
{timeout} is the maximum waiting time in milliseconds, -1 {timeout} is the maximum waiting time in milliseconds, -1
@ -5427,10 +5428,10 @@ jobwait({ids}[, {timeout}]) *jobwait()*
Returns a list of len({ids}) integers, where each integer is Returns a list of len({ids}) integers, where each integer is
the wait-result of the corresponding job. Each wait-result is the wait-result of the corresponding job. Each wait-result is
one of the following: one of the following:
* Exit-code, if the job exited Exit-code, if the job exited
* -1 if the timeout was exceeded -1 if the timeout was exceeded
* -2 if the job was interrupted -2 if the job was interrupted (by |CTRL-C|)
* -3 if the |job-id| is invalid -3 if the job-id is invalid
join({list} [, {sep}]) *join()* join({list} [, {sep}]) *join()*
Join the items in {list} together into one String. Join the items in {list} together into one String.

View File

@ -12437,35 +12437,30 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)
if (check_restricted() || check_secure()) { if (check_restricted() || check_secure()) {
return; return;
} }
if (argvars[0].v_type != VAR_LIST || (argvars[1].v_type != VAR_NUMBER if (argvars[0].v_type != VAR_LIST || (argvars[1].v_type != VAR_NUMBER
&& argvars[1].v_type != VAR_UNKNOWN)) { && argvars[1].v_type != VAR_UNKNOWN)) {
EMSG(_(e_invarg)); EMSG(_(e_invarg));
return; return;
} }
ui_busy_start();
list_T *args = argvars[0].vval.v_list; list_T *args = argvars[0].vval.v_list;
Channel **jobs = xcalloc(tv_list_len(args), sizeof(*jobs)); Channel **jobs = xcalloc(tv_list_len(args), sizeof(*jobs));
ui_busy_start();
MultiQueue *waiting_jobs = multiqueue_new_parent(loop_on_put, &main_loop); MultiQueue *waiting_jobs = multiqueue_new_parent(loop_on_put, &main_loop);
// For each item in the input list append an integer to the output list. -3
// is used to represent an invalid job id, -2 is for a interrupted job and
// -1 for jobs that were skipped or timed out.
// Validate, prepare jobs for waiting.
int i = 0; int i = 0;
TV_LIST_ITER_CONST(args, arg, { TV_LIST_ITER_CONST(args, arg, {
Channel *chan = NULL; Channel *chan = NULL;
if (TV_LIST_ITEM_TV(arg)->v_type != VAR_NUMBER if (TV_LIST_ITEM_TV(arg)->v_type != VAR_NUMBER
|| !(chan = find_job(TV_LIST_ITEM_TV(arg)->vval.v_number, false))) { || !(chan = find_job(TV_LIST_ITEM_TV(arg)->vval.v_number, false))) {
jobs[i] = NULL; jobs[i] = NULL; // Invalid job.
} else { } else {
jobs[i] = chan; jobs[i] = chan;
channel_incref(chan); channel_incref(chan);
if (chan->stream.proc.status < 0) { if (chan->stream.proc.status < 0) {
// Process any pending events for the job because we'll temporarily // Process any pending events on the job's queue before temporarily
// replace the parent queue // replacing it.
multiqueue_process_events(chan->events); multiqueue_process_events(chan->events);
multiqueue_replace_parent(chan->events, waiting_jobs); multiqueue_replace_parent(chan->events, waiting_jobs);
} }
@ -12482,40 +12477,36 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)
for (i = 0; i < tv_list_len(args); i++) { for (i = 0; i < tv_list_len(args); i++) {
if (remaining == 0) { if (remaining == 0) {
// timed out break; // Timeout.
break;
} }
if (jobs[i] == NULL) {
// if the job already exited, but wasn't freed yet continue; // Invalid job, will assign status=-3 below.
if (jobs[i] == NULL || jobs[i]->stream.proc.status >= 0) {
continue;
} }
int status = process_wait(&jobs[i]->stream.proc, remaining, int status = process_wait(&jobs[i]->stream.proc, remaining,
waiting_jobs); waiting_jobs);
if (status < 0) { if (status < 0) {
// interrupted or timed out, skip remaining jobs. break; // Interrupted (CTRL-C) or timeout, skip remaining jobs.
break;
} }
if (remaining > 0) { if (remaining > 0) {
uint64_t now = os_hrtime(); uint64_t now = os_hrtime();
remaining -= (int) ((now - before) / 1000000); remaining = MIN(0, remaining - (int)((now - before) / 1000000));
before = now; before = now;
if (remaining <= 0) {
break;
}
} }
} }
list_T *const rv = tv_list_alloc(tv_list_len(args)); list_T *const rv = tv_list_alloc(tv_list_len(args));
// restore the parent queue for any jobs still alive // For each job:
// * Restore its parent queue if the job is still alive.
// * Append its status to the output list, or:
// -3 for "invalid job id"
// -2 for "interrupted" (user hit CTRL-C)
// -1 for jobs that were skipped or timed out
for (i = 0; i < tv_list_len(args); i++) { for (i = 0; i < tv_list_len(args); i++) {
if (jobs[i] == NULL) { if (jobs[i] == NULL) {
tv_list_append_number(rv, -3); tv_list_append_number(rv, -3);
continue; continue;
} }
// restore the parent queue for the job
multiqueue_process_events(jobs[i]->events); multiqueue_process_events(jobs[i]->events);
multiqueue_replace_parent(jobs[i]->events, main_loop.events); multiqueue_replace_parent(jobs[i]->events, main_loop.events);

View File

@ -181,8 +181,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
got_int // interrupted by the user got_int // interrupted by the user
|| proc->refcount == 1); // job exited || proc->refcount == 1); // job exited
// we'll assume that a user frantically hitting interrupt doesn't like // Assume that a user hitting CTRL-C does not like the current job. Kill it.
// the current job. Signal that it has to be killed.
if (got_int) { if (got_int) {
got_int = false; got_int = false;
process_stop(proc); process_stop(proc);

View File

@ -582,13 +582,17 @@ describe('jobs', function()
it('will run callbacks while waiting', function() it('will run callbacks while waiting', function()
source([[ source([[
let g:dict = {'id': 10} let g:dict = {}
let g:exits = 0 let g:jobs = []
function g:dict.on_exit(id, code, event) let g:exits = []
function g:dict.on_stdout(id, code, event) abort
call add(g:jobs, a:id)
endfunction
function g:dict.on_exit(id, code, event) abort
if a:code != 5 if a:code != 5
throw 'Error!' throw 'Error!'
endif endif
let g:exits += 1 call add(g:exits, a:id)
endfunction endfunction
call jobwait(has('win32') ? [ call jobwait(has('win32') ? [
\ jobstart('Start-Sleep -Milliseconds 100; exit 5', g:dict), \ jobstart('Start-Sleep -Milliseconds 100; exit 5', g:dict),
@ -601,9 +605,10 @@ describe('jobs', function()
\ jobstart('sleep 0.050; exit 5', g:dict), \ jobstart('sleep 0.050; exit 5', g:dict),
\ jobstart('sleep 0.070; exit 5', g:dict) \ jobstart('sleep 0.070; exit 5', g:dict)
\ ]) \ ])
call rpcnotify(g:channel, 'wait', g:exits) call rpcnotify(g:channel, 'wait', sort(g:jobs), sort(g:exits))
]]) ]])
eq({'notification', 'wait', {4}}, next_msg()) eq({'notification', 'wait',
{{3,4,5,6}, {3,4,5,6}}}, next_msg())
end) end)
it('will return status codes in the order of passed ids', function() it('will return status codes in the order of passed ids', function()

View File

@ -98,8 +98,8 @@ describe("'wildmenu'", function()
]]} ]]}
-- cmdline CTRL-D display should also be preserved. -- cmdline CTRL-D display should also be preserved.
feed([[<C-\><C-N>]]) feed([[<C-U>]])
feed([[:sign <C-D>]]) -- Invoke cmdline CTRL-D. feed([[sign <C-D>]]) -- Invoke cmdline CTRL-D.
expect_stay_unchanged{grid=[[ expect_stay_unchanged{grid=[[
:sign | :sign |
define place | define place |