fmartingr
/
jeeves
Archived
1
0
Fork 0
This repository has been archived on 2021-02-14. You can view files and clone it, but cannot push or open issues or pull requests.
jeeves/jeeves/core/objects.py

76 lines
1.7 KiB
Python

import shutil
import tempfile
from typing import Any, Dict, List, Text, Optional
from pathlib import Path
from dataclasses import field
import pydantic
from jeeves.core.registry import ActionRegistry
# Common parent for every model declared in this module; it adds nothing on
# top of ``pydantic.BaseModel`` and exists only as a shared base type.
# NOTE: documented with a comment rather than a docstring because pydantic
# publishes a model's docstring as its schema description.
class BaseObject(pydantic.BaseModel):
    ...
# Outcome record with two fields: the text ``output`` and a ``success`` flag.
class Result(BaseObject):
    # Text output; an empty string when nothing was set.
    output: str = ""
    # Success flag; False unless explicitly set.
    success: bool = False
# A named unit of work. ``type`` is the key used to look the action class up
# in ``ActionRegistry``; ``parameters`` is handed to that class unchanged.
class Task(BaseObject):
    name: str
    type: str
    parameters: Optional[Dict[Any, Any]] = None

    @property
    def action(self):
        """
        Build the action for this ``Task``.

        Looks up the class registered under ``self.type`` in
        :any:`ActionRegistry` and instantiates it with ``self.parameters``.
        A new instance is created on every access.
        """
        action_cls = ActionRegistry.get_action_cls(self.type)
        return action_cls(parameters=self.parameters)
# A named, ordered collection of ``Task`` objects.
class Flow(BaseObject):
    name: Text
    # Use pydantic's own ``Field`` for the default factory: a
    # ``dataclasses.field(...)`` object is not interpreted by pydantic models
    # and would itself become the default value instead of an empty list.
    tasks: List[Task] = pydantic.Field(default_factory=list)
# Pairs a ``Task`` with its ``Result``.
class ExecutionStep(BaseObject):
    task: Task
    # None until a result has been attached to this step.
    result: Optional[Result] = None
# A directory on disk. When no path is supplied, a temporary directory
# prefixed with ``jeeves_`` is created at validation time.
class Workspace(BaseObject):
    path: Path = None  # type: ignore

    @pydantic.validator("path", pre=True, always=True)
    def path_default(cls, v):  # pylint: disable=no-self-argument
        """
        Guarantee that a Workspace always has a path.

        A truthy value is passed through untouched; otherwise a fresh
        temporary directory is created and its location is used.
        """
        if v:
            return v
        return tempfile.mkdtemp(prefix="jeeves_")

    def destroy(self):
        """
        Delete the workspace directory and everything inside it.
        """
        shutil.rmtree(self.path)
# One run of a ``Flow``: its steps, the workspace it runs in and an overall
# success flag.
class Execution(BaseObject):
    flow: Flow
    steps: List[ExecutionStep]
    workspace: Workspace = None  # type: ignore
    success: bool = False

    @pydantic.validator("workspace", pre=True, always=True)
    def workspace_default(cls, v):  # pylint: disable=no-self-argument
        """
        Ensure an Execution always has a workspace.

        A truthy value is kept as given; otherwise a new ``Workspace()`` is
        created.
        """
        return v or Workspace()

    @property
    def output(self):
        """
        Yield the output of every step that has produced a non-empty one.

        Steps without a result, or whose result has an empty output, are
        skipped.
        """
        for step in self.steps:
            result = step.result
            # ``ExecutionStep.result`` is Optional and defaults to None, so a
            # step with no result must be skipped instead of raising
            # AttributeError.
            if result is not None and result.output:
                yield result.output