from __future__ import annotations

from ._shared import *

# HTTP routes for the active profile's automation rules: listing, export/import,
# save/delete, manual runs and history cleanup.
#
# `automation_rules` is imported inside each handler rather than at module
# level, exactly as the handlers originally did.
# NOTE(review): presumably this avoids an import cycle or defers loading the
# service - confirm before hoisting the import.


@bp.get('/automations')
def automations_get():
    """List the active profile's automation rules and execution history.

    Without an active profile this still answers through ``ok()`` with empty
    lists plus an ``error`` field; a failure while listing yields a 500 JSON
    body that also carries empty ``rules``/``history`` lists.
    """
    from ..services import automation_rules

    profile = preferences.active_profile()
    if not profile:
        return ok({'rules': [], 'history': [], 'error': 'No profile'})
    try:
        return ok({
            'rules': automation_rules.list_rules(profile['id']),
            'history': automation_rules.list_history(profile['id']),
        })
    except Exception as exc:
        return jsonify({'ok': False, 'error': str(exc), 'rules': [], 'history': []}), 500


@bp.get('/automations/export')
def automations_export():
    """Export the active profile's rules together with a rule count.

    Responds with a 400 JSON error when there is no active profile or when the
    export raises.
    """
    from ..services import automation_rules

    profile = preferences.active_profile()
    if not profile:
        return jsonify({'ok': False, 'error': 'No profile'}), 400
    try:
        # Note: JSON export is profile-scoped and excludes execution history/cooldown state.
        data = automation_rules.export_rules(profile['id'])
        return ok({'export': data, 'count': len(data.get('rules') or [])})
    except Exception as exc:
        return jsonify({'ok': False, 'error': str(exc)}), 400


@bp.post('/automations/import')
def automations_import():
    """Import rules from the JSON request body into the active profile.

    Replace mode is requested either with the ``replace`` query parameter
    (``1``/``true``/``yes``, case-insensitive) or with a truthy ``replace`` key
    in a JSON object body. Responds with the number of imported rules and the
    refreshed rule list, or a 400 JSON error when there is no active profile or
    the import raises.
    """
    from ..services import automation_rules

    profile = preferences.active_profile()
    if not profile:
        return jsonify({'ok': False, 'error': 'No profile'}), 400
    try:
        payload = request.get_json(silent=True) or {}
        replace_via_query = str(request.args.get('replace') or '').lower() in {'1', 'true', 'yes'}
        # Only a JSON object can carry a ``replace`` key; the isinstance guard
        # must protect that lookup alone. Previously the conditional expression
        # wrapped the whole ``or`` chain, so the query flag was silently
        # discarded whenever the body was not a dict.
        replace_via_body = isinstance(payload, dict) and bool(payload.get('replace'))
        replace = replace_via_query or replace_via_body
        # Note: Import appends rules by default, so existing automations remain untouched.
        imported = automation_rules.import_rules(profile['id'], payload, replace=replace)
        return ok({'imported': len(imported), 'rules': automation_rules.list_rules(profile['id'])})
    except Exception as exc:
        return jsonify({'ok': False, 'error': str(exc)}), 400


@bp.post('/automations')
def automations_save():
    """Save one rule from the JSON request body and return it with the rule list.

    Responds with a 400 JSON error when there is no active profile or when
    saving raises.
    """
    from ..services import automation_rules

    profile = preferences.active_profile()
    if not profile:
        return jsonify({'ok': False, 'error': 'No profile'}), 400
    try:
        rule = automation_rules.save_rule(profile['id'], request.get_json(silent=True) or {})
        return ok({'rule': rule, 'rules': automation_rules.list_rules(profile['id'])})
    except Exception as exc:
        return jsonify({'ok': False, 'error': str(exc)}), 400


# The URL must declare ``rule_id``: the view takes it as a required argument,
# and the ``int`` converter keeps this route from matching
# ``DELETE /automations/history``.
@bp.delete('/automations/<int:rule_id>')
def automations_delete(rule_id: int):
    """Delete rule ``rule_id`` from the active profile and return the rule list.

    Responds with a 400 JSON error when there is no active profile or when the
    deletion raises.
    """
    from ..services import automation_rules

    profile = preferences.active_profile()
    if not profile:
        return jsonify({'ok': False, 'error': 'No profile'}), 400
    try:
        automation_rules.delete_rule(rule_id, profile['id'])
        return ok({'rules': automation_rules.list_rules(profile['id'])})
    except Exception as exc:
        return jsonify({'ok': False, 'error': str(exc)}), 400


@bp.post('/automations/<int:rule_id>/run')
def automations_run_rule(rule_id: int):
    """Force-run the single rule ``rule_id`` for the active profile.

    Returns the check result followed by the refreshed rules and history.
    Responds with 400 when there is no active profile and 500 when the run
    raises.
    """
    from ..services import automation_rules

    profile = preferences.active_profile()
    if not profile:
        return jsonify({'ok': False, 'error': 'No profile'}), 400
    try:
        # Note: Single-rule run ignores disabled state and cooldown for manual troubleshooting.
        # The check runs first so the rules/history listed below reflect it.
        return ok({
            'result': automation_rules.check(profile, force=True, rule_id=rule_id),
            'rules': automation_rules.list_rules(profile['id']),
            'history': automation_rules.list_history(profile['id']),
        })
    except Exception as exc:
        return jsonify({'ok': False, 'error': str(exc)}), 500


@bp.post('/automations/check')
def automations_check():
    """Force an automation check for the active profile.

    Returns the check result followed by the refreshed rules and history.
    Responds with 400 when there is no active profile and 500 when the check
    raises.
    """
    from ..services import automation_rules

    profile = preferences.active_profile()
    if not profile:
        return jsonify({'ok': False, 'error': 'No profile'}), 400
    try:
        # Note: Force check ignores disabled state and cooldown, allowing a one-off manual automation pass.
        # The check runs first so the rules/history listed below reflect it.
        return ok({
            'result': automation_rules.check(profile, force=True),
            'rules': automation_rules.list_rules(profile['id']),
            'history': automation_rules.list_history(profile['id']),
        })
    except Exception as exc:
        return jsonify({'ok': False, 'error': str(exc)}), 500


@bp.delete('/automations/history')
def automations_history_clear():
    """Clear the active profile's automation execution history.

    Returns the value reported by ``clear_history`` as ``deleted``, the
    refreshed history and the result of ``cleanup_summary()``. Responds with
    400 when there is no active profile and 500 when clearing raises.
    """
    from ..services import automation_rules

    profile = preferences.active_profile()
    if not profile:
        return jsonify({'ok': False, 'error': 'No profile'}), 400
    try:
        # Note: Clear only automation execution logs; rules and cooldown state stay unchanged.
        deleted = automation_rules.clear_history(profile['id'])
        return ok({
            'deleted': deleted,
            'history': automation_rules.list_history(profile['id']),
            'cleanup': cleanup_summary(),
        })
    except Exception as exc:
        return jsonify({'ok': False, 'error': str(exc)}), 500