Skip to content

Commit e6bbbdf

Browse files
back pending rabbitmq messages in postgres to allow for persistent retrying
1 parent b51f2b8 commit e6bbbdf

5 files changed

Lines changed: 168 additions & 16 deletions

File tree

src/data/base_data.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ def delete(self, model: BaseModel):
190190
WHERE {model.pk_field} = :pk"""
191191
)
192192

193-
return self.execute(sql, pk=getattr(model, model.pk_field))
193+
with session_scope() as session:
194+
return session.execute(sql, {"pk": getattr(model, model.pk_field)}).rowcount
194195

195196
def execute(self, sql: Union[str, text], **kwargs):
196197
if isinstance(sql, str):
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from typing import Optional
2+
3+
from sqlalchemy.sql import text
4+
5+
from data.base_data import BaseModel, BaseData
6+
7+
8+
class RabbitmqPendingMessageModel(BaseModel):
9+
_table = "rabbitmq_pending_messages"
10+
_pk_field = "id"
11+
_columns = ["id", "type", "exchange_name", "queue_name", "json_body", "created_time"]
12+
13+
14+
class RabbitmqPendingMessageData(BaseData):
15+
def get_rabbitmq_pending_message_by_id(self, id: int) -> Optional[RabbitmqPendingMessageModel]:
16+
sql = text(
17+
"""
18+
SELECT * FROM rabbitmq_pending_messages
19+
WHERE id = :id;
20+
"""
21+
)
22+
23+
result_rows = self.execute(sql, id=id)
24+
if not result_rows:
25+
return None
26+
27+
return RabbitmqPendingMessageModel(result_rows[0])
28+
29+
def get_rabbitmq_pending_messages(self) -> list[RabbitmqPendingMessageModel]:
30+
sql = text(
31+
"""
32+
SELECT * FROM rabbitmq_pending_messages
33+
ORDER BY created_time ASC;
34+
"""
35+
)
36+
37+
result_rows = self.execute(sql)
38+
return [RabbitmqPendingMessageModel(row) for row in result_rows]

src/services/rabbit_service.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import json
22
import pika
33
import praw
4-
from collections import deque
54
from json import JSONEncoder
65
from praw.models.mod_action import ModAction
76
from praw.models.reddit.comment import Comment
87
from praw.models.reddit.submission import Submission
98
from types import FunctionType
10-
from typing import Deque
119
from uuid import UUID
1210

11+
from services import rabbitmq_pending_message_service
1312
from data.comment_data import CommentModel
1413
from data.mod_action_data import ModActionModel
1514
from data.post_data import PostModel
@@ -34,8 +33,6 @@ def default(self, obj):
3433

3534

3635
class RabbitService:
37-
messages_to_retry: Deque[dict] = deque()
38-
3936
def __init__(self, config_dict: dict):
4037
self.config = config_dict
4138
self.connection = None
@@ -79,11 +76,7 @@ def __init__(self, config_dict: dict):
7976

8077
self.queues[key] = {"exchange": exchange_name, "queue": queue_name}
8178

82-
if self.messages_to_retry:
83-
logger.info(f"Retrying {len(self.messages_to_retry)} RabbitMQ messages")
84-
while self.messages_to_retry:
85-
exchange_name, queue_name, json_body = self.messages_to_retry.popleft()
86-
self._publish_message(exchange_name, queue_name, json_body)
79+
self._republish_messages()
8780

8881
def init_connection(self, reconnect: bool = True):
8982
logger.info(f"{"Rec" if reconnect else "C"}onnecting to RabbitMQ...")
@@ -95,21 +88,21 @@ def publish_post(self, reddit_post: Submission, post: PostModel, status: str = "
9588
logger.info(f"Publishing post to RabbitMQ: {reddit_post.id} ({status})")
9689
queue = self.queues["post"]
9790
body = {"status": status, "reddit": reddit_post, "db": post.to_dict()}
98-
self._publish_message(queue["exchange"], queue["queue"], json.dumps(body, cls=PRAWJSONEncoder))
91+
self._publish_message(queue["exchange"], queue["queue"], json.dumps(body, cls=PRAWJSONEncoder), "post")
9992

10093
def publish_comment(self, reddit_comment: Comment, comment: CommentModel, status: str = "new"):
10194
logger.info(f"Publishing comment to RabbitMQ: {reddit_comment.id} ({status})")
10295
queue = self.queues["comment"]
10396
body = {"status": status, "reddit": reddit_comment, "db": comment.to_dict()}
104-
self._publish_message(queue["exchange"], queue["queue"], json.dumps(body, cls=PRAWJSONEncoder))
97+
self._publish_message(queue["exchange"], queue["queue"], json.dumps(body, cls=PRAWJSONEncoder), "comment")
10598

10699
def publish_mod_action(self, reddit_mod_action: ModAction, mod_action: ModActionModel, status: str = "new"):
107100
logger.info(f"Publishing mod action to RabbitMQ: {reddit_mod_action.id} ({status})")
108101
queue = self.queues["mod_action"]
109102
body = {"status": status, "reddit": reddit_mod_action, "db": mod_action.to_dict()}
110-
self._publish_message(queue["exchange"], queue["queue"], json.dumps(body, cls=PRAWJSONEncoder))
103+
self._publish_message(queue["exchange"], queue["queue"], json.dumps(body, cls=PRAWJSONEncoder), "mod action")
111104

112-
def _publish_message(self, exchange_name: str, queue_name: str, json_body: str):
105+
def _publish_message(self, exchange_name: str, queue_name: str, json_body: str, type: str):
113106
try:
114107
self.channel.basic_publish(
115108
exchange=exchange_name,
@@ -136,6 +129,45 @@ def _publish_message(self, exchange_name: str, queue_name: str, json_body: str):
136129
)
137130
logger.info("Successfully send message after reconnect")
138131
except Exception:
139-
logger.error("Still couldn't connect to RabbitMQ. Saving message to retry memory list")
140-
self.messages_to_retry.append((exchange_name, queue_name, json_body))
132+
logger.exception("Still couldn't connect to RabbitMQ. Saving message to retry table")
133+
rabbitmq_pending_message_service.insert_pending_message(exchange_name, queue_name, json_body, type)
134+
raise
135+
136+
def _republish_messages(self):
137+
messages_to_retry = rabbitmq_pending_message_service.get_pending_messages()
138+
if messages_to_retry:
139+
logger.info(f"Retrying {len(messages_to_retry)} RabbitMQ messages")
140+
number_of_successful_retries = 0
141+
for pending_message in messages_to_retry:
142+
logger.info(
143+
f"Republishing {pending_message.type} to RabbitMQ"
144+
+ f": {pending_message.json_body["reddit"]["id"]} ({pending_message.json_body["status"]})"
145+
)
146+
try:
147+
json_body = json.dumps(pending_message.json_body, cls=PRAWJSONEncoder)
148+
self._republish_message(pending_message.exchange_name, pending_message.queue_name, json_body)
149+
number_of_successful_retries += rabbitmq_pending_message_service.delete_pending_message(pending_message)
150+
except Exception:
151+
if number_of_successful_retries > 0:
152+
logger.warn(
153+
f"Successfully retried {number_of_successful_retries}"
154+
+ f" / {len(messages_to_retry)} RabbitMQ messages"
155+
)
156+
logger.exception("RabbitMQ is still down. Messages will stay pending.")
141157
raise
158+
logger.info(f"Successfully retried all {number_of_successful_retries} RabbitMQ messages")
159+
160+
def _republish_message(self, exchange_name: str, queue_name: str, json_body: str):
161+
try:
162+
self.channel.basic_publish(
163+
exchange=exchange_name,
164+
routing_key=queue_name,
165+
body=json_body,
166+
properties=pika.BasicProperties(
167+
delivery_mode=pika.DeliveryMode.Persistent,
168+
content_type="application/json",
169+
headers={self.config["retry_attempt_header"]: 1},
170+
),
171+
)
172+
except Exception:
173+
logger.exception("Still couldn't connect to RabbitMQ. Message already saved to retry")
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from typing import Optional
2+
3+
from data.rabbitmq_pending_message_data import RabbitmqPendingMessageData, RabbitmqPendingMessageModel
4+
5+
_rabbitmq_pending_message_data = RabbitmqPendingMessageData()
6+
7+
8+
def get_pending_message_by_id(id: int) -> Optional[RabbitmqPendingMessageModel]:
9+
"""
10+
Gets a single rabbitmq_pending_message from the database.
11+
"""
12+
13+
return _rabbitmq_pending_message_data.get_rabbitmq_pending_message_by_id(id)
14+
15+
16+
def get_pending_messages() -> list[RabbitmqPendingMessageModel]:
17+
"""
18+
Get all rabbitmq_pending_messages in the DB. Ordered by created_time ascending
19+
"""
20+
21+
return _rabbitmq_pending_message_data.get_rabbitmq_pending_messages()
22+
23+
24+
def insert_pending_message(
25+
exchange_name: str, queue_name: str, json_body: str, type: str
26+
) -> RabbitmqPendingMessageModel:
27+
"""Adds a new pending rabbitmq message to the database."""
28+
29+
db_model = RabbitmqPendingMessageModel()
30+
db_model.exchange_name = exchange_name
31+
db_model.queue_name = queue_name
32+
db_model.json_body = json_body
33+
db_model.type = type
34+
35+
saved_db_model = _rabbitmq_pending_message_data.insert(db_model)
36+
return saved_db_model
37+
38+
39+
def delete_pending_message(pending_message: RabbitmqPendingMessageModel):
40+
return _rabbitmq_pending_message_data.delete(pending_message)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
"""Add rabbitmq pending table
2+
3+
Revision ID: a5f2ff2c36f8
4+
Revises: 03f4360f81fc
5+
Create Date: 2025-12-20 02:24:58.431032+00:00
6+
7+
"""
8+
9+
from alembic import op
10+
import sqlalchemy as sa
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "a5f2ff2c36f8"
15+
down_revision = "03f4360f81fc"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade():
21+
op.execute(
22+
"""
23+
CREATE TABLE rabbitmq_pending_messages (
24+
id BIGSERIAL PRIMARY KEY,
25+
type TEXT NOT NULL,
26+
exchange_name TEXT NOT NULL,
27+
queue_name TEXT NOT NULL,
28+
json_body JSONB NOT NULL,
29+
created_time TIMESTAMPTZ NOT NULL DEFAULT now()
30+
);
31+
CREATE UNIQUE INDEX IF NOT EXISTS idx_rabbitmq_pending_messages_created_time ON rabbitmq_pending_messages(created_time);
32+
"""
33+
)
34+
35+
36+
def downgrade():
37+
op.execute(
38+
"""
39+
DROP TABLE IF EXISTS rabbitmq_pending_messages;
40+
"""
41+
)

0 commit comments

Comments
 (0)