Skip to content

Commit

Permalink
add step, next, continue, list and longlist commands
Browse files Browse the repository at this point in the history
xyb committed May 9, 2020
1 parent 57b86cf commit fa99c55
Showing 12 changed files with 463 additions and 95 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -33,4 +33,5 @@ jobs:
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=80 --statistics
- name: Test
run: |
python setup.py develop
python setup.py test
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@
build
dist
*.pyc
*.txt
*.robot
log.html
output.xml
report.html
84 changes: 82 additions & 2 deletions DebugLibrary/debugcmd.py
Original file line number Diff line number Diff line change
@@ -4,12 +4,15 @@
from robot.errors import ExecutionFailed, HandlerExecutionFailed

from .cmdcompleter import CmdCompleter
from .globals import context
from .prompttoolkitcmd import PromptToolkitCmd
from .robotapp import get_robot_instance, reset_robotframework_exception
from .robotkeyword import get_keywords, get_lib_keywords, run_keyword
from .robotlib import get_builtin_libs, get_libs, get_libs_dict, match_libs
from .robotselenium import SELENIUM_WEBDRIVERS, start_selenium_commands
from .robotvar import assign_variable
from .sourcelines import (RobotNeedUpgrade, print_source_lines,
print_test_case_lines)
from .steplistener import is_step_mode, set_step_mode
from .styles import (DEBUG_PROMPT_STYLE, get_debug_prompt_tokens, print_error,
print_output)

@@ -56,7 +59,7 @@ def postcmd(self, stop, line):
"""Run after a command."""
return stop

def pre_loop(self):
def pre_loop_iter(self):
"""Reset robotframework before every loop iteration."""
reset_robotframework_exception()

@@ -208,3 +211,80 @@ def do_docs(self, kw_name):
print_error('< not find keyword', kw_name)

do_d = do_docs

def emptyline(self):
"""Repeat last nonempty command if in step mode."""
self.repeat_last_nonempty_command = is_step_mode()
return super(DebugCmd, self).emptyline()

def append_command(self, command):
"""Append a command to queue."""
self.cmdqueue.append(command)

def append_exit(self):
"""Append exit command to queue."""
self.append_command('exit')

def do_step(self, args):
"""Execute the current line, stop at the first possible occasion."""
set_step_mode(on=True)
self.append_exit() # pass control back to robot runner

do_s = do_step

def do_next(self, args):
"""Continue execution until the next line is reached or it returns."""
self.do_step(args)

do_n = do_next

def do_continue(self, args):
"""Continue execution."""
self.do_exit(args)

do_c = do_continue

def do_list(self, args):
"""List source code for the current file."""

self.list_source(longlist=False)

do_l = do_list

def do_longlist(self, args):
"""List the whole source code for the current test case."""

self.list_source(longlist=True)

do_ll = do_longlist

def list_source(self, longlist=False):
"""List source code."""
if not is_step_mode():
print('Please run `step` or `next` command first.')
return

if longlist:
print_function = print_test_case_lines
else:
print_function = print_source_lines

try:
print_function(context.current_source_path,
context.current_source_lineno)
except RobotNeedUpgrade:
print('Please upgrade robotframework to support list source code:')
print(' pip install "robotframework>=3.2" -U')

def do_exit(self, args):
"""Exit debug shell."""
set_step_mode(on=False) # explicitly exit REPL will disable step mode
self.append_exit()
return super(DebugCmd, self).do_exit(args)

def onecmd(self, line):
# restore last command acrossing different Cmd instances
self.lastcmd = context.last_command
stop = super(DebugCmd, self).onecmd(line)
context.last_command = self.lastcmd
return stop
15 changes: 15 additions & 0 deletions DebugLibrary/globals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class SingletonContext:
in_step_mode = False
current_runner = None
current_runner_step = None
current_source_path = ''
current_source_lineno = 0
last_command = ''

def __new__(cls):
if not hasattr(cls, 'instance'):
cls.instance = super(SingletonContext, cls).__new__(cls)
return cls.instance


context = SingletonContext()
21 changes: 15 additions & 6 deletions DebugLibrary/keywords.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
import sys

from .debugcmd import DebugCmd
from .steplistener import RobotLibraryStepListenerMixin, is_step_mode
from .styles import print_output
from .webdriver import get_remote_url, get_session_id, get_webdriver_remote


class DebugKeywords(object):
class DebugKeywords(RobotLibraryStepListenerMixin):
"""Debug Keywords for RobotFramework."""

def debug(self):
"""Open a interactive shell, run any RobotFramework keywords.
Keywords separated by two space or one tab, and Ctrl-D to exit.
"""

# re-wire stdout so that we can use the cmd module and have readline
# support
old_stdout = sys.stdout
sys.stdout = sys.__stdout__
print_output('\n>>>>>', 'Enter interactive shell')

debug_cmd = DebugCmd()
debug_cmd.cmdloop()
show_intro = not is_step_mode()
if show_intro:
print_output('\n>>>>>', 'Enter interactive shell')

self.debug_cmd = DebugCmd()
if show_intro:
self.debug_cmd.cmdloop()
else:
self.debug_cmd.cmdloop(intro='')

show_intro = not is_step_mode()
if show_intro:
print_output('\n>>>>>', 'Exit shell.')

print_output('\n>>>>>', 'Exit shell.')
# put stdout back where it was
sys.stdout = old_stdout

112 changes: 68 additions & 44 deletions DebugLibrary/prompttoolkitcmd.py
Original file line number Diff line number Diff line change
@@ -8,9 +8,13 @@

class BaseCmd(cmd.Cmd):
"""Basic REPL tool."""
prompt = '> '
repeat_last_nonempty_command = False

def emptyline(self):
"""Do not repeat last command if press enter only."""
"""Do not repeat last command if input empty unlese repeat_last_command."""
if self.repeat_last_nonempty_command:
return super(BaseCmd, self).emptyline()

def do_exit(self, arg):
"""Exit the interpreter. You can also use the Ctrl-D shortcut."""
@@ -51,14 +55,57 @@ def get_helps(self):
def get_completer(self):
"""Get completer instance."""

def pre_loop(self):
def pre_loop_iter(self):
"""Excute before every loop iteration."""

def loop_once(self):
self.pre_loop_iter()
if self.cmdqueue:
line = self.cmdqueue.pop(0)
else:
try:
line = self.get_input()
except KeyboardInterrupt:
return

if line == 'exit':
line = 'EOF'

line = self.precmd(line)
if line == 'EOF':
# do not run 'EOF' command to avoid override 'lastcmd'
stop = True
else:
stop = self.onecmd(line)
stop = self.postcmd(stop, line)
return stop

def cmdloop(self, intro=None):
"""Better command loop.
override default cmdloop method
"""
if intro is not None:
self.intro = intro
if self.intro:
self.stdout.write(self.intro)
self.stdout.write('\n')

self.preloop()

stop = None
while not stop:
stop = self.loop_once()

self.postloop()

def get_input(self):
return input(prompt=self.prompt)


class PromptToolkitCmd(BaseCmd):
"""CMD shell using prompt-toolkit."""

prompt = '> '
get_prompt_tokens = None
prompt_style = None
intro = '''\
@@ -71,44 +118,21 @@ def __init__(self, completekey='tab', stdin=None, stdout=None,
BaseCmd.__init__(self, completekey, stdin, stdout)
self.history = FileHistory(os.path.expanduser(history_path))

def cmdloop(self, intro=None):
"""Better command loop supported by prompt_toolkit.
override default cmdloop method
"""
if intro is not None:
self.intro = intro
if self.intro:
self.stdout.write(self.intro)
self.stdout.write('\n')

stop = None
while not stop:
self.pre_loop()
if self.cmdqueue:
line = self.cmdqueue.pop(0)
else:
kwargs = dict(
history=self.history,
auto_suggest=AutoSuggestFromHistory(),
enable_history_search=True,
completer=self.get_completer(),
complete_style=CompleteStyle.MULTI_COLUMN,
)
if self.get_prompt_tokens:
kwargs['style'] = self.prompt_style
prompt_str = self.get_prompt_tokens(self.prompt)
else:
prompt_str = self.prompt
try:
line = prompt(message=prompt_str, **kwargs)
except KeyboardInterrupt:
continue
except EOFError:
line = 'EOF'

line = self.precmd(line)
stop = self.onecmd(line)
stop = self.postcmd(stop, line)

self.postloop()
def get_input(self):
kwargs = dict(
history=self.history,
auto_suggest=AutoSuggestFromHistory(),
enable_history_search=True,
completer=self.get_completer(),
complete_style=CompleteStyle.MULTI_COLUMN,
)
if self.get_prompt_tokens:
kwargs['style'] = self.prompt_style
prompt_str = self.get_prompt_tokens(self.prompt)
else:
prompt_str = self.prompt
try:
line = prompt(message=prompt_str, **kwargs)
except EOFError:
line = 'EOF'
return line
3 changes: 1 addition & 2 deletions DebugLibrary/robotkeyword.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import re

from robot.libdocpkg.model import LibraryDoc

from .memoize import memoize
from .robotlib import ImportedLibraryDocBuilder, get_libs
from .robotvar import assign_variable
@@ -16,6 +14,7 @@

def parse_keyword(command):
"""Split a robotframework keyword string."""
# TODO use robotframework functions
return KEYWORD_SEP.split(command)


4 changes: 1 addition & 3 deletions DebugLibrary/shell.py
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ def shell():
args = default_no_logs.split() + [test_file.name]

try:
rc = run_cli(args)
sys.exit(run_cli(args))
finally:
test_file.close()
# pybot will raise PermissionError on Windows NT or later
@@ -39,8 +39,6 @@ def shell():
if os.path.exists(test_file.name):
os.unlink(test_file.name)

sys.exit(rc)


if __name__ == "__main__":
# Usage: python -m DebugLibrary.shell
75 changes: 75 additions & 0 deletions DebugLibrary/sourcelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from robot.version import get_version

ROBOT_VERION_RUNNER_GET_STEP_LINENO = '3.2'


class RobotNeedUpgrade(Exception):
"""Need upgrade robotframework."""


def check_version():
if get_version() < ROBOT_VERION_RUNNER_GET_STEP_LINENO:
raise RobotNeedUpgrade


def print_source_lines(source_file, lineno, before_and_after=5):
check_version()

if not source_file or not lineno:
return

lines = open(source_file).readlines()
start_index = max(1, lineno - before_and_after - 1)
end_index = min(len(lines) + 1, lineno + before_and_after)
_print_lines(lines, start_index, end_index, lineno)


def print_test_case_lines(source_file, lineno):
check_version()

if not source_file or not lineno:
return

lines = open(source_file).readlines()
current_lineno = lineno

# find the first line of current test case
line_index = current_lineno - 1
while line_index >= 0:
line_index -= 1
line = lines[line_index]
if not _inside_test_case_block(line):
break
start_index = line_index

# find the last line of current test case
line_index = current_lineno - 1
while line_index < len(lines):
line = lines[line_index]
if not _inside_test_case_block(line):
break
line_index += 1
end_index = line_index

_print_lines(lines, start_index, end_index, lineno)


def _inside_test_case_block(line):
if line.startswith('#'):
return True
elif line.startswith(' '):
return True
elif line.startswith('\t'):
return True
return False


def _print_lines(lines, start_index, end_index, current_lineno):
display_lines = lines[start_index:end_index]
for lineno, line in enumerate(display_lines, start_index + 1):
current_line_sign = ''
if lineno == current_lineno:
current_line_sign = '->'
print('{:>3} {:2}\t{}'.format(lineno,
current_line_sign,
line.rstrip()))
60 changes: 60 additions & 0 deletions DebugLibrary/steplistener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import inspect

from .globals import context


class RobotLibraryStepListenerMixin:
ROBOT_LISTENER_API_VERSION = 2

def __init__(self):
super(RobotLibraryStepListenerMixin, self).__init__()
self.ROBOT_LIBRARY_LISTENER = [self]

def _start_keyword(self, name, attrs):
context.current_source_path = ''
context.current_source_lineno = 0

if not is_step_mode():
return

find_runner_step()
step = context.current_runner_step

if hasattr(step, 'lineno'):
path = step.source
lineno = step.lineno
lineno_0_based = lineno - 1
context.current_source_path = path
context.current_source_lineno = lineno
print('> {}({})'.format(path, lineno))
line = (open(path).readlines()[lineno_0_based].strip())
print('-> {}'.format(line))

if attrs['assign']:
assign = '%s = ' % ', '.join(attrs['assign'])
else:
assign = ''
name = '{}.{}'.format(attrs['libname'], attrs['kwname'])

translated = '{}{} {}'.format(assign, name, ' '.join(attrs['args']))
print('=> {}'.format(translated))

# callback debug interface
self.debug()


def find_runner_step():
stack = inspect.stack()
for frame in stack:
if frame.function == 'run_steps':
arginfo = inspect.getargvalues(frame.frame)
context.current_runner = arginfo.locals.get('runner')
context.current_runner_step = arginfo.locals.get('step')


def set_step_mode(on=True):
context.in_step_mode = on


def is_step_mode():
return context.in_step_mode
12 changes: 12 additions & 0 deletions DebugLibrary/tests/step.robot
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
*** Settings ***
Library DebugLibrary

** test case **
test1
debug
log to console working
@{list} = Create List hello world

test2
log to console another test case
log to console end
169 changes: 131 additions & 38 deletions DebugLibrary/tests/test_debuglibrary.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,168 @@
#!/usr/bin/env python

import os
import tempfile
import unittest
from os.path import abspath, dirname, join

import pexpect
from robot.version import get_version

TIMEOUT_SECONDS = 2

child = None


def check_result(pattern):
index = child.expect([pattern, pexpect.EOF, pexpect.TIMEOUT],
timeout=TIMEOUT_SECONDS)
try:
assert index == 0
except AssertionError:
print('\n==== Screen buffer raw ====\n',
child._buffer.getvalue(),
'\n^^^^ Screen buffer raw ^^^^')
print('==== Screen buffer ====\n',
child._buffer.getvalue().decode('utf8'),
'\n^^^^ Screen buffer ^^^^')
raise

def functional_testing():
child = pexpect.spawn('/usr/bin/env python -m DebugLibrary.shell')
child.expect('Enter interactive shell', timeout=TIMEOUT_SECONDS * 3)

def check_result(pattern):
index = child.expect([pattern, pexpect.EOF, pexpect.TIMEOUT],
timeout=TIMEOUT_SECONDS)
try:
assert index == 0
except AssertionError:
print('Screen buffer: ', child._buffer.getvalue())
raise
def check_prompt(keys, pattern):
child.write(keys)
check_result(pattern)
child.write('\003') # ctrl-c: reset inputs

def check_prompt(keys, pattern):
child.write(keys)
check_result(pattern)
child.write('\003') # ctrl-c: reset inputs

def check_command(command, pattern):
child.sendline(command)
check_result(pattern)
def check_command(command, pattern):
child.sendline(command)
check_result(pattern)


def base_functional_testing():
global child
child = pexpect.spawn('/usr/bin/env python -m DebugLibrary.shell')
child.expect('Enter interactive shell', timeout=TIMEOUT_SECONDS * 3)

# auto complete
check_prompt('key\t', 'keywords')
check_prompt('key\t', 'Keyword Should Exist')
check_prompt('buil\t', 'Library: BuiltIn')
check_prompt('builtin.\t', 'Call Method')
check_prompt('get\t', 'Get Count')
check_prompt('get\t', 'Get Time')
check_prompt('key\t',
'keywords')
check_prompt('key\t',
'Keyword Should Exist')
check_prompt('buil\t',
'Library: BuiltIn')
check_prompt('builtin.\t',
'Call Method')
check_prompt('get\t',
'Get Count')
check_prompt('get\t',
'Get Time')

# keyword
check_command('log to console hello', 'hello')
check_command('get time', '.*-.*-.* .*:.*:.*')
check_prompt('g', 'et time')
check_command('help keywords', 'Print keywords of libraries')
check_command('k builtin', 'Sleep')
check_command('d sleep', 'Pauses the test executed for the given time')
check_command('log to console hello',
'hello')
check_command('get time',
'.*-.*-.* .*:.*:.*')
check_prompt('g',
'et time')
check_command('help keywords',
'Print keywords of libraries')
check_command('k builtin',
'Sleep')
check_command('d sleep',
'Pauses the test executed for the given time')

# var
check_command('@{{list}} = Create List hello world',
"@{{list}} = ['helo', 'world']")
check_command('${list}', "['helo', 'world']")
check_command('${list}',
"['helo', 'world']")
check_command('&{dict} = Create Dictionary name=admin',
"&{dict} = {'name': 'admin'}")
check_command('${dict.name}', 'admin')
check_command('${dict.name}',
'admin')

# fail-safe
check_command('fail', 'AssertionError')
check_command('nothing', "No keyword with name 'nothing' found.")
check_command('fail',
'AssertionError')
check_command('nothing',
"No keyword with name 'nothing' found.")

return 'OK'


def step_functional_testing():
global child
path = join(dirname(abspath(__file__)), 'step.robot')
child = pexpect.spawn('/usr/bin/env robot {}'.format(path))
child.expect('Enter interactive shell', timeout=TIMEOUT_SECONDS * 3)

check_command('list',
'Please run `step` or `next` command first.')

support_source_lineno = get_version() >= '3.2'

if support_source_lineno:
check_command('s', # step
'/DebugLibrary/tests/step.robot\(7\).*'
'-> log to console working.*'
'=> BuiltIn.Log To Console working')
check_command('l', # list
' 7 -> log to console working')
check_command('n', # next
'/DebugLibrary/tests/step.robot\(8\).*'
'@.* = Create List hello world.*'
'@.* = BuiltIn\.Create List hello world')
check_command('', # just repeat last command
'/DebugLibrary/tests/step.robot\(11\).*'
'-> log to console another test case.*'
'=> BuiltIn.Log To Console another test case')
check_command('l', # list
' 6 debug.*'
' 7 log to console working.*'
' 8 @.* = Create List hello world.*'
' 9.*'
' 10 test2.*'
' 11 -> log to console another test case.*'
' 12 log to console end')
check_command('ll', # longlist
' 10 test2.*'
' 11 -> log to console another test case.*'
' 12 log to console end')
else:
check_command('s', # step
'=> BuiltIn.Log To Console working')
check_command('l', # list
'Please upgrade robotframework')
check_command('n', # next
'@.* = BuiltIn\.Create List hello world')
check_command('', # repeat last command
'=> BuiltIn.Log To Console another test case')

# exit
check_command('c', # continue
'Exit shell.*'
'another test case.*'
'end')

return 'OK'


class FunctionalTestCase(unittest.TestCase):
def test_functional(self):
assert functional_testing() == 'OK'
def test_base_functional(self):
assert base_functional_testing() == 'OK'

def test_step_functional(self):
assert step_functional_testing() == 'OK'


def suite():
suite = unittest.TestSuite()
suite.addTest(FunctionalTestCase('test_functional'))
suite.addTest(FunctionalTestCase('test_base_functional'))
suite.addTest(FunctionalTestCase('test_step_functional'))
return suite


if __name__ == '__main__':
print(functional_testing())
print(base_functional_testing())
print(step_functional_testing())

0 comments on commit fa99c55

Please sign in to comment.