86 lines
2.4 KiB
Python
86 lines
2.4 KiB
Python
from datetime import datetime
|
|
|
|
from pydantic import BaseModel, EmailStr, Field, field_validator
|
|
|
|
from app.core.config import settings as app_settings
|
|
from app.core.cron_utils import CronValidationError, validate_cron_expression
|
|
|
|
|
|
class SettingsBase(BaseModel):
    # Fields shared by the settings update and response schemas, plus the
    # validators that normalize and check them.
    # (A `#` comment is used instead of a class docstring because pydantic
    # copies a model's docstring into its JSON schema `description`.)

    # Retention windows, in days.
    backup_retention_days: int = 7
    log_retention_days: int = 7

    # Cron expressions. An empty string is accepted as-is and skips cron
    # validation (see validate_cron).
    export_cron: str = ''
    binary_cron: str = ''
    retention_cron: str = ''
    enable_auto_export: bool = False

    # Bounded to 0..1440 inclusive (1440 minutes = 24 hours).
    # NOTE(review): 0 presumably disables the periodic test -- confirm.
    connection_test_interval_minutes: int = Field(default=0, ge=0, le=1440)

    # Blank or whitespace-only input is stored as None (see normalize_key).
    global_ssh_key: str | None = None

    # Pushover notification settings.
    pushover_token: str | None = None
    pushover_userkey: str | None = None
    notify_failures_only: bool = True

    # SMTP notification settings.
    smtp_host: str | None = None
    smtp_port: int = 587
    smtp_login: str | None = None
    smtp_password: str | None = None
    smtp_notifications_enabled: bool = False
    recipient_email: EmailStr | None = None

    @field_validator('export_cron', 'binary_cron', 'retention_cron', mode='before')
    @classmethod
    def normalize_cron(cls, value: object) -> object:
        """Trim surrounding whitespace from a raw cron value.

        Runs before pydantic's type validation, so ``value`` is whatever the
        caller supplied. Falsy input (``None``, ``''``, ...) becomes ``''``.
        Text input is stripped. Anything else is returned untouched so that
        pydantic's ``str`` validation rejects it with a normal validation
        error, instead of ``.strip()`` raising ``AttributeError`` here.
        """
        if not value:
            return ''
        # bytes/bytearray also expose .strip(); the original code stripped
        # them too, so keep doing so.
        if isinstance(value, (str, bytes, bytearray)):
            return value.strip()
        return value

    @field_validator('global_ssh_key', mode='before')
    @classmethod
    def normalize_key(cls, value: object) -> object:
        """Trim the raw SSH key and collapse blank input to ``None``.

        Runs before pydantic's type validation. Falsy or whitespace-only
        input becomes ``None``. Text input is stripped. Any other type is
        returned untouched so pydantic reports it as a validation error
        rather than ``.strip()`` raising ``AttributeError`` here.
        """
        if not value:
            return None
        if isinstance(value, (str, bytes, bytearray)):
            return value.strip() or None
        return value

    @field_validator('export_cron', 'binary_cron', 'retention_cron')
    @classmethod
    def validate_cron(cls, value: str) -> str:
        """Check a non-empty cron expression against the app timezone.

        Returns:
            ``value`` unchanged. Empty strings are returned without being
            checked.

        Raises:
            ValueError: if ``validate_cron_expression`` raises
                ``CronValidationError``; the original error is chained.
        """
        if not value:
            return value
        try:
            validate_cron_expression(value, app_settings.timezone)
        except CronValidationError as exc:
            raise ValueError(f'Invalid cron expression: {exc}') from exc
        return value
|
|
|
|
|
|
class SettingsUpdate(SettingsBase):
    # Update payload: every SettingsBase field plus one extra flag.

    # NOTE(review): presumably tells the update handler to remove the stored
    # global SSH key; the handling code is not in this file -- confirm.
    clear_global_ssh_key: bool = False
|
|
|
|
|
|
class SettingsResponse(SettingsBase):
    # Response schema: every SettingsBase field plus the members below.

    # Allow the model to be populated from an object's attributes, not only
    # from a mapping.
    model_config = {'from_attributes': True}

    id: int

    # NOTE(review): presumably reports whether a global SSH key is stored;
    # the code that sets it is not in this file -- confirm.
    has_global_ssh_key: bool = False
|
|
|
|
|
|
class RevealSshKeyRequest(BaseModel):
    # Request body with a single required string field.

    # NOTE(review): presumably the caller's password, re-checked before the
    # SSH key is revealed; the check itself is not in this file -- confirm.
    password: str
|
|
|
|
|
|
class RevealSshKeyResponse(BaseModel):
    # Response body carrying the global SSH key.

    # Defaults to None when no key is supplied.
    global_ssh_key: str | None = None
|
|
|
|
|
|
class SchedulerJobStatus(BaseModel):
    # Status record for one scheduler job.

    # Identification.
    key: str
    label: str

    # Configuration.
    enabled: bool
    cron: str | None = None

    # Description text, with optional string/integer parameters.
    # NOTE(review): the parameters are presumably interpolated into the
    # description by the consumer -- confirm.
    description: str
    description_params: dict[str, str | int] | None = None

    # Validation outcome and schedule preview.
    valid: bool
    # The literal [] default is kept on purpose so the generated JSON schema
    # is unchanged.
    next_runs: list[datetime] = []
    error: str | None = None
|
|
|
|
|
|
class SchedulerStatusResponse(BaseModel):
    # Overall scheduler status: timezone, running flag and per-job entries.

    timezone: str
    running: bool

    # One SchedulerJobStatus entry per job.
    jobs: list[SchedulerJobStatus]
|