Architectural decision - Durable message queues
Let’s do this right—elegant, professional, Flask-based, cPanel-ready, OOP, and ready for future code re-use.
High-Level Plan
-
Flask REST API microservice (
message-queue) with/enqueue,/poll,/ack,/dead-letterendpoints. -
OOP model: MessageQueue DAL class, Flask resource classes.
-
Data access layer (
dal/queue.py) is versioned, easily copy/paste into future services or released as a package. -
JWT protected endpoints for security.
-
MySQL connector, but abstracted for future adapters.
-
Config via
.envand environment variables (usepython-dotenv). -
Minimal dependencies, readable, real-world production structure.
Project Structure
message-queue/
├── dal/
│ └── queue.py # Data access layer (OOP, versioned)
├── app.py # Flask app
├── config.py # Config loader
├── requirements.txt
├── passenger_wsgi.py # cPanel integration
├── .env.example
└── README.md
Code
1. requirements.txt
Flask==2.3.2
PyJWT==2.8.0
python-dotenv==1.0.0
mysql-connector-python==8.3.0
2. config.py
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_DB = os.getenv("MYSQL_DB")
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY")
JWT_ISSUER = os.getenv("JWT_ISSUER", "https://aurorahours.com/identity-backend")
JWT_AUDIENCE = os.getenv("JWT_AUDIENCE", "mq")
DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true"
@classmethod
def as_dict(cls):
return {k: getattr(cls, k) for k in dir(cls) if not k.startswith("__") and not callable(getattr(cls, k))}
3. dal/queue.py (OOP, versioned as 0.1.0)
# dal/queue.py
# Version: 0.1.0
import mysql.connector
from mysql.connector import pooling
import json
from datetime import datetime
class MessageQueueDAL:
def __init__(self, config):
self.pool = pooling.MySQLConnectionPool(
pool_name="mq_pool",
pool_size=5,
host=config.MYSQL_HOST,
user=config.MYSQL_USER,
password=config.MYSQL_PASSWORD,
database=config.MYSQL_DB,
autocommit=True,
)
def init_schema(self):
with self.pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS message_queue (
id INT AUTO_INCREMENT PRIMARY KEY,
event_type VARCHAR(128) NOT NULL,
payload TEXT NOT NULL,
status ENUM('NEW','IN_PROGRESS','DONE','FAILED') DEFAULT 'NEW',
retry_count INT DEFAULT 0,
last_error TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB;
""")
conn.commit()
def enqueue(self, event_type, payload):
with self.pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO message_queue (event_type, payload) VALUES (%s, %s)",
(event_type, json.dumps(payload))
)
conn.commit()
return cursor.lastrowid
def poll(self, limit=10):
with self.pool.get_connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM message_queue WHERE status='NEW' ORDER BY created_at ASC LIMIT %s FOR UPDATE", (limit,)
)
rows = cursor.fetchall()
if rows:
ids = [row['id'] for row in rows]
cursor.execute(
"UPDATE message_queue SET status='IN_PROGRESS', updated_at=NOW() WHERE id IN (%s)" %
(','.join(['%s'] * len(ids))), ids
)
conn.commit()
return rows
def ack(self, msg_id):
with self.pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE message_queue SET status='DONE', updated_at=NOW() WHERE id=%s", (msg_id,)
)
conn.commit()
def fail(self, msg_id, error):
with self.pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE message_queue SET status='FAILED', last_error=%s, retry_count=retry_count+1, updated_at=NOW() WHERE id=%s",
(str(error)[:400], msg_id)
)
conn.commit()
def get_failed(self, limit=10):
with self.pool.get_connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM message_queue WHERE status='FAILED' ORDER BY updated_at DESC LIMIT %s", (limit,)
)
return cursor.fetchall()
4. app.py
from flask import Flask, request, jsonify
from config import Config
from dal.queue import MessageQueueDAL
import jwt
from functools import wraps
mq = MessageQueueDAL(Config)
mq.init_schema()
app = Flask(__name__)
# --- JWT Auth Decorator ---
def jwt_required(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return jsonify({"error": "Missing Bearer token"}), 401
token = auth.split(" ", 1)[1]
try:
payload = jwt.decode(
token,
Config.JWT_SECRET_KEY,
algorithms=["HS256"],
issuer=Config.JWT_ISSUER,
audience=Config.JWT_AUDIENCE,
)
# Optionally: validate scope/roles here
except Exception as e:
return jsonify({"error": f"Invalid token: {e}"}), 401
return fn(*args, **kwargs)
return wrapper
@app.route("/enqueue", methods=["POST"])
@jwt_required
def enqueue():
data = request.json
event_type = data.get("event_type")
payload = data.get("payload")
if not event_type or payload is None:
return jsonify({"error": "Missing event_type or payload"}), 400
msg_id = mq.enqueue(event_type, payload)
return jsonify({"ok": True, "message_id": msg_id})
@app.route("/poll", methods=["POST"])
@jwt_required
def poll():
limit = int(request.json.get("limit", 10))
rows = mq.poll(limit=limit)
return jsonify({"messages": rows})
@app.route("/ack", methods=["POST"])
@jwt_required
def ack():
msg_id = request.json.get("message_id")
if not msg_id:
return jsonify({"error": "Missing message_id"}), 400
mq.ack(msg_id)
return jsonify({"ok": True})
@app.route("/fail", methods=["POST"])
@jwt_required
def fail():
msg_id = request.json.get("message_id")
error = request.json.get("error", "")
if not msg_id:
return jsonify({"error": "Missing message_id"}), 400
mq.fail(msg_id, error)
return jsonify({"ok": True})
@app.route("/dead-letter", methods=["GET"])
@jwt_required
def dead_letter():
rows = mq.get_failed(limit=20)
return jsonify({"failed_messages": rows})
@app.route("/healthz")
def healthz():
return "OK", 200
if __name__ == "__main__":
app.run(debug=Config.DEV_MODE, port=5022)
5. passenger_wsgi.py
from app import app as application
6. .env.example
MYSQL_HOST=localhost
MYSQL_USER=your_mysql_user
MYSQL_PASSWORD=your_mysql_password
MYSQL_DB=your_mq_db
JWT_SECRET_KEY=your-very-secret-key
JWT_ISSUER=https://aurorahours.com/identity-backend
JWT_AUDIENCE=mq
DEV_MODE=true
7. README.md (snippet)
# message-queue
Simple, production-grade message queue for microservices using Flask + MySQL, ready for cPanel, JWT-protected.
## Endpoints
- `POST /enqueue` - Add a message/event
- `POST /poll` - Get NEW messages (locks for processing)
- `POST /ack` - Acknowledge successful processing
- `POST /fail` - Mark as failed, increments retry
- `GET /dead-letter` - List failed messages
- `GET /healthz` - Health check
All endpoints (except `/healthz`) require Bearer JWT.
## Reusable DAL
See `dal/queue.py` - OOP, versioned, copy or pip package for other services.
## Deploy
- Set environment vars or copy `.env.example` to `.env`
- Deploy via passenger_wsgi on cPanel
## License
MIT
This gives you a production-grade, OOP, JWT-secured, Flask-based message queue with a reusable data access layer.
Plug it in, deploy, and grow as you need.
If you want a setup.sql or ready-to-go setup.py for the DAL as a package, just say the word!
Comments
Post a Comment