33 lines
1.1 KiB
Python
33 lines
1.1 KiB
Python
from jeeves.core.executor import Executor
|
|
from .factories import FlowFactory, TaskFactory
|
|
|
|
|
|
def test_executor_success_task_ok():
|
|
task = TaskFactory(type="jeeves.core.actions.stub:StubSuccessAction")
|
|
flow = FlowFactory(tasks=[task])
|
|
runner = Executor(flow)
|
|
runner.start()
|
|
assert runner._execution.steps[0].result
|
|
assert runner._execution.steps[0].result.success is True
|
|
assert runner._execution.success is True
|
|
|
|
|
|
def test_executor_non_success_task_ok():
|
|
task = TaskFactory(type="jeeves.core.actions.stub:StubNonSuccessAction")
|
|
flow = FlowFactory(tasks=[task])
|
|
runner = Executor(flow)
|
|
runner.start()
|
|
assert runner._execution.steps[0].result
|
|
assert runner._execution.steps[0].result.success is False
|
|
assert runner._execution.success is False
|
|
|
|
|
|
def test_executor_uncaught_exception_in_task_ok():
|
|
task = TaskFactory(type="jeeves.core.actions.stub:StubUncaughtExceptionAction")
|
|
flow = FlowFactory(tasks=[task])
|
|
runner = Executor(flow)
|
|
runner.start()
|
|
assert runner._execution.steps[0].result
|
|
assert runner._execution.steps[0].result.success is False
|
|
assert runner._execution.success is False
|