Moved logic to jeeves.core
This commit is contained in:
parent
e71c2ddd51
commit
6bcbef4280
|
@ -0,0 +1,44 @@
|
|||
import traceback
|
||||
|
||||
from jeeves.core.objects import Flow, Result, Execution, ExecutionStep
|
||||
|
||||
|
||||
class Executor:
|
||||
def __init__(self, flow: Flow):
|
||||
self.step_count = len(flow.tasks)
|
||||
self._flow: Flow = flow
|
||||
self._execution = Execution(flow=flow, steps=self._get_steps(flow))
|
||||
|
||||
@property
|
||||
def steps(self):
|
||||
for step in self._execution.steps:
|
||||
yield step
|
||||
|
||||
def _get_steps(self, flow):
|
||||
for task in flow.tasks:
|
||||
yield ExecutionStep(task=task, result=Result())
|
||||
|
||||
def execute_step(self, step):
|
||||
try:
|
||||
step.result = step.task.runner.execute()
|
||||
except Exception as error:
|
||||
# Catch unhandled exceptions, mark the result as unsuccessful
|
||||
# and append the error as output.
|
||||
tb = traceback.format_exc()
|
||||
output = "\n".join(
|
||||
(
|
||||
"=" * 30,
|
||||
f"Uncaught exception on task {step.task.type}",
|
||||
f"\t{step.task}",
|
||||
f"Error: {error}",
|
||||
tb,
|
||||
"=" * 30,
|
||||
)
|
||||
)
|
||||
step.result = Result(success=False, output=output)
|
||||
return step.result
|
||||
|
||||
def start(self):
|
||||
for step in self._execution.steps:
|
||||
self.execute_step(step)
|
||||
self._execution.success = step.result.success
|
|
@ -0,0 +1,47 @@
|
|||
from typing import Any, Dict, List, Text, Optional
|
||||
from dataclasses import field
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from jeeves.core.registry import TaskRegistry
|
||||
|
||||
|
||||
class BaseObject(BaseModel):
|
||||
pass
|
||||
|
||||
|
||||
class Result(BaseObject):
|
||||
output: Text = ""
|
||||
success: bool = False
|
||||
|
||||
|
||||
class Task(BaseObject):
|
||||
name: Text
|
||||
type: Text
|
||||
parameters: Dict[Any, Any]
|
||||
|
||||
@property
|
||||
def runner(self):
|
||||
return TaskRegistry.get_task_cls(self.type)(parameters=self.parameters)
|
||||
|
||||
|
||||
class Flow(BaseObject):
|
||||
name: Text
|
||||
tasks: List[Task] = field(default_factory=list)
|
||||
|
||||
|
||||
class ExecutionStep(BaseObject):
|
||||
task: Task
|
||||
result: Optional[Result] = None
|
||||
|
||||
|
||||
class Execution(BaseObject):
|
||||
flow: Flow
|
||||
steps: List[ExecutionStep]
|
||||
success: bool = False
|
||||
|
||||
@property
|
||||
def output(self):
|
||||
for step in self.steps:
|
||||
if step.result.output:
|
||||
yield step.result.output
|
|
@ -0,0 +1,35 @@
|
|||
from typing import Dict, Text
|
||||
|
||||
import toml
|
||||
|
||||
from jeeves.core.objects import Flow, BaseObject
|
||||
|
||||
|
||||
class ObjectParser:
|
||||
object = None
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, serialized: Text) -> BaseObject:
|
||||
return cls.object.parse_raw(serialized)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, serialized: Dict) -> BaseObject:
|
||||
return cls.object.parse_obj(serialized)
|
||||
|
||||
@classmethod
|
||||
def from_toml(cls, serialized: Text) -> BaseObject:
|
||||
dct = toml.loads(serialized)
|
||||
return cls.from_dict(dct)
|
||||
|
||||
@classmethod
|
||||
def from_toml_file(cls, path) -> BaseObject:
|
||||
dct = toml.load(path)
|
||||
return cls.from_dict(dct)
|
||||
|
||||
@classmethod
|
||||
def to_dict(cls, obj: BaseObject) -> dict:
|
||||
return obj.dict()
|
||||
|
||||
|
||||
class FlowParser(ObjectParser):
|
||||
object = Flow
|
|
@ -1,15 +1,17 @@
|
|||
import logging
|
||||
|
||||
from jeeves.tasks import PROVIDED_TASKS
|
||||
from jeeves.core.tasks import PROVIDED_TASKS
|
||||
|
||||
|
||||
class Singleton(type):
|
||||
instance = None
|
||||
def __init__(cls, name, bases, attrs, **kwargs):
|
||||
super().__init__(name, bases, attrs, **kwargs)
|
||||
cls._instance = None
|
||||
|
||||
def __call__(cls, *args, **kw):
|
||||
if not cls.instance:
|
||||
cls.instance = super(Singleton, cls).__call__(*args, **kw)
|
||||
return cls.instance
|
||||
def __call__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__call__(*args, **kwargs)
|
||||
return cls._instance
|
||||
|
||||
|
||||
class TaskRegistry(metaclass=Singleton):
|
||||
|
@ -24,22 +26,23 @@ class TaskRegistry(metaclass=Singleton):
|
|||
# TODO: Third party plugins
|
||||
registry = cls()
|
||||
for task_namespace in PROVIDED_TASKS:
|
||||
registry.register_task(cls.get_task(task_namespace))
|
||||
registry.register_task(cls.get_task_cls(task_namespace))
|
||||
|
||||
@classmethod
|
||||
def register_task(cls, task):
|
||||
def register_task(cls, task_cls):
|
||||
registry = cls()
|
||||
namespace = task.get_namespace()
|
||||
# namespace = task_cls.id
|
||||
namespace = f"{task_cls.__module__}:{task_cls.__name__}"
|
||||
|
||||
if namespace in registry.tasks:
|
||||
raise cls.TaskNamespaceConflict(
|
||||
f"Namespace {namespace} is already registered"
|
||||
)
|
||||
|
||||
registry.tasks[namespace] = task
|
||||
registry.tasks[namespace] = task_cls
|
||||
|
||||
@classmethod
|
||||
def get_task(cls, namespace, **kwargs):
|
||||
def get_task_cls(cls, namespace, **kwargs):
|
||||
# Right now tasks are being imported and returned dinamically because it's easier,
|
||||
# but we will need a way of autodiscover all tasks (or register them manually) and
|
||||
# referencing them on a list so the user knows which tasks are available.
|
||||
|
@ -48,8 +51,6 @@ class TaskRegistry(metaclass=Singleton):
|
|||
try:
|
||||
module = __import__(f"{modulename}", fromlist=(clsname,), level=0)
|
||||
task_cls = getattr(module, clsname)
|
||||
if kwargs:
|
||||
return task_cls(**kwargs)
|
||||
return task_cls
|
||||
except ModuleNotFoundError as error:
|
||||
raise cls.TaskDoesNotExist(f"Error importing task {namespace}: {error.msg}")
|
|
@ -0,0 +1 @@
|
|||
PROVIDED_TASKS = ["jeeves.core.tasks.shell:ShellTask"]
|
|
@ -0,0 +1,14 @@
|
|||
import logging
|
||||
|
||||
import pydantic
|
||||
|
||||
|
||||
class Task:
|
||||
id = ""
|
||||
|
||||
class Parameters(pydantic.BaseModel):
|
||||
pass
|
||||
|
||||
def __init__(self, parameters=None):
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
self.parameters = self.Parameters(**(parameters or {}))
|
|
@ -3,10 +3,12 @@ from typing import Text
|
|||
|
||||
import pydantic
|
||||
|
||||
from jeeves.tasks.base import Task
|
||||
from jeeves.core.objects import Result
|
||||
from .base import Task
|
||||
|
||||
|
||||
class ShellTask(Task):
|
||||
id = "library/shell"
|
||||
verbose_name = "Execute Shell script"
|
||||
|
||||
class Parameters(pydantic.BaseModel):
|
||||
|
@ -19,4 +21,6 @@ class ShellTask(Task):
|
|||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
return (process.returncode > 0, process.stdout.decode("utf-8"))
|
||||
return Result(
|
||||
success=process.returncode == 0, output=process.stdout.decode("utf-8")
|
||||
)
|
|
@ -0,0 +1,48 @@
|
|||
"""
|
||||
This is a collection of Tasks provided for testing purposes only.
|
||||
"""
|
||||
from typing import Text, Optional
|
||||
|
||||
from jeeves.core.objects import Result
|
||||
from .base import Task
|
||||
|
||||
|
||||
class StubSuccessTask(Task):
|
||||
id = "stub/success"
|
||||
|
||||
def execute(self, message=None):
|
||||
return Result(success=True)
|
||||
|
||||
|
||||
class StubNonSuccessTask(Task):
|
||||
id = "stub/non-success"
|
||||
|
||||
def execute(self):
|
||||
return Result(output="error!", success=False)
|
||||
|
||||
|
||||
class StubUncaughtExceptionTask(Task):
|
||||
id = "stub/uncaught-exception"
|
||||
|
||||
def execute(self):
|
||||
raise Exception("Oh god...")
|
||||
|
||||
|
||||
class StubNoParametersTask(StubSuccessTask):
|
||||
"""
|
||||
An empty task that provides no configurable parameters.
|
||||
"""
|
||||
|
||||
id = "stub/no-parameters"
|
||||
|
||||
|
||||
class StubParametersTask(StubSuccessTask):
|
||||
"""
|
||||
An empty task that provide two configurable parameters.
|
||||
"""
|
||||
|
||||
id = "sub/parameters"
|
||||
|
||||
class Parameters(Task.Parameters):
|
||||
mandatory: Text
|
||||
non_mandatory: Optional[Text] = None
|
|
@ -0,0 +1,24 @@
|
|||
import factory
|
||||
from faker import Faker
|
||||
|
||||
from jeeves.core.objects import Flow, Task
|
||||
from jeeves.core.registry import TaskRegistry
|
||||
|
||||
fake = Faker()
|
||||
|
||||
|
||||
class TaskFactory(factory.Factory):
|
||||
name = fake.sentence()
|
||||
type = factory.Iterator(TaskRegistry.tasks)
|
||||
parameters = {"script": "#!/bin/bash\necho test"}
|
||||
|
||||
class Meta:
|
||||
model = Task
|
||||
|
||||
|
||||
class FlowFactory(factory.Factory):
|
||||
name = fake.name()
|
||||
tasks = factory.LazyFunction(lambda: [TaskFactory() for _ in range(0, 2)])
|
||||
|
||||
class Meta:
|
||||
model = Flow
|
|
@ -0,0 +1,32 @@
|
|||
from jeeves.core.executor import Executor
|
||||
from .factories import FlowFactory, TaskFactory
|
||||
|
||||
|
||||
def test_executor_success_task_ok():
|
||||
task = TaskFactory(type="jeeves.core.tasks.stub:StubSuccessTask")
|
||||
flow = FlowFactory(tasks=[task])
|
||||
runner = Executor(flow)
|
||||
runner.start()
|
||||
assert runner._execution.steps[0].result
|
||||
assert runner._execution.steps[0].result.success is True
|
||||
assert runner._execution.success is True
|
||||
|
||||
|
||||
def test_executor_non_success_task_ok():
|
||||
task = TaskFactory(type="jeeves.core.tasks.stub:StubNonSuccessTask")
|
||||
flow = FlowFactory(tasks=[task])
|
||||
runner = Executor(flow)
|
||||
runner.start()
|
||||
assert runner._execution.steps[0].result
|
||||
assert runner._execution.steps[0].result.success is False
|
||||
assert runner._execution.success is False
|
||||
|
||||
|
||||
def test_executor_uncaught_exception_in_task_ok():
|
||||
task = TaskFactory(type="jeeves.core.tasks.stub:StubUncaughtExceptionTask")
|
||||
flow = FlowFactory(tasks=[task])
|
||||
runner = Executor(flow)
|
||||
runner.start()
|
||||
assert runner._execution.steps[0].result
|
||||
assert runner._execution.steps[0].result.success is False
|
||||
assert runner._execution.success is False
|
|
@ -0,0 +1,25 @@
|
|||
from jeeves.core.objects import BaseObject
|
||||
from jeeves.core.parsers import FlowParser
|
||||
from .factories import FlowFactory
|
||||
|
||||
EXPORTED_FLOW = {
|
||||
"name": "Test",
|
||||
"tasks": [
|
||||
{
|
||||
"type": "jeeves.core.tasks.stub:StubSuccessfulTask",
|
||||
"name": "Test task",
|
||||
"parameters": {},
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def test_parser_object_to_dict_ok():
|
||||
obj = FlowFactory()
|
||||
result = FlowParser.to_dict(obj)
|
||||
obj = FlowParser.from_dict(result)
|
||||
|
||||
|
||||
def test_parser_dict_to_object_ok():
|
||||
obj = FlowParser.from_dict(EXPORTED_FLOW)
|
||||
assert isinstance(obj, BaseObject)
|
|
@ -0,0 +1,14 @@
|
|||
import pytest
|
||||
|
||||
from jeeves.core.registry import TaskRegistry
|
||||
from jeeves.core.tasks.base import Task
|
||||
|
||||
|
||||
def test_registry_get_task_cls_ok():
|
||||
task = TaskRegistry.get_task_cls("jeeves.core.tasks.stub:StubSuccessTask")
|
||||
assert issubclass(task, Task) and not isinstance(task, Task)
|
||||
|
||||
|
||||
def test_registry_get_task_ko():
|
||||
with pytest.raises(TaskRegistry.TaskDoesNotExist):
|
||||
TaskRegistry.get_task_cls("non.existant:task")
|
|
@ -0,0 +1,22 @@
|
|||
import pytest
|
||||
import pydantic
|
||||
|
||||
from jeeves.core.registry import TaskRegistry
|
||||
|
||||
|
||||
def test_task_with_empty_parameters_ok():
|
||||
task = TaskRegistry.get_task_cls("jeeves.core.tasks.stub:StubNoParametersTask")
|
||||
task()
|
||||
task(parameters=None)
|
||||
task(parameters={})
|
||||
|
||||
|
||||
def test_task_with_parameters_ok():
|
||||
task = TaskRegistry.get_task_cls("jeeves.core.tasks.stub:StubParametersTask")
|
||||
task(parameters=dict(mandatory="text", non_mandatory="text"))
|
||||
|
||||
|
||||
def test_task_with_parameters_ko():
|
||||
task = TaskRegistry.get_task_cls("jeeves.core.tasks.stub:StubParametersTask")
|
||||
with pytest.raises(pydantic.ValidationError):
|
||||
task(parameters=dict(thisshould="fail"))
|
|
@ -1,6 +1,6 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
from jeeves.registry import TaskRegistry
|
||||
from jeeves.core.registry import TaskRegistry
|
||||
|
||||
|
||||
class DBAppConfig(AppConfig):
|
||||
|
|
|
@ -5,7 +5,7 @@ import logging
|
|||
|
||||
from django.db import models
|
||||
|
||||
from jeeves.registry import TaskRegistry
|
||||
from jeeves.core.registry import TaskRegistry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
|
@ -1,46 +0,0 @@
|
|||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from jeeves.db.models import Run, Flow, Task
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def task_model():
|
||||
TASK_DEFINITION = {"script": "#!/bin/bash\n\necho HELLO"}
|
||||
TASK = {
|
||||
"type": "jeeves.tasks.shell:ShellTask",
|
||||
"_definition": json.dumps(TASK_DEFINITION),
|
||||
}
|
||||
task = Task.objects.create(**TASK)
|
||||
yield task
|
||||
task.delete()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def flow_model(task_model):
|
||||
FLOW_DEFINITION = {"tasks": [str(task_model.id)]}
|
||||
FLOW = {"name": "Test", "_definition": json.dumps(FLOW_DEFINITION)}
|
||||
flow = Flow.objects.create(**FLOW)
|
||||
yield flow, task_model
|
||||
flow.delete()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_task_from_reference_ok(task_model):
|
||||
assert Task.objects.from_reference(str(task_model.id)) == task_model
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_flow_definition_is_expanded_on_serialize_ok(flow_model):
|
||||
flow, task = flow_model
|
||||
flow_dict = flow.serialize()
|
||||
flow_dict["definition"]["tasks"][0] == task.definition
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_run_execute_flow_ok(flow_model):
|
||||
flow, task = flow_model
|
||||
run = flow.execute(foreground=True)
|
||||
assert run.status == Run.FINISHED
|
||||
assert run.success is True
|
|
@ -3,8 +3,8 @@ from django.views import View
|
|||
from django.contrib import messages
|
||||
from django.shortcuts import render, redirect
|
||||
|
||||
from jeeves.registry import TaskRegistry
|
||||
from jeeves.db.models import Run, Flow, Task
|
||||
from jeeves.core.registry import TaskRegistry
|
||||
from jeeves.frontend.forms import FlowForm, TaskForm
|
||||
|
||||
registry = TaskRegistry()
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
PROVIDED_TASKS = ["jeeves.tasks.shell:ShellTask"]
|
|
@ -1,14 +0,0 @@
|
|||
import logging
|
||||
|
||||
|
||||
class Task:
|
||||
params = None
|
||||
|
||||
@classmethod
|
||||
def get_namespace(cls):
|
||||
return f"{cls.__module__}:{cls.__name__}"
|
||||
|
||||
def __init__(self, **params):
|
||||
if hasattr(self, "Parameters"):
|
||||
self.parameters = self.Parameters(**params)
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
|
@ -1,6 +0,0 @@
|
|||
from .base import Task
|
||||
|
||||
|
||||
class DummyTask(Task):
|
||||
def execute(self, message=None):
|
||||
self.logger.debug("Dummy task executed!")
|
|
@ -1,19 +0,0 @@
|
|||
import pytest
|
||||
|
||||
from jeeves.registry import TaskRegistry
|
||||
from jeeves.tasks.base import Task
|
||||
|
||||
|
||||
def test_registry_get_task_cls_ok():
|
||||
task = TaskRegistry.get_task("jeeves.tasks.dummy:DummyTask")
|
||||
assert issubclass(task, Task) and not isinstance(task, Task)
|
||||
|
||||
|
||||
def test_registry_get_task_instance_ok():
|
||||
task = TaskRegistry.get_task("jeeves.tasks.dummy:DummyTask", foo="bar")
|
||||
assert isinstance(task, Task)
|
||||
|
||||
|
||||
def test_registry_get_task_ko():
|
||||
with pytest.raises(TaskRegistry.TaskDoesNotExist):
|
||||
TaskRegistry.get_task("non.existant:task")
|
|
@ -23,6 +23,8 @@ ipdb = "^0.12.2"
|
|||
pytest = "^5.1"
|
||||
pytest-django = "^3.5"
|
||||
pytest-tldr = "^0.2.1"
|
||||
faker = "^2.0"
|
||||
factory-boy = "^2.12"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry>=0.12"]
|
||||
|
|
|
@ -11,6 +11,6 @@ include_trailing_comma = True
|
|||
length_sort = 1
|
||||
lines_between_types = 0
|
||||
line_length = 88
|
||||
known_third_party = django,dramatiq,environ,pydantic,pytest
|
||||
known_third_party = django,dramatiq,environ,factory,faker,pydantic,pytest,toml
|
||||
sections = FUTURE, STDLIB, DJANGO, THIRDPARTY, FIRSTPARTY, LOCALFOLDER
|
||||
no_lines_before = LOCALFOLDER
|
||||
|
|
Reference in New Issue