Python: apply suggestions from code review for asyncio

This commit is contained in:
Peter Stöckli 2023-09-06 15:47:07 +02:00
Родитель 9027eac312
Коммит ede7d8fb6a
1 изменённых файлов: 74 добавлений и 107 удалений

Просмотреть файл

@ -4400,9 +4400,6 @@ private module StdlibPrivate {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// asyncio // asyncio
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/** Gets a reference to the `asyncio` module. */
API::Node asyncio() { result = API::moduleImport("asyncio") }
/** Provides models for the `asyncio` module. */ /** Provides models for the `asyncio` module. */
module AsyncIO { module AsyncIO {
/** /**
@ -4411,24 +4408,19 @@ private module StdlibPrivate {
* See https://docs.python.org/3/library/asyncio-subprocess.html#creating-subprocesses * See https://docs.python.org/3/library/asyncio-subprocess.html#creating-subprocesses
*/ */
private class CreateSubprocessExec extends SystemCommandExecution::Range, private class CreateSubprocessExec extends SystemCommandExecution::Range,
FileSystemAccess::Range, DataFlow::CallCfgNode FileSystemAccess::Range, API::CallNode
{ {
CreateSubprocessExec() { CreateSubprocessExec() {
exists(string name | this = API::moduleImport("asyncio").getMember("create_subprocess_exec").getACall()
name = "create_subprocess_exec" and or
( this =
this = asyncio().getMember(name).getACall() API::moduleImport("asyncio")
or .getMember("subprocess")
this = asyncio().getMember("subprocess").getMember(name).getACall() .getMember("create_subprocess_exec")
) .getACall()
)
} }
override DataFlow::Node getCommand() { override DataFlow::Node getCommand() { result = this.getParameter(0, "program").asSink() }
result = this.getArg(0)
or
result = this.getArgByName("program")
}
override DataFlow::Node getAPathArgument() { result = this.getCommand() } override DataFlow::Node getAPathArgument() { result = this.getCommand() }
@ -4436,104 +4428,79 @@ private module StdlibPrivate {
none() // this is a safe API. none() // this is a safe API.
} }
} }
}
/** private class CreateSubprocessShell extends SystemCommandExecution::Range,
* A call to the `asyncio.create_subprocess_shell` function (also in the `subprocess` module of `asyncio`) FileSystemAccess::Range, API::CallNode
* {
* See https://docs.python.org/3/library/asyncio-subprocess.html#creating-subprocesses CreateSubprocessShell() {
*/ this = API::moduleImport("asyncio").getMember("create_subprocess_shell").getACall()
private class CreateSubprocessShell extends SystemCommandExecution::Range, or
FileSystemAccess::Range, DataFlow::CallCfgNode this =
{ API::moduleImport("asyncio")
CreateSubprocessShell() { .getMember("subprocess")
exists(string name | .getMember("create_subprocess_shell")
name = "create_subprocess_shell" and .getACall()
( }
this = asyncio().getMember(name).getACall()
or override DataFlow::Node getCommand() { result = this.getParameter(0, "cmd").asSink() }
this = asyncio().getMember("subprocess").getMember(name).getACall()
) override DataFlow::Node getAPathArgument() { result = this.getCommand() }
)
override predicate isShellInterpreted(DataFlow::Node arg) { arg = this.getCommand() }
} }
override DataFlow::Node getCommand() { /**
result = this.getArg(0) * Get an asyncio event loop (an object with basetype `AbstractEventLoop`).
*
* See https://docs.python.org/3/library/asyncio-eventloop.html
*/
private API::Node getAsyncioEventLoop() {
result = API::moduleImport("asyncio").getMember("get_running_loop").getReturn()
or or
result = this.getArgByName("cmd") result = API::moduleImport("asyncio").getMember("get_event_loop").getReturn() // deprecated in Python 3.10.0 and later
}
override DataFlow::Node getAPathArgument() { result = this.getCommand() }
override predicate isShellInterpreted(DataFlow::Node arg) { arg = this.getCommand() }
}
/**
* A source for an event loop (an object with basetype `AbstractEventLoop`).
*
* See https://docs.python.org/3/library/asyncio-eventloop.html
*/
private class EventLoopSource extends DataFlow::LocalSourceNode, DataFlow::CallCfgNode {
EventLoopSource() {
this = asyncio().getMember("get_running_loop").getACall()
or or
this = asyncio().getMember("get_event_loop").getACall() // deprecated in Python 3.10.0 and later result = API::moduleImport("asyncio").getMember("new_event_loop").getReturn()
or
this = asyncio().getMember("new_event_loop").getACall()
}
}
/** Gets a reference to an event loop instance. */
private DataFlow::TypeTrackingNode eventLoopInstance(DataFlow::TypeTracker t) {
t.start() and
result instanceof EventLoopSource
or
exists(DataFlow::TypeTracker t2 | result = eventLoopInstance(t2).track(t2, t))
}
/** Gets a reference to an event loop instance. */
DataFlow::Node eventLoopInstance() {
eventLoopInstance(DataFlow::TypeTracker::end()).flowsTo(result)
}
/**
* A call to `subprocess_exec` on an event loop instance.
*
* See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.subprocess_exec
*/
private class EventLoopSubprocessExec extends DataFlow::MethodCallNode,
SystemCommandExecution::Range, FileSystemAccess::Range
{
EventLoopSubprocessExec() { this.calls(eventLoopInstance(), "subprocess_exec") }
override DataFlow::Node getCommand() { result = this.getArg(1) }
override DataFlow::Node getAPathArgument() { result = this.getCommand() }
override predicate isShellInterpreted(DataFlow::Node arg) {
none() // this is a safe API.
}
}
/**
* A call to `subprocess_shell` on an event loop instance.
*
* See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.subprocess_shell
*/
private class EventLoopSubprocessShell extends DataFlow::MethodCallNode,
SystemCommandExecution::Range, FileSystemAccess::Range
{
EventLoopSubprocessShell() { this.calls(eventLoopInstance(), "subprocess_shell") }
override DataFlow::Node getCommand() {
result = this.getArg(1)
or
result = this.getArgByName("cmd")
} }
override DataFlow::Node getAPathArgument() { result = this.getCommand() } /**
* A call to `subprocess_exec` on an event loop instance.
*
* See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.subprocess_exec
*/
private class EventLoopSubprocessExec extends API::CallNode, SystemCommandExecution::Range,
FileSystemAccess::Range
{
EventLoopSubprocessExec() {
this = getAsyncioEventLoop().getMember("subprocess_exec").getACall()
}
override predicate isShellInterpreted(DataFlow::Node arg) { arg = this.getCommand() } override DataFlow::Node getCommand() { result = this.getArg(1) }
override DataFlow::Node getAPathArgument() { result = this.getCommand() }
override predicate isShellInterpreted(DataFlow::Node arg) {
none() // this is a safe API.
}
}
/**
* A call to `subprocess_shell` on an event loop instance.
*
* See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.subprocess_shell
*/
private class EventLoopSubprocessShell extends API::CallNode, SystemCommandExecution::Range,
FileSystemAccess::Range
{
EventLoopSubprocessShell() {
this = getAsyncioEventLoop().getMember("subprocess_shell").getACall()
}
override DataFlow::Node getCommand() { result = this.getParameter(1, "cmd").asSink() }
override DataFlow::Node getAPathArgument() { result = this.getCommand() }
override predicate isShellInterpreted(DataFlow::Node arg) { arg = this.getCommand() }
}
} }
} }