Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update async.vim #134

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 138 additions & 38 deletions autoload/minpac/job.vim
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ let s:job_type_nvimjob = 'nvimjob'
let s:job_type_vimjob = 'vimjob'
let s:job_error_unsupported_job_type = -2 " unsupported job type

function! s:noop(...) abort
endfunction

function! s:job_supported_types() abort
let l:supported_types = []
if has('nvim')
Expand All @@ -49,15 +52,11 @@ function! s:job_supports_type(type) abort
endfunction

function! s:out_cb(jobid, opts, job, data) abort
if has_key(a:opts, 'on_stdout')
call a:opts.on_stdout(a:jobid, split(a:data, "\n", 1), 'stdout')
endif
call a:opts.on_stdout(a:jobid, split(a:data, "\n", 1), 'stdout')
endfunction

function! s:err_cb(jobid, opts, job, data) abort
if has_key(a:opts, 'on_stderr')
call a:opts.on_stderr(a:jobid, split(a:data, "\n", 1), 'stderr')
endif
call a:opts.on_stderr(a:jobid, split(a:data, "\n", 1), 'stderr')
endfunction

function! s:exit_cb(jobid, opts, job, status) abort
Expand All @@ -70,21 +69,13 @@ function! s:exit_cb(jobid, opts, job, status) abort
endfunction

function! s:on_stdout(jobid, data, event) abort
if has_key(s:jobs, a:jobid)
let l:jobinfo = s:jobs[a:jobid]
if has_key(l:jobinfo.opts, 'on_stdout')
call l:jobinfo.opts.on_stdout(a:jobid, a:data, a:event)
endif
endif
let l:jobinfo = s:jobs[a:jobid]
call l:jobinfo.opts.on_stdout(a:jobid, a:data, a:event)
endfunction

function! s:on_stderr(jobid, data, event) abort
if has_key(s:jobs, a:jobid)
let l:jobinfo = s:jobs[a:jobid]
if has_key(l:jobinfo.opts, 'on_stderr')
call l:jobinfo.opts.on_stderr(a:jobid, a:data, a:event)
endif
endif
let l:jobinfo = s:jobs[a:jobid]
call l:jobinfo.opts.on_stderr(a:jobid, a:data, a:event)
endfunction

function! s:on_exit(jobid, status, event) abort
Expand All @@ -93,6 +84,9 @@ function! s:on_exit(jobid, status, event) abort
if has_key(l:jobinfo.opts, 'on_exit')
call l:jobinfo.opts.on_exit(a:jobid, a:status, a:event)
endif
if has_key(s:jobs, a:jobid)
call remove(s:jobs, a:jobid)
endif
endif
endfunction

Expand Down Expand Up @@ -124,12 +118,19 @@ function! s:job_start(cmd, opts) abort
return s:job_error_unsupported_job_type
endif

" options shared by both vim and neovim
let l:jobopt = {}
if has_key(a:opts, 'cwd')
let l:jobopt.cwd = a:opts.cwd
endif

if l:jobtype == s:job_type_nvimjob
let l:job = jobstart(a:cmd, {
\ 'on_stdout': function('s:on_stdout'),
\ 'on_stderr': function('s:on_stderr'),
call extend(l:jobopt, {
\ 'on_stdout': has_key(a:opts, 'on_stdout') ? function('s:on_stdout') : function('s:noop'),
\ 'on_stderr': has_key(a:opts, 'on_stderr') ? function('s:on_stderr') : function('s:noop'),
\ 'on_exit': function('s:on_exit'),
\})
let l:job = jobstart(a:cmd, l:jobopt)
if l:job <= 0
return l:job
endif
Expand All @@ -142,12 +143,16 @@ function! s:job_start(cmd, opts) abort
elseif l:jobtype == s:job_type_vimjob
let s:jobidseq = s:jobidseq + 1
let l:jobid = s:jobidseq
let l:job = job_start(a:cmd, {
\ 'out_cb': function('s:out_cb', [l:jobid, a:opts]),
\ 'err_cb': function('s:err_cb', [l:jobid, a:opts]),
call extend(l:jobopt, {
\ 'out_cb': has_key(a:opts, 'on_stdout') ? function('s:out_cb', [l:jobid, a:opts]) : function('s:noop'),
\ 'err_cb': has_key(a:opts, 'on_stderr') ? function('s:err_cb', [l:jobid, a:opts]) : function('s:noop'),
\ 'exit_cb': function('s:exit_cb', [l:jobid, a:opts]),
\ 'mode': 'raw',
\})
\ })
if has('patch-8.1.889')
let l:jobopt['noblock'] = 1
endif
let l:job = job_start(a:cmd, l:jobopt)
if job_status(l:job) !=? 'run'
return -1
endif
Expand All @@ -169,36 +174,67 @@ function! s:job_stop(jobid) abort
if has_key(s:jobs, a:jobid)
let l:jobinfo = s:jobs[a:jobid]
if l:jobinfo.type == s:job_type_nvimjob
call jobstop(a:jobid)
" See: vital-Whisky/System.Job
try
call jobstop(a:jobid)
catch /^Vim\%((\a\+)\)\=:E900/
" NOTE:
" Vim does not raise exception even the job has already closed so fail
" silently for 'E900: Invalid job id' exception
endtry
elseif l:jobinfo.type == s:job_type_vimjob
call job_stop(s:jobs[a:jobid].job)
endif
if has_key(s:jobs, a:jobid)
call remove(s:jobs, a:jobid)
if type(s:jobs[a:jobid].job) == v:t_job
call job_stop(s:jobs[a:jobid].job)
elseif type(s:jobs[a:jobid].job) == v:t_channel
call ch_close(s:jobs[a:jobid].job)
endif
endif
endif
endfunction

function! s:job_send(jobid, data) abort
function! s:job_send(jobid, data, opts) abort
let l:jobinfo = s:jobs[a:jobid]
let l:close_stdin = get(a:opts, 'close_stdin', 0)
if l:jobinfo.type == s:job_type_nvimjob
call jobsend(a:jobid, a:data)
if l:close_stdin
call chanclose(a:jobid, 'stdin')
endif
elseif l:jobinfo.type == s:job_type_vimjob
let l:jobinfo.buffer .= a:data
call s:flush_vim_sendraw(a:jobid, v:null)
" There is no easy way to know when ch_sendraw() finishes writing data
" on a non-blocking channels -- has('patch-8.1.889') -- and because of
" this, we cannot safely call ch_close_in(). So when we find ourselves
" in this situation (i.e. noblock=1 and close stdin after send) we fall
" back to using s:flush_vim_sendraw() and wait for transmit buffer to be
" empty
"
" Ref: https://groups.google.com/d/topic/vim_dev/UNNulkqb60k/discussion
if has('patch-8.1.818') && (!has('patch-8.1.889') || !l:close_stdin)
call ch_sendraw(l:jobinfo.channel, a:data)
else
let l:jobinfo.buffer .= a:data
call s:flush_vim_sendraw(a:jobid, v:null)
endif
if l:close_stdin
while len(l:jobinfo.buffer) != 0
sleep 1m
endwhile
call ch_close_in(l:jobinfo.channel)
endif
endif
endfunction

function! s:flush_vim_sendraw(jobid, timer) abort
" https://github.com/vim/vim/issues/2548
" https://github.com/natebosch/vim-lsc/issues/67#issuecomment-357469091
let l:jobinfo = s:jobs[a:jobid]
if len(l:jobinfo.buffer) <= 1024
sleep 1m
if len(l:jobinfo.buffer) <= 4096
call ch_sendraw(l:jobinfo.channel, l:jobinfo.buffer)
let l:jobinfo.buffer = ''
else
let l:to_send = l:jobinfo.buffer[:1023]
let l:jobinfo.buffer = l:jobinfo.buffer[1024:]
let l:to_send = l:jobinfo.buffer[:4095]
let l:jobinfo.buffer = l:jobinfo.buffer[4096:]
call ch_sendraw(l:jobinfo.channel, l:to_send)
call timer_start(1, function('s:flush_vim_sendraw', [a:jobid]))
endif
Expand Down Expand Up @@ -245,6 +281,38 @@ function! s:job_wait(jobids, timeout) abort
return l:ret
endfunction

function! s:job_pid(jobid) abort
if !has_key(s:jobs, a:jobid)
return 0
endif

let l:jobinfo = s:jobs[a:jobid]
if l:jobinfo.type == s:job_type_nvimjob
return jobpid(a:jobid)
elseif l:jobinfo.type == s:job_type_vimjob
let l:vimjobinfo = job_info(a:jobid)
if type(l:vimjobinfo) == type({}) && has_key(l:vimjobinfo, 'process')
return l:vimjobinfo['process']
endif
endif
return 0
endfunction

function! s:callback_cb(jobid, opts, ch, data) abort
if has_key(a:opts, 'on_stdout')
call a:opts.on_stdout(a:jobid, split(a:data, "\n", 1), 'stdout')
endif
endfunction

function! s:close_cb(jobid, opts, ch) abort
if has_key(a:opts, 'on_exit')
call a:opts.on_exit(a:jobid, 'closed', 'exit')
endif
if has_key(s:jobs, a:jobid)
call remove(s:jobs, a:jobid)
endif
endfunction

" public apis {{{
function! minpac#job#start(cmd, opts) abort
return s:job_start(a:cmd, a:opts)
Expand All @@ -254,14 +322,46 @@ function! minpac#job#stop(jobid) abort
call s:job_stop(a:jobid)
endfunction

function! minpac#job#send(jobid, data) abort
call s:job_send(a:jobid, a:data)
function! minpac#job#send(jobid, data, ...) abort
let l:opts = get(a:000, 0, {})
call s:job_send(a:jobid, a:data, l:opts)
endfunction

function! minpac#job#wait(jobids, ...) abort
let l:timeout = get(a:000, 0, -1)
return s:job_wait(a:jobids, l:timeout)
endfunction

function! async#job#pid(jobid) abort
return s:job_pid(a:jobid)
endfunction

function! async#job#connect(addr, opts) abort
let s:jobidseq = s:jobidseq + 1
let l:jobid = s:jobidseq
let l:retry = 0
while l:retry < 5
let l:ch = ch_open(a:addr, {'waittime': 1000})
call ch_setoptions(l:ch, {
\ 'callback': function('s:callback_cb', [l:jobid, a:opts]),
\ 'close_cb': function('s:close_cb', [l:jobid, a:opts]),
\ 'mode': 'raw',
\})
if ch_status(l:ch) ==# 'open'
break
endif
sleep 100m
let l:retry += 1
endwhile
let s:jobs[l:jobid] = {
\ 'type': s:job_type_vimjob,
\ 'opts': a:opts,
\ 'job': l:ch,
\ 'channel': l:ch,
\ 'buffer': ''
\}
return l:jobid
endfunction
" }}}

let &cpo = s:save_cpo
Expand Down