fmartingr
/
jeeves
Archived
1
0
Fork 0
This repository has been archived on 2021-02-14. You can view files and clone it, but cannot push or open issues or pull requests.
jeeves/jeeves/core/tests/test_models.py

47 lines
1.1 KiB
Python

import json
import pytest
from jeeves.core.models import Run, Flow, Task
@pytest.fixture
def task_model():
    """Create a ``Task`` of type ``jeeves.tasks.shell.ShellTask`` for a test.

    The task's ``_definition`` is the JSON-encoded shell script definition.
    The task is yielded to the requesting test and deleted afterwards.
    """
    encoded_definition = json.dumps({"script": "#!/bin/bash\n\necho HELLO"})
    task = Task.objects.create(
        type="jeeves.tasks.shell.ShellTask",
        _definition=encoded_definition,
    )
    yield task
    # Teardown: everything after the yield runs once the test is done.
    task.delete()
@pytest.fixture
def flow_model(task_model):
    """Create a ``Flow`` named "Test" whose definition lists the fixture task's id.

    Yields a ``(flow, task)`` tuple so tests can reach both objects, then
    deletes the flow on teardown.
    """
    encoded_definition = json.dumps({"tasks": [str(task_model.id)]})
    flow = Flow.objects.create(name="Test", _definition=encoded_definition)
    yield flow, task_model
    # Teardown: remove the flow created for this test.
    flow.delete()
@pytest.mark.django_db
def test_task_from_reference_ok(task_model):
    """Resolving a task from its stringified id returns the fixture task."""
    reference = str(task_model.id)
    resolved = Task.objects.from_reference(reference)
    assert resolved == task_model
@pytest.mark.django_db
def test_flow_definition_is_expanded_on_serialize_ok(flow_model):
    """Check the first task entry of a serialized flow against the task's definition.

    ``flow.serialize()`` is expected to return a dict whose
    ``["definition"]["tasks"]`` list holds expanded task data, not the raw
    id strings stored in the flow's definition.
    """
    flow, task = flow_model
    flow_dict = flow.serialize()
    # The comparison must be asserted: as a bare expression its result was
    # discarded, so the test passed regardless of what serialize() returned.
    assert flow_dict["definition"]["tasks"][0] == task.definition
@pytest.mark.django_db
def test_run_execute_flow_ok(flow_model):
    """Executing the fixture flow yields a run that is finished and successful."""
    # Only the flow is needed here; the accompanying task is ignored.
    flow, _task = flow_model
    executed_run = Run.execute_flow(flow)
    assert executed_run.status == Run.FINISHED
    assert executed_run.success is True