2626
2727logger = logging .getLogger (__name__ )
2828
29- router = APIRouter (prefix = "/api/jobs" , tags = ["Jobs (Dapr Callbacks)" ])
29+ router = APIRouter (tags = ["Jobs (Dapr Callbacks)" ])
30+
31+ # Dapr Jobs v1.0-alpha1 calls back to /job/{job_name} by default
32+ # We need to handle this path to receive job triggers
3033
3134
3235def calculate_next_due (pattern : str , from_time : datetime ) -> datetime :
@@ -45,26 +48,66 @@ def calculate_next_due(pattern: str, from_time: datetime) -> datetime:
4548 return from_time + patterns .get (pattern , timedelta (days = 1 ))
4649
4750
48- @router .post ("/trigger" )
51+ @router .post ("/job/{job_name}" )
52+ async def handle_dapr_job_callback (
53+ job_name : str ,
54+ request : Request ,
55+ session : AsyncSession = Depends (get_session ),
56+ ) -> dict :
57+ """Handle Dapr Jobs v1.0-alpha1 callback.
58+
59+ Dapr calls POST /job/{job_name} when a scheduled job fires.
60+ The job data we provided during scheduling is in the request body.
61+
62+ Job naming convention:
63+ - spawn-task-{id}: Create next recurring task occurrence
64+ - reminder-task-{id}: Send reminder notification
65+ """
66+ try :
67+ body = await request .json ()
68+ # Dapr wraps our data - extract it
69+ job_data = body .get ("data" , body )
70+
71+ task_id = job_data .get ("task_id" )
72+ job_type = job_data .get ("type" )
73+
74+ logger .info (
75+ "[DAPR-JOB] Callback received: job=%s, type=%s, task_id=%s" ,
76+ job_name ,
77+ job_type ,
78+ task_id ,
79+ )
80+
81+ if job_type == "spawn" :
82+ return await handle_spawn (session , task_id )
83+ elif job_type == "reminder" :
84+ return await handle_reminder (session , job_data )
85+ else :
86+ logger .warning ("[DAPR-JOB] Unknown job type: %s" , job_type )
87+ return {"status" : "unknown_type" }
88+
89+ except Exception as e :
90+ logger .exception ("[DAPR-JOB] Error handling job %s: %s" , job_name , e )
91+ return {"status" : "error" , "message" : str (e )}
92+
93+
94+ @router .post ("/api/jobs/trigger" )
4995async def handle_job_trigger (
5096 request : Request ,
5197 session : AsyncSession = Depends (get_session ),
5298) -> dict :
53- """Handle Dapr job trigger callback .
99+ """Legacy endpoint - kept for backwards compatibility .
54100
55- Dapr calls this endpoint when a scheduled job fires.
56- Job types:
57- - spawn: Create next recurring task occurrence
58- - reminder: Publish reminder event to Notification Service
101+ New jobs use /job/{job_name} callback (Dapr default).
59102 """
60103 try :
61104 body = await request .json ()
62- job_data = body .get ("data" , body ) # Handle both wrapped and raw payloads
105+ job_data = body .get ("data" , body )
63106
64107 task_id = job_data .get ("task_id" )
65108 job_type = job_data .get ("type" )
66109
67- logger .info ("[DAPR-JOB] Received trigger: type=%s, task_id=%s" , job_type , task_id )
110+ logger .info ("[DAPR-JOB] Legacy trigger: type=%s, task_id=%s" , job_type , task_id )
68111
69112 if job_type == "spawn" :
70113 return await handle_spawn (session , task_id )
0 commit comments