Initial commit
This commit is contained in:
276
main.py
Normal file
276
main.py
Normal file
@ -0,0 +1,276 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from flask import Flask, request, jsonify, render_template, redirect, url_for
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
import plotly.express as px
|
||||
import pandas as pd
|
||||
|
||||
# Setup logging
|
||||
logging.basicConfig(
|
||||
level=os.environ.get("LOG_LEVEL", "INFO"),
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
)
|
||||
logger = logging.getLogger("expense_tracker")
|
||||
|
||||
# Load configuration
|
||||
try:
|
||||
with open("/data/options.json", "r") as options_file:
|
||||
config = json.load(options_file)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load options.json, using defaults: {e}")
|
||||
config = {
|
||||
"database_path": "expense_tracker.db", # Local file in current directory
|
||||
"log_level": "info",
|
||||
}
|
||||
|
||||
# Set log level from config
|
||||
log_level = getattr(logging, config["log_level"].upper(), logging.INFO)
|
||||
logger.setLevel(log_level)
|
||||
|
||||
# Initialize Flask app
|
||||
app = Flask(__name__)
|
||||
app.config["SECRET_KEY"] = os.urandom(24)
|
||||
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{config['database_path']}"
|
||||
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
|
||||
|
||||
# Initialize database
|
||||
db = SQLAlchemy(app)
|
||||
|
||||
# Define database models
|
||||
class Category(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
name = db.Column(db.String(50), nullable=False, unique=True)
|
||||
description = db.Column(db.String(200))
|
||||
expenses = db.relationship("Expense", backref="category", lazy=True)
|
||||
|
||||
class Expense(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
amount = db.Column(db.Float, nullable=False)
|
||||
description = db.Column(db.String(200))
|
||||
date = db.Column(db.DateTime, nullable=False, default=datetime.now)
|
||||
category_id = db.Column(db.Integer, db.ForeignKey("category.id"), nullable=False)
|
||||
|
||||
# Create database tables
|
||||
with app.app_context():
|
||||
db.create_all()
|
||||
|
||||
# Add default categories if none exist
|
||||
if Category.query.count() == 0:
|
||||
default_categories = [
|
||||
Category(name="Lebensmittel", description="Ausgaben für Lebensmittel und Getränke"),
|
||||
Category(name="Transport", description="Ausgaben für öffentliche Verkehrsmittel, Taxi, etc."),
|
||||
Category(name="Freizeit", description="Ausgaben für Unterhaltung, Hobbys, etc."),
|
||||
Category(name="Wohnen", description="Miete, Nebenkosten, Reparaturen"),
|
||||
Category(name="Gesundheit", description="Medikamente, Arztbesuche"),
|
||||
Category(name="Sonstiges", description="Sonstige Ausgaben")
|
||||
]
|
||||
for category in default_categories:
|
||||
db.session.add(category)
|
||||
db.session.commit()
|
||||
logger.info("Added default categories")
|
||||
|
||||
# Routes
|
||||
@app.route("/")
|
||||
def index():
|
||||
categories = Category.query.all()
|
||||
expenses = Expense.query.order_by(Expense.date.desc()).limit(10).all()
|
||||
|
||||
# Calculate total expenses
|
||||
total_expenses = db.session.query(db.func.sum(Expense.amount)).scalar() or 0
|
||||
|
||||
# Calculate expenses by category
|
||||
category_expenses = db.session.query(
|
||||
Category.name, db.func.sum(Expense.amount)
|
||||
).join(Expense).group_by(Category.name).all()
|
||||
|
||||
# Prepare data for charts
|
||||
category_names = [item[0] for item in category_expenses]
|
||||
category_amounts = [float(item[1]) for item in category_expenses]
|
||||
|
||||
return render_template(
|
||||
"index.html",
|
||||
categories=categories,
|
||||
expenses=expenses,
|
||||
total_expenses=total_expenses,
|
||||
category_names=json.dumps(category_names),
|
||||
category_amounts=json.dumps(category_amounts),
|
||||
now=datetime.now()
|
||||
)
|
||||
|
||||
@app.route("/expenses", methods=["GET", "POST"])
|
||||
def expenses():
|
||||
if request.method == "POST":
|
||||
# Add new expense
|
||||
amount = float(request.form["amount"])
|
||||
description = request.form["description"]
|
||||
category_id = int(request.form["category_id"])
|
||||
date_str = request.form["date"]
|
||||
|
||||
# Parse date from form
|
||||
date = datetime.strptime(date_str, "%Y-%m-%d")
|
||||
|
||||
# Create new expense
|
||||
expense = Expense(
|
||||
amount=amount,
|
||||
description=description,
|
||||
category_id=category_id,
|
||||
date=date
|
||||
)
|
||||
|
||||
db.session.add(expense)
|
||||
db.session.commit()
|
||||
|
||||
return redirect(url_for("expenses"))
|
||||
|
||||
# GET request - show expenses and form
|
||||
categories = Category.query.all()
|
||||
expenses = Expense.query.order_by(Expense.date.desc()).all()
|
||||
|
||||
return render_template(
|
||||
"expenses.html",
|
||||
categories=categories,
|
||||
expenses=expenses,
|
||||
today=datetime.now().strftime("%Y-%m-%d")
|
||||
)
|
||||
|
||||
@app.route("/categories", methods=["GET", "POST"])
|
||||
def categories():
|
||||
if request.method == "POST":
|
||||
# Add new category
|
||||
name = request.form["name"]
|
||||
description = request.form["description"]
|
||||
|
||||
# Check if category already exists
|
||||
existing = Category.query.filter_by(name=name).first()
|
||||
if existing:
|
||||
return render_template(
|
||||
"categories.html",
|
||||
categories=Category.query.all(),
|
||||
error="Kategorie existiert bereits"
|
||||
)
|
||||
|
||||
# Create new category
|
||||
category = Category(name=name, description=description)
|
||||
db.session.add(category)
|
||||
db.session.commit()
|
||||
|
||||
return redirect(url_for("categories"))
|
||||
|
||||
# GET request - show categories and form
|
||||
categories = Category.query.all()
|
||||
return render_template("categories.html", categories=categories)
|
||||
|
||||
@app.route("/api/expenses", methods=["GET", "POST"])
|
||||
def api_expenses():
|
||||
if request.method == "POST":
|
||||
# Add new expense via API
|
||||
data = request.json
|
||||
|
||||
if not data or "amount" not in data or "category_id" not in data:
|
||||
return jsonify({"error": "Invalid data"}), 400
|
||||
|
||||
# Create new expense
|
||||
expense = Expense(
|
||||
amount=float(data["amount"]),
|
||||
description=data.get("description", ""),
|
||||
category_id=int(data["category_id"]),
|
||||
date=datetime.now() if "date" not in data else datetime.fromisoformat(data["date"])
|
||||
)
|
||||
|
||||
db.session.add(expense)
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({"id": expense.id, "status": "success"}), 201
|
||||
|
||||
# GET request - return expenses as JSON
|
||||
expenses = Expense.query.order_by(Expense.date.desc()).all()
|
||||
result = []
|
||||
|
||||
for expense in expenses:
|
||||
result.append({
|
||||
"id": expense.id,
|
||||
"amount": expense.amount,
|
||||
"description": expense.description,
|
||||
"date": expense.date.isoformat(),
|
||||
"category_id": expense.category_id,
|
||||
"category_name": expense.category.name
|
||||
})
|
||||
|
||||
return jsonify(result)
|
||||
|
||||
@app.route("/api/categories", methods=["GET"])
|
||||
def api_categories():
|
||||
categories = Category.query.all()
|
||||
result = []
|
||||
|
||||
for category in categories:
|
||||
result.append({
|
||||
"id": category.id,
|
||||
"name": category.name,
|
||||
"description": category.description
|
||||
})
|
||||
|
||||
return jsonify(result)
|
||||
|
||||
@app.route("/reports")
|
||||
def reports():
|
||||
# Get date range filter
|
||||
start_date = request.args.get("start_date")
|
||||
end_date = request.args.get("end_date")
|
||||
|
||||
query = db.session.query(
|
||||
Expense.date,
|
||||
Category.name.label("category"),
|
||||
Expense.amount
|
||||
).join(Category)
|
||||
|
||||
# Apply date filters if provided
|
||||
if start_date:
|
||||
query = query.filter(Expense.date >= datetime.strptime(start_date, "%Y-%m-%d"))
|
||||
if end_date:
|
||||
query = query.filter(Expense.date <= datetime.strptime(end_date, "%Y-%m-%d"))
|
||||
|
||||
# Execute query and convert to DataFrame
|
||||
expenses_data = query.all()
|
||||
df = pd.DataFrame(expenses_data, columns=["date", "category", "amount"])
|
||||
|
||||
# Generate charts
|
||||
if not df.empty:
|
||||
# Expenses by category pie chart
|
||||
fig_category = px.pie(
|
||||
df,
|
||||
values="amount",
|
||||
names="category",
|
||||
title="Ausgaben nach Kategorie"
|
||||
)
|
||||
category_chart = fig_category.to_html(full_html=False)
|
||||
|
||||
# Expenses over time line chart
|
||||
df["date"] = pd.to_datetime(df["date"])
|
||||
df_by_date = df.groupby([df["date"].dt.date]).sum().reset_index()
|
||||
fig_timeline = px.line(
|
||||
df_by_date,
|
||||
x="date",
|
||||
y="amount",
|
||||
title="Ausgaben im Zeitverlauf"
|
||||
)
|
||||
timeline_chart = fig_timeline.to_html(full_html=False)
|
||||
else:
|
||||
category_chart = "<p>Keine Daten verfügbar</p>"
|
||||
timeline_chart = "<p>Keine Daten verfügbar</p>"
|
||||
|
||||
return render_template(
|
||||
"reports.html",
|
||||
category_chart=category_chart,
|
||||
timeline_chart=timeline_chart,
|
||||
start_date=start_date or "",
|
||||
end_date=end_date or ""
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Start the Flask application
|
||||
app.run(host="0.0.0.0", port=8099)
|
||||
Reference in New Issue
Block a user