python-reflexion/03-taches/demo.py
2025-10-21 20:38:10 +02:00

41 lines
1 KiB
Python

from base import BaseJob, Job, Data
from collections.abc import Sequence
def load_object(name: str) -> BaseJob | Job:
    """Import and return the object designated by a ``module:attribute`` spec.

    Args:
        name: Object reference of the form ``"package.module:attr"``. The
            part before the colon is imported with
            ``importlib.import_module``; the part after it is looked up on
            the module with ``getattr``. The attribute part may be dotted
            (``"module:Class.method"``), in which case each component is
            resolved in turn.

    Returns:
        The object found at that location. Its type is not checked here.

    Raises:
        ValueError: If ``name`` does not contain exactly one colon.
        ImportError: If the module cannot be imported.
        AttributeError: If the attribute path cannot be resolved.
    """
    from importlib import import_module

    parts = name.split(':')
    if len(parts) != 2:
        # Fail with an explicit message instead of an opaque tuple-unpacking
        # error when the spec is malformed.
        raise ValueError(
            f"invalid object reference {name!r}: expected 'module:attribute'"
        )
    module_name, obj_path = parts

    obj = import_module(module_name)
    # Walk the attribute path one component at a time so that nested
    # references such as 'module:Class.method' resolve as well.
    for attr in obj_path.split('.'):
        obj = getattr(obj, attr)
    return obj
def load_jobs() -> Sequence[Job]:
    """Build the list of job callables described in ``config.yml``.

    ``config.yml`` is opened relative to the current working directory. Its
    top-level ``jobs`` entry is iterated in order; every item must provide a
    ``name`` (resolved with ``load_object``) and may provide a ``config``
    mapping.

    * A plain function is used as the job directly; its ``config`` is not
      applied.
    * A class is instantiated with ``config`` as keyword arguments and the
      instance's bound ``run`` method becomes the job.
    * Any other kind of object is skipped without error.

    Returns:
        The job callables, in the order they appear in the configuration.

    Raises:
        KeyError: If the loaded mapping has no ``jobs`` key, or an item has
            no ``name`` key.
    """
    import inspect

    import yaml

    # YAML streams are UTF-8 by convention; do not depend on the locale's
    # default encoding.
    with open('config.yml', encoding='utf-8') as config_file:
        # SECURITY NOTE(review): yaml.Loader can construct arbitrary Python
        # objects from tagged nodes. That is acceptable only while
        # config.yml is a trusted file; if it only holds plain data,
        # consider yaml.safe_load instead.
        config = yaml.load(config_file, Loader=yaml.Loader)

    jobs = []
    # An empty "jobs:" entry is loaded as None; treat it as "no jobs".
    for job_def in config['jobs'] or ():
        obj = load_object(job_def['name'])
        if inspect.isfunction(obj):
            jobs.append(obj)
        elif inspect.isclass(obj):
            # "config:" with no value is loaded as None, which cannot be
            # unpacked with ** — fall back to an empty mapping.
            job_config = job_def.get('config') or {}
            jobs.append(obj(**job_config).run)
        # NOTE(review): objects that are neither functions nor classes
        # (e.g. functools.partial, builtins, callable instances) are
        # silently ignored — confirm this is intended.
    return jobs
def run_jobs(jobs: Sequence[Job], data: Data) -> Data:
    """Chain the jobs: each one receives the previous job's return value.

    Args:
        jobs: Callables taking one argument, applied in iteration order.
        data: Value handed to the first job.

    Returns:
        The value returned by the last job, or ``data`` itself when ``jobs``
        is empty.
    """
    result = data
    for step in jobs:
        # The output of one step is the input of the next.
        result = step(result)
    return result
if __name__ == '__main__':
    import pprint

    # Build the pipeline from config.yml, run it on the initial payload and
    # pretty-print whatever the last job returns.
    pipeline = load_jobs()
    initial = {'domain': 'pycon.fr'}
    outcome = run_jobs(pipeline, initial)
    pprint.pprint(outcome)