Architectural decision - Durable message queues

Let’s do this right: elegant, professional, Flask-based, cPanel-ready, object-oriented, and structured for future code reuse.


High-Level Plan

  • Flask REST API microservice (message-queue) with /enqueue, /poll, /ack, /fail, /dead-letter, and /healthz endpoints.

  • OOP model: MessageQueue DAL class, Flask resource classes.

  • Data access layer (dal/queue.py) is versioned, so it can be copied into future services or released as a package.

  • JWT-protected endpoints for security.

  • MySQL connector, isolated behind the DAL so other adapters can be added later.

  • Config via .env and environment variables (use python-dotenv).

  • Minimal dependencies, readable, real-world production structure.
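The "future adapters" point could be made concrete with a small interface that every storage backend implements. The sketch below is an assumption and not part of the service in this post: `QueueBackend` and `InMemoryBackend` are hypothetical names, and the in-memory version only illustrates the contract (it can also double as a test stand-in for the MySQL DAL).

```python
from __future__ import annotations

from typing import Any, Protocol


class QueueBackend(Protocol):
    """Hypothetical contract a storage adapter would satisfy."""

    def enqueue(self, event_type: str, payload: Any) -> int: ...
    def poll(self, limit: int = 10) -> list[dict]: ...
    def ack(self, msg_id: int) -> None: ...
    def fail(self, msg_id: int, error: str) -> None: ...


class InMemoryBackend:
    """Toy adapter using the same method names as the MySQL DAL."""

    def __init__(self) -> None:
        self._rows: dict[int, dict] = {}
        self._next_id = 1

    def enqueue(self, event_type, payload):
        msg_id = self._next_id
        self._next_id += 1
        self._rows[msg_id] = {
            "id": msg_id, "event_type": event_type, "payload": payload,
            "status": "NEW", "retry_count": 0, "last_error": None,
        }
        return msg_id

    def poll(self, limit=10):
        # dicts keep insertion order, so the oldest NEW rows come first
        claimed = [r for r in self._rows.values() if r["status"] == "NEW"][:limit]
        for row in claimed:
            row["status"] = "IN_PROGRESS"
        return [dict(r) for r in claimed]

    def ack(self, msg_id):
        self._rows[msg_id]["status"] = "DONE"

    def fail(self, msg_id, error):
        row = self._rows[msg_id]
        row.update(status="FAILED", last_error=str(error)[:400],
                   retry_count=row["retry_count"] + 1)
```

Code that depends only on `QueueBackend` would not need to change if the MySQL-backed class were later swapped for another store.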


Project Structure

message-queue/
├── dal/
│   └── queue.py         # Data access layer (OOP, versioned)
├── app.py               # Flask app
├── config.py            # Config loader
├── requirements.txt
├── passenger_wsgi.py    # cPanel integration
├── .env.example
└── README.md

Code

1. requirements.txt

Flask==2.3.2
PyJWT==2.8.0
python-dotenv==1.0.0
mysql-connector-python==8.3.0

2. config.py

import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    MYSQL_HOST = os.getenv("MYSQL_HOST")
    MYSQL_USER = os.getenv("MYSQL_USER")
    MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
    MYSQL_DB = os.getenv("MYSQL_DB")
    JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY")
    JWT_ISSUER = os.getenv("JWT_ISSUER", "https://aurorahours.com/identity-backend")
    JWT_AUDIENCE = os.getenv("JWT_AUDIENCE", "mq")
    DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true"

    @classmethod
    def as_dict(cls):
        return {k: getattr(cls, k) for k in dir(cls) if not k.startswith("__") and not callable(getattr(cls, k))}
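Because every value above comes from `os.getenv`, a missing variable silently becomes `None` and only surfaces later as a connection or token error. One possible guard, sketched here with hypothetical helper names (`missing_settings`, `require_settings`) that are not part of the original config.py, is to check the required keys at startup:

```python
import os

# Keys that config.py reads without a default value
REQUIRED_KEYS = ("MYSQL_HOST", "MYSQL_USER", "MYSQL_PASSWORD", "MYSQL_DB", "JWT_SECRET_KEY")


def missing_settings(env=None, required=REQUIRED_KEYS):
    """Return the required keys that are unset or empty in env."""
    env = os.environ if env is None else env
    return [key for key in required if not env.get(key)]


def require_settings(env=None):
    """Raise early with a readable message instead of failing on the first request."""
    missing = missing_settings(env)
    if missing:
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
```

Calling `require_settings()` right after `load_dotenv()` would be one place to hook this in.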

3. dal/queue.py (OOP, versioned as 0.1.0)

# dal/queue.py
# Version: 0.1.0

import json

from mysql.connector import pooling

class MessageQueueDAL:
    def __init__(self, config):
        self.pool = pooling.MySQLConnectionPool(
            pool_name="mq_pool",
            pool_size=5,
            host=config.MYSQL_HOST,
            user=config.MYSQL_USER,
            password=config.MYSQL_PASSWORD,
            database=config.MYSQL_DB,
            autocommit=True,
        )

    def init_schema(self):
        with self.pool.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
            CREATE TABLE IF NOT EXISTS message_queue (
                id INT AUTO_INCREMENT PRIMARY KEY,
                event_type VARCHAR(128) NOT NULL,
                payload TEXT NOT NULL,
                status ENUM('NEW','IN_PROGRESS','DONE','FAILED') DEFAULT 'NEW',
                retry_count INT DEFAULT 0,
                last_error TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            ) ENGINE=InnoDB;
            """)
            conn.commit()

    def enqueue(self, event_type, payload):
        with self.pool.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO message_queue (event_type, payload) VALUES (%s, %s)",
                (event_type, json.dumps(payload))
            )
            conn.commit()
            return cursor.lastrowid

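    def poll(self, limit=10):
        with self.pool.get_connection() as conn:
            # The pool uses autocommit=True, so open an explicit transaction.
            # Without it, the FOR UPDATE row locks are released as soon as the
            # SELECT finishes and two pollers can claim the same rows.
            conn.start_transaction()
            cursor = conn.cursor(dictionary=True)
            cursor.execute(
                "SELECT * FROM message_queue WHERE status='NEW' "
                "ORDER BY created_at ASC LIMIT %s FOR UPDATE",
                (limit,),
            )
            rows = cursor.fetchall()
            if rows:
                ids = [row["id"] for row in rows]
                placeholders = ",".join(["%s"] * len(ids))
                cursor.execute(
                    "UPDATE message_queue SET status='IN_PROGRESS', updated_at=NOW() "
                    f"WHERE id IN ({placeholders})",
                    ids,
                )
                for row in rows:
                    row["status"] = "IN_PROGRESS"  # reflect the claim in the returned rows
            conn.commit()  # ends the transaction and releases the row locks
            return rows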

    def ack(self, msg_id):
        with self.pool.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "UPDATE message_queue SET status='DONE', updated_at=NOW() WHERE id=%s", (msg_id,)
            )
            conn.commit()

    def fail(self, msg_id, error):
        with self.pool.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "UPDATE message_queue SET status='FAILED', last_error=%s, retry_count=retry_count+1, updated_at=NOW() WHERE id=%s",
                (str(error)[:400], msg_id)
            )
            conn.commit()

    def get_failed(self, limit=10):
        with self.pool.get_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            cursor.execute(
                "SELECT * FROM message_queue WHERE status='FAILED' ORDER BY updated_at DESC LIMIT %s", (limit,)
            )
            return cursor.fetchall()
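Note that `fail()` increments `retry_count`, but nothing in this version of the DAL puts a failed message back into the queue. If retries are wanted, one option is a small policy function that decides whether a message returns to NEW or stays FAILED for the dead-letter list. This is a sketch under assumptions: `MAX_RETRIES` and `status_after_failure` are hypothetical and the schema above defines no retry cap.

```python
MAX_RETRIES = 3  # assumed cap; not defined anywhere in the original schema


def status_after_failure(retry_count: int, max_retries: int = MAX_RETRIES) -> str:
    """Decide where a message goes after one more failed attempt.

    retry_count is the value stored before this failure is recorded.
    """
    attempts = retry_count + 1
    return "FAILED" if attempts >= max_retries else "NEW"
```

A variant of `fail()` could pass the result as the new `status` value in its UPDATE statement, so only messages that exhausted their attempts show up in `get_failed()`.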

4. app.py

from flask import Flask, request, jsonify
from config import Config
from dal.queue import MessageQueueDAL
import jwt
from functools import wraps

mq = MessageQueueDAL(Config)
mq.init_schema()

app = Flask(__name__)

# --- JWT Auth Decorator ---
def jwt_required(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        auth = request.headers.get("Authorization", "")
        if not auth.startswith("Bearer "):
            return jsonify({"error": "Missing Bearer token"}), 401
        token = auth.split(" ", 1)[1]
        try:
            payload = jwt.decode(
                token,
                Config.JWT_SECRET_KEY,
                algorithms=["HS256"],
                issuer=Config.JWT_ISSUER,
                audience=Config.JWT_AUDIENCE,
            )
            # Optionally: validate scope/roles here
        except Exception as e:
            return jsonify({"error": f"Invalid token: {e}"}), 401
        return fn(*args, **kwargs)
    return wrapper

@app.route("/enqueue", methods=["POST"])
@jwt_required
def enqueue():
    data = request.get_json(silent=True) or {}  # tolerate a missing or non-JSON body
    event_type = data.get("event_type")
    payload = data.get("payload")
    if not event_type or payload is None:
        return jsonify({"error": "Missing event_type or payload"}), 400
    msg_id = mq.enqueue(event_type, payload)
    return jsonify({"ok": True, "message_id": msg_id})

@app.route("/poll", methods=["POST"])
@jwt_required
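def poll():
    body = request.get_json(silent=True) or {}  # POST /poll may arrive without a body
    try:
        limit = int(body.get("limit", 10))
    except (TypeError, ValueError):
        return jsonify({"error": "limit must be an integer"}), 400
    rows = mq.poll(limit=limit)
    return jsonify({"messages": rows})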

@app.route("/ack", methods=["POST"])
@jwt_required
def ack():
    msg_id = (request.get_json(silent=True) or {}).get("message_id")
    if not msg_id:
        return jsonify({"error": "Missing message_id"}), 400
    mq.ack(msg_id)
    return jsonify({"ok": True})

@app.route("/fail", methods=["POST"])
@jwt_required
def fail():
    body = request.get_json(silent=True) or {}
    msg_id = body.get("message_id")
    error = body.get("error", "")
    if not msg_id:
        return jsonify({"error": "Missing message_id"}), 400
    mq.fail(msg_id, error)
    return jsonify({"ok": True})

@app.route("/dead-letter", methods=["GET"])
@jwt_required
def dead_letter():
    rows = mq.get_failed(limit=20)
    return jsonify({"failed_messages": rows})

@app.route("/healthz")
def healthz():
    return "OK", 200

if __name__ == "__main__":
    app.run(debug=Config.DEV_MODE, port=5022)
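The decorator above leaves a placeholder for scope or role validation. A minimal sketch of such a check follows, assuming the identity backend issues a space-delimited `scope` claim (the OAuth 2.0 convention) or a list. The claim name and the scope strings `mq:read` and `mq:write` are assumptions, not something the original code defines.

```python
def has_scope(claims: dict, required: str) -> bool:
    """Return True if the decoded JWT claims grant the required scope.

    Accepts either a space-delimited string or a list under "scope";
    a different identity backend may use another claim name.
    """
    raw = claims.get("scope", "")
    scopes = raw.split() if isinstance(raw, str) else list(raw or [])
    return required in scopes
```

Inside `jwt_required`, the decoded `payload` could be passed to `has_scope` and the request rejected with a 403 when the check fails.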

5. passenger_wsgi.py

from app import app as application

6. .env.example

MYSQL_HOST=localhost
MYSQL_USER=your_mysql_user
MYSQL_PASSWORD=your_mysql_password
MYSQL_DB=your_mq_db

JWT_SECRET_KEY=your-very-secret-key
JWT_ISSUER=https://aurorahours.com/identity-backend
JWT_AUDIENCE=mq
DEV_MODE=true
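The `JWT_SECRET_KEY` placeholder should be replaced with a long random value before deployment, since HS256 tokens are only as strong as the shared secret. One way to produce such a value with the standard library is shown below; `generate_jwt_secret` is a hypothetical helper and the 48-byte default is an assumption, not a requirement of the service.

```python
import secrets


def generate_jwt_secret(num_bytes: int = 48) -> str:
    """Return a URL-safe random string suitable as an HS256 signing key."""
    return secrets.token_urlsafe(num_bytes)


if __name__ == "__main__":
    # Prints a line that can be pasted into .env
    print(f"JWT_SECRET_KEY={generate_jwt_secret()}")
```

The same secret has to be configured in whichever service signs the tokens, because HS256 uses one shared key for signing and verification.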

7. README.md (snippet)

# message-queue

Simple, production-grade message queue for microservices using Flask + MySQL, ready for cPanel, JWT-protected.

## Endpoints

- `POST /enqueue` - Add a message/event
- `POST /poll` - Claim NEW messages (marks them IN_PROGRESS)
- `POST /ack` - Acknowledge successful processing
- `POST /fail` - Mark as failed and increment `retry_count`
- `GET /dead-letter` - List failed messages
- `GET /healthz` - Health check

All endpoints (except `/healthz`) require Bearer JWT.

## Reusable DAL

See `dal/queue.py`: OOP and versioned; copy it into other services or publish it as a pip package.

## Deploy

- Set environment vars or copy `.env.example` to `.env`
- Deploy via passenger_wsgi on cPanel

## License

MIT

This gives you a production-grade, OOP, JWT-secured, Flask-based message queue with a reusable data access layer.
Plug it in, deploy, and grow as you need.
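On the consumer side, a worker would typically poll a batch, process each message, and then call /ack or /fail. The sketch below shows that loop under assumptions: `drain_once` is a hypothetical helper, and `post` stands in for whatever HTTP client sends the request with the Bearer token and returns the parsed JSON response.

```python
from __future__ import annotations

from typing import Callable

# post(path, body) -> parsed JSON response. A stand-in for a real HTTP
# client that adds the Authorization header; not part of the service.
Post = Callable[[str, dict], dict]


def drain_once(post: Post, handler: Callable[[dict], None], limit: int = 10) -> tuple[int, int]:
    """Poll one batch, run handler on each message, then ack or fail it.

    Returns (acked, failed) counts.
    """
    acked = failed = 0
    for msg in post("/poll", {"limit": limit}).get("messages", []):
        try:
            handler(msg)
        except Exception as exc:  # report any handler error back to the queue
            post("/fail", {"message_id": msg["id"], "error": str(exc)})
            failed += 1
        else:
            post("/ack", {"message_id": msg["id"]})
            acked += 1
    return acked, failed
```

Keep in mind that the DAL stores `payload` with `json.dumps`, so each message's `payload` field arrives as a JSON string that the handler needs to decode.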

If you want a setup.sql or ready-to-go setup.py for the DAL as a package, just say the word!
