fmartingr
/
jeeves
Archived
1
0
Fork 0

Task registry

This commit is contained in:
Felipe Martin 2019-09-29 14:30:19 +02:00
parent 9de33171e4
commit 3037e707f2
Signed by: fmartingr
GPG Key ID: 716BC147715E716F
10 changed files with 68 additions and 11 deletions

View File

@ -0,0 +1 @@
default_app_config = "jeeves.core.apps.CoreAppConfig"

View File

@ -3,6 +3,11 @@ from django.contrib import admin
# Register your models here.
from .models import Run, Flow, Task
class TaskAdmin(admin.ModelAdmin):
list_display = ("name", "type", "pk")
admin.site.register(Flow)
admin.site.register(Run)
admin.site.register(Task)
admin.site.register(Task, TaskAdmin)

View File

@ -1,5 +1,10 @@
from django.apps import AppConfig
from jeeves.registry import TaskRegistry
class CoreConfig(AppConfig):
name = "core"
class CoreAppConfig(AppConfig):
name = "jeeves.core"
def ready(self):
TaskRegistry.autodiscover()

View File

@ -1,5 +1,7 @@
import logging
from jeeves.tasks import PROVIDED_TASKS
class Singleton(type):
instance = None
@ -16,6 +18,26 @@ class TaskRegistry(metaclass=Singleton):
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
@classmethod
def autodiscover(cls):
"""Loads all provided tasks."""
# TODO: Third party plugins
registry = cls()
for task_namespace in PROVIDED_TASKS:
registry.register_task(cls.get_task(task_namespace))
@classmethod
def register_task(cls, task):
registry = cls()
namespace = task.get_namespace()
if namespace in registry.tasks:
raise cls.TaskNamespaceConflict(
f"Namespace {namespace} is already registered"
)
registry.tasks[namespace] = task
@classmethod
def get_task(cls, namespace, **kwargs):
# Right now tasks are being imported and returned dinamically because it's easier,
@ -25,10 +47,16 @@ class TaskRegistry(metaclass=Singleton):
modulename, clsname = namespace.split(":")
try:
module = __import__(f"{modulename}", fromlist=(clsname,), level=0)
return getattr(module, clsname)(**kwargs)
task_cls = getattr(module, clsname)
if kwargs:
return task_cls(**kwargs)
return task_cls
except ModuleNotFoundError as error:
raise cls.TaskDoesNotExist(f"Error importing task {namespace}: {error.msg}")
class TaskNamespaceConflict(Exception):
pass
class TaskDoesNotExist(Exception):
"""
Used when there's a problem retrieving a task. More info will be available on the message.

View File

@ -0,0 +1 @@
PROVIDED_TASKS = ["jeeves.tasks.shell:ShellTask"]

View File

@ -9,6 +9,6 @@ class Task:
return f"{cls.__module__}:{cls.__name__}"
def __init__(self, **params):
for key, value in params.items():
setattr(self, key, value)
if hasattr(self, "Parameters"):
self.parameters = self.Parameters(**params)
self.logger = logging.getLogger(self.__class__.__name__)

View File

@ -1,15 +1,26 @@
import subprocess
from typing import Text
import pydantic
from jeeves.tasks.base import Task
class ShellTask(Task):
script: Text
verbose_name = "Execute Shell script"
class Parameters(pydantic.BaseModel):
script: Text
def execute(self, message=None):
# error = None
process = subprocess.run(self.script, shell=True, capture_output=True)
process = subprocess.run(
self.parameters.script, shell=True, capture_output=True
)
# if process.returncode > 0:
# error = {"error": process.stderr}
return process.returncode > 0
return (
process.returncode > 0,
process.stdout.decode("utf-8"),
process.stderr.decode("utf-8"),
)

View File

@ -4,8 +4,13 @@ from jeeves.registry import TaskRegistry
from jeeves.tasks.base import Task
def test_registry_get_task_ok():
def test_registry_get_task_cls_ok():
task = TaskRegistry.get_task("jeeves.tasks.dummy:DummyTask")
assert issubclass(task, Task) and not isinstance(task, Task)
def test_registry_get_task_instance_ok():
task = TaskRegistry.get_task("jeeves.tasks.dummy:DummyTask", foo="bar")
assert isinstance(task, Task)

View File

@ -9,6 +9,7 @@ python = "^3.7"
black = {version = "^18.3-alpha.0", allows-prereleases = true}
django = "^2.2"
django-environ = "^0.4.5"
pydantic = "^0.32.2"
[tool.poetry.dev-dependencies]
flake8 = "^3.7"
isort = "^4.3"

View File

@ -11,6 +11,6 @@ include_trailing_comma = True
length_sort = 1
lines_between_types = 0
line_length = 88
known_third_party = django,environ,pytest
known_third_party = django,environ,pydantic,pytest
sections = FUTURE, STDLIB, DJANGO, THIRDPARTY, FIRSTPARTY, LOCALFOLDER
no_lines_before = LOCALFOLDER