"""Emulating Python's interactive interpreter in asynchronous contexts.
"""
# Based on code standard module by Guido van Rossum and co-othors,
# and on asyncio.__main__ code by Yury Selivanov (differs from it in
# that these classes operate in already running asynchronous contexts)
import sys
import traceback
import code
import asyncio
import ast
import io
import types
__all__ = ["AsyncInteractiveInterpreter", "AsyncInteractiveConsole"]
[docs]class AsyncInteractiveInterpreter(code.InteractiveInterpreter):
"""Base class for AsyncInteractiveConsole.
This class inherits from :class:`code.InteractiveInterpreter`. It
- makes ``runsource``, ``runcode``, ``showsyntaxerror``,
``showtraceback`` and ``write`` methods asynchronous
(they return coroutines, that have to be awaited);
- tell the compiler to allow top-level ``await`` statements;
- awaits coroutines returned by such statements execution;
- catches :data:`sys.stdout` and :data:`sys.stderr` during code
execution, then send gathered content using :meth:`write`.
This class deals with parsing and interpreter state (the user's
namespace); it doesn't deal with input buffering or prompting or
input file naming (the filename is always passed in explicitly).
"""
[docs] def __init__(self, locals, *, reroute_stdout=True, reroute_stderr=True):
"""Constructor.
The optional ``locals`` argument specifies the dictionary in
which code will be executed; it defaults to a newly created
dictionary with key ``"__name__"`` set to ``"__console__"`` and
key ``"__doc__"`` set to ``None``.
The optional ``reroute_stdout`` and ``reroute_stderr`` keyword
arguments, if set to ``False``, disable catching
:data:`sys.stdout` and :data:`sys.stderr`, respectively.
"""
super().__init__(locals)
if "__builtins__" not in self.locals:
# Why should __builtins__ be specified here and not in
# code.InteractiveInterpreter? No idea, but necessary!
self.locals["__builtins__"] = __builtins__
self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
self.reroute_stdout = reroute_stdout
self.reroute_stderr = reroute_stderr
[docs] async def runsource(self, source, filename="<input>", symbol="single"):
"""Compile and run some source in the interpreter.
Arguments and behavior are as for :class:`code.InteractiveInterpreter`,
except that this is an asynchronous method.
Unless explicitly disabled, this method catches :data:`sys.stdout`
and :data:`sys.stderr` during code execution. If some data has
been written, it is send using :meth:`write` before returning.
"""
try:
code = self.compile(source, filename, symbol)
except (OverflowError, SyntaxError, ValueError):
# Case 1
await self.showsyntaxerror(filename)
return False
if code is None:
# Case 2
return True
# Case 3a
if self.reroute_stdout or self.reroute_stderr:
# Cache current stdout and stderr
_stdout = sys.stdout
_stderr = sys.stderr
# Create temporary IO buffer
buffer = io.StringIO()
try:
if self.reroute_stdout:
# Catch standard output
sys.stdout = buffer
if self.reroute_stderr:
# Catch error output
sys.stderr = buffer
await self.runcode(code)
return False
finally:
# Restore sys.stdout and sys.stderr
sys.stdout = _stdout
sys.stderr = _stderr
data = buffer.getvalue()
if data:
# Write gathered output (from print, repr...)
await self.write(data)
buffer.close()
# Case 3b
else:
await self.runcode(code)
return False
[docs] async def runcode(self, code):
"""Execute a code object.
Arguments and behavior are as for :class:`code.InteractiveInterpreter`,
except that this is an asynchronous method and that 'code' is
wrapped in a function object which is called instead of being
directly executed.
If the result is a coroutine, it is then awaited before this
method returns. Exceptions are processed in the same way as
during code execution.
"""
func = types.FunctionType(code, self.locals)
coro = None
try:
# Same as exec(code, self.locals) but return result
coro = func()
except SystemExit:
raise
except BaseException:
await self.showtraceback()
if asyncio.iscoroutine(coro):
# func() returned a coroutine
try:
# We await it
await coro
except SystemExit:
raise
except BaseException:
await self.showtraceback()
[docs] async def showsyntaxerror(self, filename=None):
"""Display the syntax error that just occurred.
Arguments and behavior are as for :class:`code.InteractiveInterpreter`,
except that this is an asynchronous method.
"""
type, value, tb = sys.exc_info()
sys.last_type = type
sys.last_value = value
sys.last_traceback = tb
if filename and type is SyntaxError:
# Work hard to stuff the correct filename in the exception
try:
msg, (dummy_filename, lineno, offset, line) = value.args
except ValueError:
# Not the format we expect; leave it alone
pass
else:
# Stuff in the right filename
value = SyntaxError(msg, (filename, lineno, offset, line))
sys.last_value = value
if sys.excepthook is sys.__excepthook__:
lines = traceback.format_exception_only(type, value)
await self.write(''.join(lines))
else:
# If someone has set sys.excepthook, we let that take precedence
# over self.write
sys.excepthook(type, value, tb)
[docs] async def showtraceback(self):
"""Display the exception that just occurred.
Arguments and behavior are as for :class:`code.InteractiveInterpreter`,
except that this is an asynchronous method.
"""
sys.last_type, sys.last_value, last_tb = ei = sys.exc_info()
sys.last_traceback = last_tb
try:
lines = traceback.format_exception(ei[0], ei[1], last_tb.tb_next)
if sys.excepthook is sys.__excepthook__:
await self.write(''.join(lines))
else:
# If someone has set sys.excepthook, we let that take precedence
# over self.write
sys.excepthook(ei[0], ei[1], last_tb)
finally:
last_tb = ei = None
[docs] async def write(self, data):
"""Write a string.
Arguments are as for :class:`code.InteractiveInterpreter`.
While that this is an async method, the base implementation
still writes to :data:`sys.stderr`; a subclass should replace
this with a different (asynchronous) implementation.
"""
sys.stderr.write(data)
[docs]class AsyncInteractiveConsole(AsyncInteractiveInterpreter,
code.InteractiveConsole):
"""Emulate interactive Python interpreter in asynchronous contexts.
This class builds on :class:`AsyncInteractiveInterpreter` and adds
prompting using the familiar :data:`sys.ps1` and :data:`sys.ps2`,
and input buffering.
It does exactly the same job as :class:`code.InteractiveConsole`,
except that ``interact``, ``push`` and ``raw_input`` methods are
asynchronous (they return coroutines, that have to be awaited).
"""
[docs] def __init__(self, locals=None, filename="<console>", *,
reroute_stdout=True, reroute_stderr=True):
"""Constructor.
The optional ``locals``, ``reroute_stdout`` and
``reroute_stderr`` arguments will be passed to the
:class:`AsyncInteractiveInterpreter` base class.
The optional ``filename`` argument should specify the
(file)name of the input stream; it will show up in tracebacks.
"""
super().__init__(locals, reroute_stdout=reroute_stdout,
reroute_stderr=reroute_stderr)
self.filename = filename
self.resetbuffer()
[docs] async def interact(self, banner=None, exitmsg=None):
"""Closely emulate the interactive Python console.
Arguments and behavior are as for :class:`code.InteractiveConsole`,
except that this is an asynchronous method that awaits inputs.
"""
try:
sys.ps1
except AttributeError:
sys.ps1 = ">>> "
try:
sys.ps2
except AttributeError:
sys.ps2 = "... "
aw = 'async REPL: use "await" directly instead of "asyncio.run()".'
cprt = 'Type "help", "copyright", "credits" or "license" for more information.'
if banner is None:
await self.write("Python {} on {}\n{}\n{}\n({})\n".format(
sys.version, sys.platform, aw, cprt,
self.__class__.__name__))
elif banner:
await self.write("{}\n".format(banner))
more = 0
while 1:
try:
if more:
prompt = sys.ps2
else:
prompt = sys.ps1
try:
line = await self.raw_input(prompt)
except EOFError:
await self.write("\n")
break
else:
more = await self.push(line)
except KeyboardInterrupt:
await self.write("\nKeyboardInterrupt\n")
self.resetbuffer()
more = 0
if exitmsg is None:
await self.write('now exiting {}...\n'.format(
self.__class__.__name__))
elif exitmsg != '':
await self.write('{}\n'.format(exitmsg))
[docs] async def push(self, line):
"""Push a line to the interpreter.
Arguments and behavior are as for :class:`code.InteractiveConsole`,
except that this is an asynchronous method.
"""
self.buffer.append(line)
source = "\n".join(self.buffer)
more = await self.runsource(source, self.filename)
if not more:
self.resetbuffer()
return more