117 lines
4.7 KiB
Python
117 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
from ._shared import *
|
|
|
|
@bp.get('/automations')
def automations_get():
    """Return the active profile's automation rules and execution history.

    When no profile is active the reply still goes through ``ok`` with empty
    ``rules``/``history`` lists plus an ``error`` note. If loading the data
    raises, a JSON error body with the same empty lists is returned with
    HTTP status 500.
    """
    from ..services import automation_rules

    active = preferences.active_profile()
    if active:
        try:
            rules = automation_rules.list_rules(active['id'])
            history = automation_rules.list_history(active['id'])
            return ok({'rules': rules, 'history': history})
        except Exception as exc:
            # Keep the list keys present so the response shape matches the
            # success payload even on failure.
            failure = {'ok': False, 'error': str(exc), 'rules': [], 'history': []}
            return jsonify(failure), 500
    return ok({'rules': [], 'history': [], 'error': 'No profile'})
|
|
|
|
|
|
|
|
@bp.get('/automations/export')
def automations_export():
    """Export the active profile's automation rules as a JSON document.

    Responds with the export payload and the number of rules it contains.
    A missing profile or any exception raised while exporting yields a JSON
    error body with HTTP status 400.
    """
    from ..services import automation_rules

    active = preferences.active_profile()
    if not active:
        no_profile = {'ok': False, 'error': 'No profile'}
        return jsonify(no_profile), 400
    try:
        # Note: the export is scoped to the profile and leaves out execution
        # history and cooldown state.
        exported = automation_rules.export_rules(active['id'])
        exported_rules = exported.get('rules') or []
        return ok({'export': exported, 'count': len(exported_rules)})
    except Exception as exc:
        failure = {'ok': False, 'error': str(exc)}
        return jsonify(failure), 400
|
|
|
|
|
|
|
|
@bp.post('/automations/import')
def automations_import():
    """Import automation rules into the active profile from the JSON body.

    The ``replace`` flag may be supplied in either of two ways:

    * as a ``replace`` query parameter equal to ``1``, ``true`` or ``yes``
      (case-insensitive), or
    * as a truthy ``replace`` key when the JSON body is an object.

    Responds with the number of imported rules and the refreshed rule list.
    A missing profile or any exception raised during the import yields a
    JSON error body with HTTP status 400.
    """
    from ..services import automation_rules

    profile = preferences.active_profile()
    if not profile:
        return jsonify({'ok': False, 'error': 'No profile'}), 400
    try:
        payload = request.get_json(silent=True) or {}
        # Evaluate the two sources separately. A conditional expression binds
        # looser than `or`, so writing this as `query or body if dict else
        # False` would discard the query flag whenever the body is not a JSON
        # object (for example a bare list of rules).
        replace_from_query = str(request.args.get('replace') or '').lower() in {'1', 'true', 'yes'}
        replace_from_body = isinstance(payload, dict) and bool(payload.get('replace'))
        replace = replace_from_query or replace_from_body
        # Note: Import appends rules by default, so existing automations remain untouched.
        imported = automation_rules.import_rules(profile['id'], payload, replace=replace)
        return ok({'imported': len(imported), 'rules': automation_rules.list_rules(profile['id'])})
    except Exception as exc:
        return jsonify({'ok': False, 'error': str(exc)}), 400
|
|
|
|
|
|
|
|
@bp.post('/automations')
def automations_save():
    """Save an automation rule for the active profile from the JSON body.

    Responds with the saved rule and the refreshed rule list. A missing
    profile or any exception raised while saving yields a JSON error body
    with HTTP status 400.
    """
    from ..services import automation_rules

    active = preferences.active_profile()
    if not active:
        no_profile = {'ok': False, 'error': 'No profile'}
        return jsonify(no_profile), 400
    try:
        profile_id = active['id']
        # An absent or unparsable body is treated as an empty rule payload.
        body = request.get_json(silent=True) or {}
        saved = automation_rules.save_rule(profile_id, body)
        current_rules = automation_rules.list_rules(profile_id)
        return ok({'rule': saved, 'rules': current_rules})
    except Exception as exc:
        failure = {'ok': False, 'error': str(exc)}
        return jsonify(failure), 400
|
|
|
|
|
|
|
|
@bp.delete('/automations/<int:rule_id>')
def automations_delete(rule_id: int):
    """Delete one automation rule belonging to the active profile.

    Args:
        rule_id: Identifier of the rule to delete, taken from the URL.

    Responds with the rule list as it stands after the deletion. A missing
    profile or any exception raised while deleting yields a JSON error body
    with HTTP status 400.
    """
    from ..services import automation_rules

    active = preferences.active_profile()
    if not active:
        no_profile = {'ok': False, 'error': 'No profile'}
        return jsonify(no_profile), 400
    try:
        automation_rules.delete_rule(rule_id, active['id'])
        remaining = automation_rules.list_rules(active['id'])
        return ok({'rules': remaining})
    except Exception as exc:
        failure = {'ok': False, 'error': str(exc)}
        return jsonify(failure), 400
|
|
|
|
|
|
|
|
@bp.post('/automations/<int:rule_id>/run')
def automations_run_rule(rule_id: int):
    """Force a manual run of a single automation rule.

    Args:
        rule_id: Identifier of the rule to run, taken from the URL.

    Responds with the check result plus the refreshed rule list and
    history. A missing profile yields HTTP 400; any exception raised during
    the run yields a JSON error body with HTTP status 500.
    """
    from ..services import automation_rules

    active = preferences.active_profile()
    if not active:
        no_profile = {'ok': False, 'error': 'No profile'}
        return jsonify(no_profile), 400
    try:
        # Note: a single-rule run is forced, so it ignores the rule's
        # disabled state and cooldown; intended for manual troubleshooting.
        outcome = automation_rules.check(active, force=True, rule_id=rule_id)
        rules = automation_rules.list_rules(active['id'])
        history = automation_rules.list_history(active['id'])
        return ok({'result': outcome, 'rules': rules, 'history': history})
    except Exception as exc:
        failure = {'ok': False, 'error': str(exc)}
        return jsonify(failure), 500
|
|
|
|
|
|
@bp.post('/automations/check')
def automations_check():
    """Force a one-off automation pass for the active profile.

    Responds with the check result plus the refreshed rule list and
    history. A missing profile yields HTTP 400; any exception raised during
    the check yields a JSON error body with HTTP status 500.
    """
    from ..services import automation_rules

    active = preferences.active_profile()
    if not active:
        no_profile = {'ok': False, 'error': 'No profile'}
        return jsonify(no_profile), 400
    try:
        # Note: the forced check ignores disabled state and cooldown, which
        # allows a one-off manual automation pass.
        outcome = automation_rules.check(active, force=True)
        rules = automation_rules.list_rules(active['id'])
        history = automation_rules.list_history(active['id'])
        return ok({'result': outcome, 'rules': rules, 'history': history})
    except Exception as exc:
        failure = {'ok': False, 'error': str(exc)}
        return jsonify(failure), 500
|
|
|
|
|
|
|
|
@bp.delete('/automations/history')
def automations_history_clear():
    """Clear the automation execution history of the active profile.

    Responds with the value returned by ``clear_history``, the history as
    it stands afterwards, and the result of ``cleanup_summary()``. A missing
    profile yields HTTP 400; any exception raised while clearing yields a
    JSON error body with HTTP status 500.
    """
    from ..services import automation_rules

    active = preferences.active_profile()
    if not active:
        no_profile = {'ok': False, 'error': 'No profile'}
        return jsonify(no_profile), 400
    try:
        # Note: only the automation execution logs are cleared; rules and
        # cooldown state stay unchanged.
        removed = automation_rules.clear_history(active['id'])
        history = automation_rules.list_history(active['id'])
        summary = cleanup_summary()
        return ok({'deleted': removed, 'history': history, 'cleanup': summary})
    except Exception as exc:
        failure = {'ok': False, 'error': str(exc)}
        return jsonify(failure), 500
|