Files
2024-12-27 22:31:23 +09:00

793 lines
26 KiB
Cython

@cython.no_gc_clear
cdef class UVProcess(UVHandle):
"""Abstract class; wrapper over uv_process_t handle."""
def __cinit__(self):
self.uv_opt_env = NULL
self.uv_opt_args = NULL
self._returncode = None
self._pid = None
self._fds_to_close = list()
self._preexec_fn = None
self._restore_signals = True
self.context = Context_CopyCurrent()
cdef _close_process_handle(self):
# XXX: This is a workaround for a libuv bug:
# - https://github.com/libuv/libuv/issues/1933
# - https://github.com/libuv/libuv/pull/551
if self._handle is NULL:
return
self._handle.data = NULL
uv.uv_close(self._handle, __uv_close_process_handle_cb)
self._handle = NULL # close callback will free() the memory
cdef _init(self, Loop loop, list args, dict env,
cwd, start_new_session,
_stdin, _stdout, _stderr, # std* can be defined as macros in C
pass_fds, debug_flags, preexec_fn, restore_signals):
global __forking
global __forking_loop
global __forkHandler
cdef int err
self._start_init(loop)
self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(
sizeof(uv.uv_process_t))
if self._handle is NULL:
self._abort_init()
raise MemoryError()
# Too early to call _finish_init, but still a lot of work to do.
# Let's set handle.data to NULL, so in case something goes wrong,
# callbacks have a chance to avoid casting *something* into UVHandle.
self._handle.data = NULL
force_fork = False
if system.PLATFORM_IS_APPLE and not (
preexec_fn is None
and not pass_fds
):
# see _execute_child() in CPython/subprocess.py
force_fork = True
try:
self._init_options(args, env, cwd, start_new_session,
_stdin, _stdout, _stderr, force_fork)
restore_inheritable = set()
if pass_fds:
for fd in pass_fds:
if not os_get_inheritable(fd):
restore_inheritable.add(fd)
os_set_inheritable(fd, True)
except Exception:
self._abort_init()
raise
if __forking or loop.active_process_handler is not None:
# Our pthread_atfork handlers won't work correctly when
# another loop is forking in another thread (even though
# GIL should help us to avoid that.)
self._abort_init()
raise RuntimeError(
'Racing with another loop to spawn a process.')
self._errpipe_read, self._errpipe_write = os_pipe()
fds_to_close = self._fds_to_close
self._fds_to_close = None
fds_to_close.append(self._errpipe_read)
# add the write pipe last so we can close it early
fds_to_close.append(self._errpipe_write)
try:
os_set_inheritable(self._errpipe_write, True)
self._preexec_fn = preexec_fn
self._restore_signals = restore_signals
loop.active_process_handler = self
__forking = 1
__forking_loop = loop
system.setForkHandler(<system.OnForkHandler>&__get_fork_handler)
PyOS_BeforeFork()
err = uv.uv_spawn(loop.uvloop,
<uv.uv_process_t*>self._handle,
&self.options)
__forking = 0
__forking_loop = None
system.resetForkHandler()
loop.active_process_handler = None
PyOS_AfterFork_Parent()
if err < 0:
self._close_process_handle()
self._abort_init()
raise convert_error(err)
self._finish_init()
# close the write pipe early
os_close(fds_to_close.pop())
if preexec_fn is not None:
errpipe_data = bytearray()
while True:
# XXX: This is a blocking code that has to be
# rewritten (using loop.connect_read_pipe() or
# otherwise.)
part = os_read(self._errpipe_read, 50000)
errpipe_data += part
if not part or len(errpipe_data) > 50000:
break
finally:
while fds_to_close:
os_close(fds_to_close.pop())
for fd in restore_inheritable:
os_set_inheritable(fd, False)
# asyncio caches the PID in BaseSubprocessTransport,
# so that the transport knows what the PID was even
# after the process is finished.
self._pid = (<uv.uv_process_t*>self._handle).pid
# Track the process handle (create a strong ref to it)
# to guarantee that __dealloc__ doesn't happen in an
# uncontrolled fashion. We want to wait until the process
# exits and libuv calls __uvprocess_on_exit_callback,
# which will call `UVProcess._close()`, which will, in turn,
# untrack this handle.
self._loop._track_process(self)
if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK:
time_sleep(1)
if preexec_fn is not None and errpipe_data:
# preexec_fn has raised an exception. The child
# process must be dead now.
try:
exc_name, exc_msg = errpipe_data.split(b':', 1)
exc_name = exc_name.decode()
exc_msg = exc_msg.decode()
except Exception:
self._close()
raise subprocess_SubprocessError(
'Bad exception data from child: {!r}'.format(
errpipe_data))
exc_cls = getattr(__builtins__, exc_name,
subprocess_SubprocessError)
exc = subprocess_SubprocessError(
'Exception occurred in preexec_fn.')
exc.__cause__ = exc_cls(exc_msg)
self._close()
raise exc
cdef _after_fork(self):
# See CPython/_posixsubprocess.c for details
cdef int err
if self._restore_signals:
_Py_RestoreSignals()
PyOS_AfterFork_Child()
err = uv.uv_loop_fork(self._loop.uvloop)
if err < 0:
raise convert_error(err)
if self._preexec_fn is not None:
try:
gc_disable()
self._preexec_fn()
except BaseException as ex:
try:
with open(self._errpipe_write, 'wb') as f:
f.write(str(ex.__class__.__name__).encode())
f.write(b':')
f.write(str(ex.args[0]).encode())
finally:
system._exit(255)
return
else:
os_close(self._errpipe_write)
else:
os_close(self._errpipe_write)
cdef _close_after_spawn(self, int fd):
if self._fds_to_close is None:
raise RuntimeError(
'UVProcess._close_after_spawn called after uv_spawn')
self._fds_to_close.append(fd)
def __dealloc__(self):
if self.uv_opt_env is not NULL:
PyMem_RawFree(self.uv_opt_env)
self.uv_opt_env = NULL
if self.uv_opt_args is not NULL:
PyMem_RawFree(self.uv_opt_args)
self.uv_opt_args = NULL
cdef char** __to_cstring_array(self, list arr):
cdef:
int i
ssize_t arr_len = len(arr)
bytes el
char **ret
ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *))
if ret is NULL:
raise MemoryError()
for i in range(arr_len):
el = arr[i]
# NB: PyBytes_AsString doesn't copy the data;
# we have to be careful when the "arr" is GCed,
# and it shouldn't be ever mutated.
ret[i] = PyBytes_AsString(el)
ret[arr_len] = NULL
return ret
cdef _init_options(self, list args, dict env, cwd, start_new_session,
_stdin, _stdout, _stderr, bint force_fork):
memset(&self.options, 0, sizeof(uv.uv_process_options_t))
self._init_env(env)
self.options.env = self.uv_opt_env
self._init_args(args)
self.options.file = self.uv_opt_file
self.options.args = self.uv_opt_args
if start_new_session:
self.options.flags |= uv.UV_PROCESS_DETACHED
if force_fork:
# This is a hack to work around the change in libuv 1.44:
# > macos: use posix_spawn instead of fork
# where Python subprocess options like preexec_fn are
# crippled. CPython only uses posix_spawn under a pretty
# strict list of conditions (see subprocess.py), and falls
# back to using fork() otherwise. We'd like to simulate such
# behavior with libuv, but unfortunately libuv doesn't
# provide explicit API to choose such implementation detail.
# Based on current (libuv 1.46) behavior, setting
# UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make
# libuv fallback to use fork, so let's just use it for now.
self.options.flags |= uv.UV_PROCESS_SETUID
self.options.uid = uv.getuid()
if cwd is not None:
cwd = os_fspath(cwd)
if isinstance(cwd, str):
cwd = PyUnicode_EncodeFSDefault(cwd)
if not isinstance(cwd, bytes):
raise ValueError('cwd must be a str or bytes object')
self.__cwd = cwd
self.options.cwd = PyBytes_AsString(self.__cwd)
self.options.exit_cb = &__uvprocess_on_exit_callback
self._init_files(_stdin, _stdout, _stderr)
cdef _init_args(self, list args):
cdef:
bytes path
int an = len(args)
if an < 1:
raise ValueError('cannot spawn a process: args are empty')
self.__args = args.copy()
for i in range(an):
arg = os_fspath(args[i])
if isinstance(arg, str):
self.__args[i] = PyUnicode_EncodeFSDefault(arg)
elif not isinstance(arg, bytes):
raise TypeError('all args must be str or bytes')
path = self.__args[0]
self.uv_opt_file = PyBytes_AsString(path)
self.uv_opt_args = self.__to_cstring_array(self.__args)
cdef _init_env(self, dict env):
if env is not None:
self.__env = list()
for key in env:
val = env[key]
if isinstance(key, str):
key = PyUnicode_EncodeFSDefault(key)
elif not isinstance(key, bytes):
raise TypeError(
'all environment vars must be bytes or str')
if isinstance(val, str):
val = PyUnicode_EncodeFSDefault(val)
elif not isinstance(val, bytes):
raise TypeError(
'all environment values must be bytes or str')
self.__env.append(key + b'=' + val)
self.uv_opt_env = self.__to_cstring_array(self.__env)
else:
self.__env = None
cdef _init_files(self, _stdin, _stdout, _stderr):
self.options.stdio_count = 0
cdef _kill(self, int signum):
cdef int err
self._ensure_alive()
err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum)
if err < 0:
raise convert_error(err)
cdef _on_exit(self, int64_t exit_status, int term_signal):
if term_signal:
# From Python docs:
# A negative value -N indicates that the child was
# terminated by signal N (POSIX only).
self._returncode = -term_signal
else:
self._returncode = exit_status
self._close()
cdef _close(self):
try:
if self._loop is not None:
self._loop._untrack_process(self)
finally:
UVHandle._close(self)
DEF _CALL_PIPE_DATA_RECEIVED = 0
DEF _CALL_PIPE_CONNECTION_LOST = 1
DEF _CALL_PROCESS_EXITED = 2
DEF _CALL_CONNECTION_LOST = 3
@cython.no_gc_clear
cdef class UVProcessTransport(UVProcess):
def __cinit__(self):
self._exit_waiters = []
self._protocol = None
self._init_futs = []
self._pending_calls = []
self._stdio_ready = 0
self._stdin = self._stdout = self._stderr = None
self.stdin_proto = self.stdout_proto = self.stderr_proto = None
self._finished = 0
cdef _on_exit(self, int64_t exit_status, int term_signal):
UVProcess._on_exit(self, exit_status, term_signal)
if self._stdio_ready:
self._loop.call_soon(self._protocol.process_exited,
context=self.context)
else:
self._pending_calls.append((_CALL_PROCESS_EXITED, None, None))
self._try_finish()
for waiter in self._exit_waiters:
if not waiter.cancelled():
waiter.set_result(self._returncode)
self._exit_waiters.clear()
self._close()
cdef _check_proc(self):
if not self._is_alive() or self._returncode is not None:
raise ProcessLookupError()
cdef _pipe_connection_lost(self, int fd, exc):
if self._stdio_ready:
self._loop.call_soon(self._protocol.pipe_connection_lost, fd, exc,
context=self.context)
self._try_finish()
else:
self._pending_calls.append((_CALL_PIPE_CONNECTION_LOST, fd, exc))
cdef _pipe_data_received(self, int fd, data):
if self._stdio_ready:
self._loop.call_soon(self._protocol.pipe_data_received, fd, data,
context=self.context)
else:
self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data))
cdef _file_redirect_stdio(self, int fd):
fd = os_dup(fd)
os_set_inheritable(fd, True)
self._close_after_spawn(fd)
return fd
cdef _file_devnull(self):
dn = os_open(os_devnull, os_O_RDWR)
os_set_inheritable(dn, True)
self._close_after_spawn(dn)
return dn
cdef _file_outpipe(self):
r, w = __socketpair()
os_set_inheritable(w, True)
self._close_after_spawn(w)
return r, w
cdef _file_inpipe(self):
r, w = __socketpair()
os_set_inheritable(r, True)
self._close_after_spawn(r)
return r, w
cdef _init_files(self, _stdin, _stdout, _stderr):
cdef uv.uv_stdio_container_t *iocnt
UVProcess._init_files(self, _stdin, _stdout, _stderr)
io = [None, None, None]
self.options.stdio_count = 3
self.options.stdio = self.iocnt
if _stdin is not None:
if _stdin == subprocess_PIPE:
r, w = self._file_inpipe()
io[0] = r
self.stdin_proto = WriteSubprocessPipeProto(self, 0)
waiter = self._loop._new_future()
self._stdin = WriteUnixTransport.new(
self._loop, self.stdin_proto, None, waiter)
self._init_futs.append(waiter)
self._stdin._open(w)
self._stdin._init_protocol()
elif _stdin == subprocess_DEVNULL:
io[0] = self._file_devnull()
elif _stdout == subprocess_STDOUT:
raise ValueError(
'subprocess.STDOUT is supported only by stderr parameter')
else:
io[0] = self._file_redirect_stdio(_stdin)
else:
io[0] = self._file_redirect_stdio(0)
if _stdout is not None:
if _stdout == subprocess_PIPE:
# We can't use UV_CREATE_PIPE here, since 'stderr' might be
# set to 'subprocess.STDOUT', and there is no way to
# emulate that functionality with libuv high-level
# streams API. Therefore, we create pipes for stdout and
# stderr manually.
r, w = self._file_outpipe()
io[1] = w
self.stdout_proto = ReadSubprocessPipeProto(self, 1)
waiter = self._loop._new_future()
self._stdout = ReadUnixTransport.new(
self._loop, self.stdout_proto, None, waiter)
self._init_futs.append(waiter)
self._stdout._open(r)
self._stdout._init_protocol()
elif _stdout == subprocess_DEVNULL:
io[1] = self._file_devnull()
elif _stdout == subprocess_STDOUT:
raise ValueError(
'subprocess.STDOUT is supported only by stderr parameter')
else:
io[1] = self._file_redirect_stdio(_stdout)
else:
io[1] = self._file_redirect_stdio(1)
if _stderr is not None:
if _stderr == subprocess_PIPE:
r, w = self._file_outpipe()
io[2] = w
self.stderr_proto = ReadSubprocessPipeProto(self, 2)
waiter = self._loop._new_future()
self._stderr = ReadUnixTransport.new(
self._loop, self.stderr_proto, None, waiter)
self._init_futs.append(waiter)
self._stderr._open(r)
self._stderr._init_protocol()
elif _stderr == subprocess_STDOUT:
if io[1] is None:
# shouldn't ever happen
raise RuntimeError('cannot apply subprocess.STDOUT')
io[2] = self._file_redirect_stdio(io[1])
elif _stderr == subprocess_DEVNULL:
io[2] = self._file_devnull()
else:
io[2] = self._file_redirect_stdio(_stderr)
else:
io[2] = self._file_redirect_stdio(2)
assert len(io) == 3
for idx in range(3):
iocnt = &self.iocnt[idx]
if io[idx] is not None:
iocnt.flags = uv.UV_INHERIT_FD
iocnt.data.fd = io[idx]
else:
iocnt.flags = uv.UV_IGNORE
cdef _call_connection_made(self, waiter):
try:
# we're always called in the right context, so just call the user's
self._protocol.connection_made(self)
except (KeyboardInterrupt, SystemExit):
raise
except BaseException as ex:
if waiter is not None and not waiter.cancelled():
waiter.set_exception(ex)
else:
raise
else:
if waiter is not None and not waiter.cancelled():
waiter.set_result(True)
self._stdio_ready = 1
if self._pending_calls:
pending_calls = self._pending_calls.copy()
self._pending_calls.clear()
for (type, fd, arg) in pending_calls:
if type == _CALL_PIPE_CONNECTION_LOST:
self._pipe_connection_lost(fd, arg)
elif type == _CALL_PIPE_DATA_RECEIVED:
self._pipe_data_received(fd, arg)
elif type == _CALL_PROCESS_EXITED:
self._loop.call_soon(self._protocol.process_exited)
elif type == _CALL_CONNECTION_LOST:
self._loop.call_soon(self._protocol.connection_lost, None)
cdef _try_finish(self):
if self._returncode is None or self._finished:
return
if ((self.stdin_proto is None or self.stdin_proto.disconnected) and
(self.stdout_proto is None or
self.stdout_proto.disconnected) and
(self.stderr_proto is None or
self.stderr_proto.disconnected)):
self._finished = 1
if self._stdio_ready:
# copy self.context for simplicity
self._loop.call_soon(self._protocol.connection_lost, None,
context=self.context)
else:
self._pending_calls.append((_CALL_CONNECTION_LOST, None, None))
def __stdio_inited(self, waiter, stdio_fut):
exc = stdio_fut.exception()
if exc is not None:
if waiter is None:
raise exc
else:
waiter.set_exception(exc)
else:
self._loop._call_soon_handle(
new_MethodHandle1(self._loop,
"UVProcessTransport._call_connection_made",
<method1_t>self._call_connection_made,
None, # means to copy the current context
self, waiter))
@staticmethod
cdef UVProcessTransport new(Loop loop, protocol, args, env,
cwd, start_new_session,
_stdin, _stdout, _stderr, pass_fds,
waiter,
debug_flags,
preexec_fn,
restore_signals):
cdef UVProcessTransport handle
handle = UVProcessTransport.__new__(UVProcessTransport)
handle._protocol = protocol
handle._init(loop, args, env, cwd, start_new_session,
__process_convert_fileno(_stdin),
__process_convert_fileno(_stdout),
__process_convert_fileno(_stderr),
pass_fds,
debug_flags,
preexec_fn,
restore_signals)
if handle._init_futs:
handle._stdio_ready = 0
init_fut = aio_gather(*handle._init_futs)
# add_done_callback will copy the current context and run the
# callback within the context
init_fut.add_done_callback(
ft_partial(handle.__stdio_inited, waiter))
else:
handle._stdio_ready = 1
loop._call_soon_handle(
new_MethodHandle1(loop,
"UVProcessTransport._call_connection_made",
<method1_t>handle._call_connection_made,
None, # means to copy the current context
handle, waiter))
return handle
def get_protocol(self):
return self._protocol
def set_protocol(self, protocol):
self._protocol = protocol
def get_pid(self):
return self._pid
def get_returncode(self):
return self._returncode
def get_pipe_transport(self, fd):
if fd == 0:
return self._stdin
elif fd == 1:
return self._stdout
elif fd == 2:
return self._stderr
def terminate(self):
self._check_proc()
self._kill(uv.SIGTERM)
def kill(self):
self._check_proc()
self._kill(uv.SIGKILL)
def send_signal(self, int signal):
self._check_proc()
self._kill(signal)
def is_closing(self):
return self._closed
def close(self):
if self._returncode is None:
self._kill(uv.SIGKILL)
if self._stdin is not None:
self._stdin.close()
if self._stdout is not None:
self._stdout.close()
if self._stderr is not None:
self._stderr.close()
if self._returncode is not None:
# The process is dead, just close the UV handle.
#
# (If "self._returncode is None", the process should have been
# killed already and we're just waiting for a SIGCHLD; after
# which the transport will be GC'ed and the uvhandle will be
# closed in UVHandle.__dealloc__.)
self._close()
def get_extra_info(self, name, default=None):
return default
def _wait(self):
fut = self._loop._new_future()
if self._returncode is not None:
fut.set_result(self._returncode)
return fut
self._exit_waiters.append(fut)
return fut
class WriteSubprocessPipeProto(aio_BaseProtocol):
def __init__(self, proc, fd):
if UVLOOP_DEBUG:
if type(proc) is not UVProcessTransport:
raise TypeError
if not isinstance(fd, int):
raise TypeError
self.proc = proc
self.fd = fd
self.pipe = None
self.disconnected = False
def connection_made(self, transport):
self.pipe = transport
def __repr__(self):
return ('<%s fd=%s pipe=%r>'
% (self.__class__.__name__, self.fd, self.pipe))
def connection_lost(self, exc):
self.disconnected = True
(<UVProcessTransport>self.proc)._pipe_connection_lost(self.fd, exc)
self.proc = None
def pause_writing(self):
(<UVProcessTransport>self.proc)._protocol.pause_writing()
def resume_writing(self):
(<UVProcessTransport>self.proc)._protocol.resume_writing()
class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
aio_Protocol):
def data_received(self, data):
(<UVProcessTransport>self.proc)._pipe_data_received(self.fd, data)
cdef __process_convert_fileno(object obj):
if obj is None or isinstance(obj, int):
return obj
fileno = obj.fileno()
if not isinstance(fileno, int):
raise TypeError(
'{!r}.fileno() returned non-integer'.format(obj))
return fileno
cdef void __uvprocess_on_exit_callback(
uv.uv_process_t *handle,
int64_t exit_status,
int term_signal,
) noexcept with gil:
if __ensure_handle_data(<uv.uv_handle_t*>handle,
"UVProcess exit callback") == 0:
return
cdef UVProcess proc = <UVProcess> handle.data
try:
proc._on_exit(exit_status, term_signal)
except BaseException as ex:
proc._error(ex, False)
cdef __socketpair():
cdef:
int fds[2]
int err
err = system.socketpair(uv.AF_UNIX, uv.SOCK_STREAM, 0, fds)
if err:
exc = convert_error(-err)
raise exc
os_set_inheritable(fds[0], False)
os_set_inheritable(fds[1], False)
return fds[0], fds[1]
cdef void __uv_close_process_handle_cb(
uv.uv_handle_t* handle
) noexcept with gil:
PyMem_RawFree(handle)