@@ -306,6 +306,7 @@ class ListPermission(db.Model):
created_at = db . Column ( db . DateTime , default = datetime . utcnow )
__table_args__ = ( db . UniqueConstraint ( " list_id " , " user_id " , name = " uq_list_user " ) , )
ShoppingList . permitted_users = db . relationship (
" User " ,
secondary = " list_permission " ,
@@ -467,7 +468,7 @@ def get_total_expense_for_list(list_id, start_date=None, end_date=None):
def update_list_categories_from_form ( shopping_list , form ) :
raw_vals = form . getlist ( " categories " )
raw_vals = form . getlist ( " categories " )
candidate_ids = set ( )
for v in raw_vals :
@@ -562,17 +563,22 @@ def redirect_with_flash(
flash ( message , category )
return redirect ( url_for ( endpoint ) )
def can_view_list ( sl : ShoppingList ) - > bool :
if current_user . is_authenticated :
if sl . owner_id == current_user . id :
return True
if sl . is_public :
return True
return db . session . query ( ListPermission . id ) . filter_by (
list_id = sl . id , user_id = current_user . id
) . first ( ) is not None
return (
db . session . query ( ListPermission . id )
. filter_by ( list_id = sl . id , user_id = current_user . id )
. first ( )
is not None
)
return bool ( sl . is_public )
def db_bucket ( col , kind : str = " month " ) :
name = db . engine . name # 'sqlite', 'mysql', 'mariadb', 'postgresql', ...
kind = ( kind or " month " ) . lower ( )
@@ -587,9 +593,9 @@ def db_bucket(col, kind: str = "month"):
if kind == " week " :
if name == " sqlite " :
return func . printf ( " %s -W %s " ,
func . strftime ( " % Y " , col ) ,
func . strftime ( " % W " , col ) )
return func . printf (
" %s -W %s " , func . strftime ( " % Y " , col ) , func . strftime ( " % W " , col )
)
elif name in ( " mysql " , " mariadb " ) :
return func . date_format ( col , " %x -W % v " )
else :
@@ -630,6 +636,7 @@ def user_permission_subq(user_id):
ListPermission . user_id == user_id
)
def admin_required ( f ) :
@wraps ( f )
def decorated_function ( * args , * * kwargs ) :
@@ -851,6 +858,7 @@ def category_to_color(name):
r , g , b = colorsys . hls_to_rgb ( hue , lightness , saturation )
return f " # { int ( r * 255 ) : 02x } { int ( g * 255 ) : 02x } { int ( b * 255 ) : 02x } "
def get_total_expenses_grouped_by_category (
show_all , range_type , start_date , end_date , user_id , category_id = None
) :
@@ -885,13 +893,13 @@ def get_total_expenses_grouped_by_category(
except ( TypeError , ValueError ) :
pass
# ZAKRES: zawsze po created_at LISTY
if start_date and end_date :
try :
dt_start = datetime . strptime ( start_date , " % Y- % m- %d " )
dt_end = datetime . strptime ( end_date , " % Y- % m- %d " ) + timedelta ( days = 1 )
lists_q = lists_q . filter ( ShoppingList . created_at > = dt_start ,
ShoppingList . created_at < dt_end )
dt_end = datetime . strptime ( end_date , " % Y- % m- %d " ) + timedelta ( days = 1 )
lists_q = lists_q . filter (
ShoppingList . created_at > = dt_start , ShoppingList . created_at < dt_end
)
except Exception :
return { " error " : " Błędne daty " }
@@ -899,7 +907,6 @@ def get_total_expenses_grouped_by_category(
if not lists :
return { " labels " : [ ] , " datasets " : [ ] }
# SUMY: po wszystkich wydatkach tych list (bez filtra dat po Expense)
list_ids = [ l . id for l in lists ]
totals = (
db . session . query (
@@ -912,7 +919,6 @@ def get_total_expenses_grouped_by_category(
)
expense_map = { lid : float ( total or 0 ) for lid , total in totals }
# bucket wg created_at LISTY
def bucket_from_dt ( ts : datetime ) - > str :
if range_type == " daily " :
return ts . strftime ( " % Y- % m- %d " )
@@ -948,7 +954,7 @@ def get_total_expenses_grouped_by_category(
data_map [ key ] [ c . name ] + = total_expense
labels = sorted ( all_labels )
cats = sorted ( { cat for b in data_map . values ( ) for cat , v in b . items ( ) if v > 0 } )
cats = sorted ( { cat for b in data_map . values ( ) for cat , v in b . items ( ) if v > 0 } )
datasets = [
{
@@ -960,6 +966,7 @@ def get_total_expenses_grouped_by_category(
]
return { " labels " : labels , " datasets " : datasets }
def get_total_expenses_grouped_by_list_created_at (
user_only = False ,
admin = False ,
@@ -1006,13 +1013,13 @@ def get_total_expenses_grouped_by_list_created_at(
except ( TypeError , ValueError ) :
pass
# ZAKRES: zawsze po created_at LISTY
if start_date and end_date :
try :
dt_start = datetime . strptime ( start_date , " % Y- % m- %d " )
dt_end = datetime . strptime ( end_date , " % Y- % m- %d " ) + timedelta ( days = 1 )
lists_q = lists_q . filter ( ShoppingList . created_at > = dt_start ,
ShoppingList . created_at < dt_end )
dt_end = datetime . strptime ( end_date , " % Y- % m- %d " ) + timedelta ( days = 1 )
lists_q = lists_q . filter (
ShoppingList . created_at > = dt_start , ShoppingList . created_at < dt_end
)
except Exception :
return { " error " : " Błędne daty " }
@@ -1020,7 +1027,6 @@ def get_total_expenses_grouped_by_list_created_at(
if not lists :
return { " labels " : [ ] , " expenses " : [ ] }
# SUMY: po wszystkich wydatkach tych list (bez filtra dat po Expense)
list_ids = [ l . id for l in lists ]
totals = (
db . session . query (
@@ -1033,7 +1039,6 @@ def get_total_expenses_grouped_by_list_created_at(
)
expense_map = { lid : float ( total or 0 ) for lid , total in totals }
# bucket wg created_at LISTY
def bucket_from_dt ( ts : datetime ) - > str :
if range_type == " daily " :
return ts . strftime ( " % Y- % m- %d " )
@@ -1056,6 +1061,7 @@ def get_total_expenses_grouped_by_list_created_at(
expenses = [ round ( grouped [ l ] , 2 ) for l in labels ]
return { " labels " : labels , " expenses " : expenses }
def resolve_range ( range_type : str ) :
now = datetime . now ( timezone . utc )
sd = ed = None
@@ -1069,7 +1075,7 @@ def resolve_range(range_type: str):
elif rt in ( " last30days " , " last_30_days " ) :
sd = ( now - timedelta ( days = 30 ) ) . date ( ) . strftime ( " % Y- % m- %d " )
ed = now . date ( ) . strftime ( " % Y- % m- %d " )
bucket = " monthly "
bucket = " monthly "
elif rt in ( " last90days " , " last_90_days " ) :
sd = ( now - timedelta ( days = 90 ) ) . date ( ) . strftime ( " % Y- % m- %d " )
ed = now . date ( ) . strftime ( " % Y- % m- %d " )
@@ -1079,6 +1085,18 @@ def resolve_range(range_type: str):
sd = first . date ( ) . strftime ( " % Y- % m- %d " )
ed = now . date ( ) . strftime ( " % Y- % m- %d " )
bucket = " monthly "
elif rt in (
" currentmonth " ,
" thismonth " ,
" this_month " ,
" monthtodate " ,
" month_to_date " ,
" mtd " ,
) :
first = datetime ( now . year , now . month , 1 , tzinfo = timezone . utc )
sd = first . date ( ) . strftime ( " % Y- % m- %d " )
ed = now . date ( ) . strftime ( " % Y- % m- %d " )
bucket = " monthly "
return sd , ed , bucket
@@ -1493,9 +1511,7 @@ def favicon():
@app.route ( " / " )
def main_page ( ) :
perm_subq = (
user_permission_subq ( current_user . id )
if current_user . is_authenticated
else None
user_permission_subq ( current_user . id ) if current_user . is_authenticated else None
)
now = datetime . now ( timezone . utc )
@@ -1604,8 +1620,12 @@ def main_page():
db . session . query (
Item . list_id ,
func . count ( Item . id ) . label ( " total_count " ) ,
func . sum ( case ( ( Item . purchased == True , 1 ) , else_ = 0 ) ) . label ( " purchased_count " ) ,
func . sum ( case ( ( Item . not_purchased == True , 1 ) , else_ = 0 ) ) . label ( " not_purchased_count " ) ,
func . sum ( case ( ( Item . purchased == True , 1 ) , else_ = 0 ) ) . label (
" purchased_count "
) ,
func . sum ( case ( ( Item . not_purchased == True , 1 ) , else_ = 0 ) ) . label (
" not_purchased_count "
) ,
)
. filter ( Item . list_id . in_ ( all_ids ) )
. group_by ( Item . list_id )
@@ -1630,12 +1650,17 @@ def main_page():
)
for l in all_lists :
total_count , purchased_count , not_purchased_count = stats_map . get ( l . id , ( 0 , 0 , 0 ) )
total_count , purchased_count , not_purchased_count = stats_map . get (
l . id , ( 0 , 0 , 0 )
)
l . total_count = total_count
l . purchased_count = purchased_count
l . not_purchased_count = not_purchased_count
l . total_expense = latest_expenses_map . get ( l . id , 0 )
l . category_badges = [ { " name " : c . name , " color " : category_to_color ( c . name ) } for c in l . categories ]
l . category_badges = [
{ " name " : c . name , " color " : category_to_color ( c . name ) }
for c in l . categories
]
else :
for l in all_lists :
l . total_count = 0
@@ -1987,9 +2012,9 @@ def view_list(list_id):
@login_required
def expenses ( ) :
start_date_str = request . args . get ( " start_date " )
end_date_str = request . args . get ( " end_date " )
category_id = request . args . get ( " category_id " , type = str )
show_all = request . args . get ( " show_all " , " true " ) . lower ( ) == " true "
end_date_str = request . args . get ( " end_date " )
category_id = request . args . get ( " category_id " , type = str )
show_all = request . args . get ( " show_all " , " true " ) . lower ( ) == " true "
now = datetime . now ( timezone . utc )
@@ -2002,10 +2027,10 @@ def expenses():
if start_date_str and end_date_str :
try :
start = datetime . strptime ( start_date_str , " % Y- % m- %d " )
end = datetime . strptime ( end_date_str , " % Y- % m- %d " ) + timedelta ( days = 1 )
end = datetime . strptime ( end_date_str , " % Y- % m- %d " ) + timedelta ( days = 1 )
lists_q = lists_q . filter (
ShoppingList . created_at > = start ,
ShoppingList . created_at < end ,
ShoppingList . created_at < end ,
)
except ValueError :
flash ( " Błędny zakres dat " , " danger " )
@@ -2024,16 +2049,16 @@ def expenses():
pass
lists_filtered = (
lists_q
. options ( joinedload ( ShoppingList . owner ) , joinedload ( ShoppingList . categories ) )
lists_q . options (
joinedload ( ShoppingList . owner ) , joinedload ( ShoppingList . categories )
)
. order_by ( ShoppingList . created_at . desc ( ) )
. all ( )
)
list_ids = [ l . id for l in lists_filtered ] or [ - 1 ]
expenses = (
Expense . query
. options (
Expense . query . options (
joinedload ( Expense . shopping_list ) . joinedload ( ShoppingList . owner ) ,
joinedload ( Expense . shopping_list ) . joinedload ( ShoppingList . categories ) ,
)
@@ -2056,9 +2081,12 @@ def expenses():
totals_map = { row . lid : float ( row . total_expense or 0 ) for row in totals_rows }
categories = (
Category . query
. join ( shopping_list_category , shopping_list_category . c . category_id == Category . id )
. join ( ShoppingList , ShoppingList . id == shopping_list_category . c . shopping_list_id )
Category . query . join (
shopping_list_category , shopping_list_category . c . category_id == Category . id
)
. join (
ShoppingList , ShoppingList . id == shopping_list_category . c . shopping_list_id
)
. filter ( ShoppingList . id . in_ ( list_ids ) )
. distinct ( )
. order_by ( Category . name . asc ( ) )
@@ -2068,8 +2096,8 @@ def expenses():
expense_table = [
{
" title " : ( e . shopping_list . title if e . shopping_list else " Nieznana " ) ,
" amount " : e . amount ,
" title " : ( e . shopping_list . title if e . shopping_list else " Nieznana " ) ,
" amount " : e . amount ,
" added_at " : e . added_at ,
}
for e in expenses
@@ -2102,8 +2130,8 @@ def expenses():
def expenses_data ( ) :
range_type = request . args . get ( " range " , " monthly " )
start_date = request . args . get ( " start_date " )
end_date = request . args . get ( " end_date " )
show_all = request . args . get ( " show_all " , " true " ) . lower ( ) == " true "
end_date = request . args . get ( " end_date " )
show_all = request . args . get ( " show_all " , " true " ) . lower ( ) == " true "
category_id = request . args . get ( " category_id " )
by_category = request . args . get ( " by_category " , " false " ) . lower ( ) == " true "
@@ -2111,7 +2139,7 @@ def expenses_data():
sd , ed , bucket = resolve_range ( range_type )
if sd and ed :
start_date = sd
end_date = ed
end_date = ed
range_type = bucket
if by_category :
@@ -2141,7 +2169,7 @@ def expenses_data():
@app.route ( " /share/<token> " )
#@app.route("/guest-list/<int:list_id>")
# @app.route("/guest-list/<int:list_id>")
@app.route ( " /shared/<int:list_id> " )
def shared_list ( token = None , list_id = None ) :
now = datetime . now ( timezone . utc )
@@ -2150,7 +2178,11 @@ def shared_list(token=None, list_id=None):
shopping_list = ShoppingList . query . filter_by ( share_token = token ) . first_or_404 ( )
# jeśli lista wygasła – zablokuj (spójne z resztą aplikacji)
if shopping_list . is_temporary and shopping_list . expires_at and shopping_list . expires_at < = now :
if (
shopping_list . is_temporary
and shopping_list . expires_at
and shopping_list . expires_at < = now
) :
flash ( " Link wygasł. " , " warning " )
return redirect ( url_for ( " main_page " ) )
@@ -2203,7 +2235,6 @@ def shared_list(token=None, list_id=None):
)
@app.route ( " /copy/<int:list_id> " )
@login_required
def copy_list ( list_id ) :
@@ -3490,15 +3521,11 @@ def add_suggestion():
return redirect ( url_for ( " list_products " ) )
# ── Admin: zarządzanie dostępem do list ───────────────────────────────────────
@app.route ( " /admin/lists-access " , methods = [ " GET " , " POST " ] )
@login_required
@admin_required
def admin_lists_access ( ) :
# Prosta autoryzacja admina – dostosuj do swojej aplikacji
if not getattr ( current_user , " is_admin " , False ) :
abort ( 403 )
# Paginacja
try :
page = int ( request . args . get ( " page " , 1 ) )
except ValueError :
@@ -3509,19 +3536,14 @@ def admin_lists_access():
per_page = 25
per_page = max ( 1 , min ( 100 , per_page ) )
# Filtrowanie bazowe (bez archiwalnych? – tutaj pokazujemy wszystkie)
q = (
ShoppingList . query
. options ( db . joinedload ( ShoppingList . owner ) )
. order_by ( ShoppingList . created_at . desc ( ) )
q = ShoppingList . query . options ( db . joinedload ( ShoppingList . owner ) ) . order_by (
ShoppingList . created_at . desc ( )
)
# POST: grant/revoke per-wiersz + zbiorcza zmiana statusów
if request . method == " POST " :
action = request . form . get ( " action " )
target_list_id = request . form . get ( " target_list_id " , type = int )
# Grant pojedynczy
if action == " grant " and target_list_id :
login = ( request . form . get ( " grant_username " ) or " " ) . strip ( ) . lower ( )
l = db . session . get ( ShoppingList , target_list_id )
@@ -3545,36 +3567,32 @@ def admin_lists_access():
flash ( " Ten użytkownik już ma dostęp. " , " info " )
return redirect ( request . url )
# Revoke pojedynczy
if action == " revoke " and target_list_id :
uid = request . form . get ( " revoke_user_id " , type = int )
if uid :
ListPermission . query . filter_by ( list_id = target_list_id , user_id = uid ) . delete ( )
ListPermission . query . filter_by (
list_id = target_list_id , user_id = uid
) . delete ( )
db . session . commit ( )
flash ( " Odebrano dostęp użytkownikowi. " , " success " )
return redirect ( request . url )
# Zbiorcze zapisy statusów (checkboxy wierszy)
if action == " save_changes " :
# Zaktualizuj pola is_public / is_temporary / is_archived na podstawie POST
# Wysyłamy identyfikatory wszystkich list widocznych na stronie w ukrytym polu multiple
ids = request . form . getlist ( " visible_ids " , type = int )
if ids :
lists = ShoppingList . query . filter ( ShoppingList . id . in_ ( ids ) ) . all ( )
posted = request . form
for l in lists :
l . is_public = ( posted . get ( f " is_public_ { l . id } " ) is not None )
l . is_temporary = ( posted . get ( f " is_temporary_ { l . id } " ) is not None )
l . is_archived = ( posted . get ( f " is_archived_ { l . id } " ) is not None )
l . is_public = posted . get ( f " is_public_ { l . id } " ) is not None
l . is_temporary = posted . get ( f " is_temporary_ { l . id } " ) is not None
l . is_archived = posted . get ( f " is_archived_ { l . id } " ) is not None
db . session . commit ( )
flash ( " Zapisano zmiany statusów. " , " success " )
return redirect ( request . url )
# Dane do tabeli
pagination = q . paginate ( page = page , per_page = per_page , error_out = False )
lists = pagination . items
# Zbierz uprawnionych per lista (1 zapytanie)
list_ids = [ l . id for l in lists ]
perms = (
db . session . query (
@@ -3592,8 +3610,6 @@ def admin_lists_access():
for lid , uid , uname in perms :
permitted_by_list [ lid ] . append ( { " id " : uid , " username " : uname } )
# Query-string do paginacji
query_string = f " per_page= { per_page } "
return render_template (