handle incoming message validation error #2565
Answered by yann-combarnous
pablo-esteban asked this question in Q&A
Hi, I want to handle the validation error that pydantic raises when the incoming JSON does not conform to the expected type. How do I do this? I was hoping to specify something like an exception handler on the subscriber (using the example from the docs):

    import logging

    import pydantic_core
    from pydantic import BaseModel, Field, NonNegativeInt

    from faststream import FastStream
    from faststream.kafka import KafkaBroker

    broker = KafkaBroker("localhost:9092")
    app = FastStream(broker)


    class UserInfo(BaseModel):
        name: str = Field(
            ..., examples=["John"], description="Registered user name"
        )
        user_id: NonNegativeInt = Field(
            ..., examples=[1], description="Registered user id"
        )


    def on_validation_err(exception, value):
        log = logging.getLogger()
        log.error(f"Could not validate {value}")
        # maybe raise a different exception


    # `on_error` is hypothetical: it only illustrates the kind of hook I am looking for
    @broker.subscriber("test-topic", on_error={pydantic_core.ValidationError: on_validation_err})
    async def handle(
        user: UserInfo,
    ):
        assert user.name == "John"
        assert user.user_id == 1
Answered by yann-combarnous, Oct 24, 2025
You can use something like:

    from faststream import ContextRepo, ExceptionMiddleware, Logger
    from faststream.kafka.annotations import KafkaMessage


    # BrokerAnnotation stands for your broker's annotation type
    async def error_handler(
        exc: Exception, message: KafkaMessage, context: ContextRepo, broker: BrokerAnnotation, logger: Logger
    ):
        ...  # your code here


    Broker(
        ...,  # other broker arguments
        middlewares=(ExceptionMiddleware(handlers={Exception: error_handler}),),
    )
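
The `handlers={Exception: error_handler}` argument is a mapping from exception type to handler. To make that idea concrete, here is a minimal stdlib-only sketch of such a registry. It is not FastStream's implementation: `SchemaError` (a stand-in for the validation error), `pick_handler`, `consume`, and the "most specific type wins" lookup are all hypothetical names and choices made for illustration.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict, List, Optional, Type

logger = logging.getLogger("consumer")

Handler = Callable[[Exception, Any], Awaitable[None]]


class SchemaError(ValueError):
    """Hypothetical stand-in for the validation error raised on bad input."""


def pick_handler(
    handlers: Dict[Type[BaseException], Handler], exc: Exception
) -> Optional[Handler]:
    # Walk the exception's MRO so the most specific registered type wins.
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls]
    return None


async def consume(
    message: Any,
    handle: Callable[[Any], Awaitable[None]],
    handlers: Dict[Type[BaseException], Handler],
) -> bool:
    """Run `handle`; return True on success, False if a handler absorbed the error."""
    try:
        await handle(message)
        return True
    except Exception as exc:
        handler = pick_handler(handlers, exc)
        if handler is None:
            raise  # nothing registered for this type: let it propagate
        await handler(exc, message)
        return False


rejected: List[Any] = []


async def on_schema_error(exc: Exception, message: Any) -> None:
    # Log the offending payload and remember it (e.g. for a dead-letter topic).
    logger.error("Could not validate %r: %s", message, exc)
    rejected.append(message)


async def handle(message: dict) -> None:
    # Toy validation mirroring the `user_id: NonNegativeInt` field of the question.
    user_id = message.get("user_id")
    if not isinstance(user_id, int) or user_id < 0:
        raise SchemaError("user_id must be a non-negative integer")


HANDLERS: Dict[Type[BaseException], Handler] = {SchemaError: on_schema_error}


def run_demo() -> List[bool]:
    async def main() -> List[bool]:
        messages = [
            {"name": "John", "user_id": 1},
            {"name": "John", "user_id": -1},
        ]
        return [await consume(m, handle, HANDLERS) for m in messages]

    return asyncio.run(main())


if __name__ == "__main__":
    print(run_demo())
```

With the real middleware, registering a narrower key than `Exception` should limit the handler to validation failures in the same way. Which exception class actually reaches the middleware may depend on the FastStream version, so it is worth logging `type(exc)` once before narrowing the key.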
Answer selected by pablo-esteban