diff --git a/worker_plan/worker_plan_internal/diagnostics/premortem.py b/worker_plan/worker_plan_internal/diagnostics/premortem.py index cb5e6e2a..282c38f7 100644 --- a/worker_plan/worker_plan_internal/diagnostics/premortem.py +++ b/worker_plan/worker_plan_internal/diagnostics/premortem.py @@ -1,5 +1,5 @@ """ -Premortem: “If we fail, here’s how and why.” +Premortem: "If we fail, here's how and why." Imagine that the project has failed, and work backwards to identify plausible reasons why. @@ -9,14 +9,14 @@ PROMPT> python -m worker_plan_internal.diagnostics.premortem -`assumptions_to_kill` are the INPUTS. They are the foundational beliefs held before the project begins. They represent the project's -most significant areas of uncertainty. The list of assumptions is, in itself, a high-value deliverable for a project kickoff. +`assumptions_to_kill` are the INPUTS. They are the foundational beliefs held before the project begins. They represent the project's +most significant areas of uncertainty. The list of assumptions is, in itself, a high-value deliverable for a project kickoff. It's the "here's what we believe to be true, but we need to prove it" list. -`failure_modes` are the potential OUTCOMES. They are the narrative stories of what could happen if an assumption proves false. +`failure_modes` are the potential OUTCOMES. They are the narrative stories of what could happen if an assumption proves false. They explore the consequences and the causal chain of failure. -IDEA: Focus on top 3 failure modes. All failure modes are rated “High” or “Critical”, which dilutes prioritization. This risks overwhelming the team with too many “critical” focus areas. Rank failure modes by priority (e.g., top 3: FM5, FM1, FM6) and allocate resources accordingly. +IDEA: Focus on top 3 failure modes. All failure modes are rated "High" or "Critical", which dilutes prioritization. This risks overwhelming the team with too many "critical" focus areas. Rank failure modes by priority (e.g., top 3: FM5, FM1, FM6) and allocate resources accordingly. IDEA: The "Response Playbook" uses the "Contain, Assess, Respond" model. Enhance with a field for "Proactive Mitigation." The playbook is for when a tripwire is hit (reactive). Proactive mitigation would be the actions taken beforehand to prevent the tripwire from ever being hit. For example, for "The Empty Wallet Wasteland", the proactive mitigation is "Conduct a detailed bottom-up cost estimation." This task should be in the project plan from day one because of the risk identified in the Premortem. @@ -24,7 +24,7 @@ IDEA: The premortem assumes a static risk landscape. -IDEA: add a low-probability, high-impact “external shock” scenario, "black swan" scenario. +IDEA: add a low-probability, high-impact "external shock" scenario, "black swan" scenario. IDEA: Use a reasoning model to validate the premortem section and fix issues. @@ -44,59 +44,84 @@ logger = logging.getLogger(__name__) class AssumptionItem(BaseModel): - assumption_id: str = Field(description="Enumerate the assumption items starting from 'A1', 'A2', 'A3', 'A4', etc. Do not restart at A1.") + assumption_id: str = Field(description="A unique ID for this assumption, e.g. 'A1', 'A2', 'A3'.") statement: str = Field(description="The core assumption we are making that, if false, would kill the project.") test_now: str = Field(description="A concrete, immediate action to test if this assumption is true.") falsifier: str = Field(description="The specific result from the test that would prove the assumption false.") class FailureModeItem(BaseModel): - failure_mode_index: int = Field(description="Enumerate the failure_mode items starting from 1") - root_cause_assumption_id: str = Field(description="The 'assumption_id' (e.g., 'A1') of the single assumption that is the primary root cause of this failure mode.") - failure_mode_archetype: str = Field(description="The archetype of failure: 'Process/Financial', 'Technical/Logistical', or 'Market/Human'.") - failure_mode_title: str = Field(description="A compelling, story-like title (e.g., 'The Gridlock Gamble').") - risk_analysis: str = Field( - description="Structured, factual breakdown of causes, contributing factors, and impacts for the failure mode. Use bullet points or short factual sentences. Avoid narratives or fictional elements." - ) - early_warning_signs: List[str] = Field( - description="Clear, measurable indicators that this failure mode may occur. Each must be objectively testable." - ) - owner: Optional[str] = Field(None, description="The single role who owns this risk (e.g., 'Permitting Lead', 'Head of Engineering').") - likelihood_5: Optional[int] = Field(None, description="Integer from 1 (rare) to 5 (almost certain) of this failure occurring.") - impact_5: Optional[int] = Field(None, description="Integer from 1 (minor) to 5 (catastrophic) if this failure occurs.") - tripwires: Optional[List[str]] = Field(None, description="Array of 2-3 short, specific strings with NUMERIC thresholds that signal this failure is imminent (e.g., 'Permit delays exceed 90 days').") - playbook: Optional[List[str]] = Field(None, description="Array of exactly 3 brief, imperative action steps for the owner to take if a tripwire is hit.") - stop_rule: Optional[str] = Field(None, description="A single, short, hard stop condition that would trigger project cancellation or a major pivot.") + failure_mode_index: int = Field(description="Index of this failure mode, starting from 1.") + root_cause_assumption_id: str = Field(description="The assumption_id of the assumption that is the root cause of this failure (e.g. 'A1').") + failure_mode_archetype: str = Field(description="The failure archetype (e.g. 'Process/Financial', 'Technical/Logistical', 'Market/Human', 'Environmental/Regulatory', 'Execution/Operational', 'Technology/Infrastructure', or a more specific variant suited to this project).") + failure_mode_title: str = Field(description="A compelling, story-like title (e.g. 'The Gridlock Gamble').") + risk_analysis: str = Field(description="Factual breakdown of causes, contributing factors, and impacts.") + early_warning_signs: List[str] = Field(description="Measurable indicators that this failure may occur.") + owner: Optional[str] = Field(None, description="The single role who owns this risk.") + likelihood_5: Optional[int] = Field(None, description="Integer 1-5: likelihood of this failure occurring.") + impact_5: Optional[int] = Field(None, description="Integer 1-5: impact if this failure occurs.") + tripwires: Optional[List[str]] = Field(None, description="2-3 measurable thresholds signalling imminent failure (e.g. 'Permit delays exceed 90 days').") + playbook: Optional[List[str]] = Field(None, description="Exactly 3 imperative actions: Contain, Assess, Respond.") + stop_rule: Optional[str] = Field(None, description="Hard stop condition that would trigger project cancellation or major pivot.") + +class ArchetypeNarrative(BaseModel): + """Minimal schema: just the narrative content. IDs and bookkeeping are assigned by the program.""" + assumption: str = Field(description="One critical assumption the project is making that, if false, would cause this failure.") + test_now: str = Field(description="One concrete action to immediately test if this assumption holds.") + failure_title: str = Field(description="A short, compelling title for this failure scenario.") + failure_story: str = Field(description="A detailed narrative of how this failure unfolds. Explain causes, chain of events, and impact.") + warning_signs: List[str] = Field(description="2-4 observable signals that this failure is beginning to occur.") + +class ArchetypeAnalysis(BaseModel): + """Single-archetype premortem: one assumption + one failure mode. Used per-call to reduce schema complexity.""" + assumption: AssumptionItem = Field(description="One critical assumption underlying this failure archetype.") + failure_mode: FailureModeItem = Field(description="One failure mode for this archetype, linked to the assumption above.") class PremortemAnalysis(BaseModel): - assumptions_to_kill: List[AssumptionItem] = Field(description="A list of 3 new, critical, underlying assumptions to test immediately.") - failure_modes: List[FailureModeItem] = Field(description="A list containing exactly 3 distinct failure failure_modes, one for each archetype.") - -PREMORTEM_SYSTEM_PROMPT = """ -Persona: You are a senior project analyst. Your primary goal is to write compelling, detailed, and distinct failure stories that are also operationally actionable. - -Objective: Imagine the user's project has failed completely. Generate a comprehensive premortem analysis as a single JSON object. - -Instructions: -1. Generate a top-level `assumptions_to_kill` array containing exactly 3 critical assumptions to test, each with an `id`, `statement`, `test_now`, and `falsifier`. An assumption is a belief held without proof (e.g., "The supply chain is stable"), not a project goal. -2. Generate a top-level `failure_modes` array containing exactly 3 detailed, story-like failure failure_modes, one for each archetype: Process/Financial, Technical/Logistical, and Market/Human. -3. **CRITICAL LINKING STEP: For each `failure_mode`, you MUST identify its root cause by setting the `root_cause_assumption_id` field to the `assumption_id` of one of the assumptions you created in step 1. ** Each assumption ("A1", "A2", "A3", "A4", etc.) must be used as a root cause exactly once. -4. Each story in the `failure_modes` array must be a detailed, multi-paragraph story with a clear causal chain. Do not write short summaries. -5. For each of the 3 failure_modes, you MUST populate all the following fields: `failure_mode_index`, `failure_mode_archetype`, `failure_mode_title`, `risk_analysis`, `early_warning_signs`, `owner`, `likelihood_5`, `impact_5`, `tripwires`, `playbook`, and `stop_rule`. -6. **CRITICAL:** Each of the 3 failure_modes must be distinct and unique. Do not repeat the same story, phrasing, or playbook actions. Tailor each one specifically to its archetype (e.g., the financial failure should be about money and process, the technical failure about engineering and materials, the market failure about public perception and competition). -7. Tripwires MUST be objectively measurable (use operators like <=, >=, =, %, days, counts); avoid vague terms like “significant” or “many”. -8. The `playbook` array MUST contain exactly 3 actions as follows: - 1. An immediate containment/control action, e.g., 'Contain: Stop the bleeding.' - 2. An assessment/triage action, e.g., 'Assess: Figure out how bad the damage is.' - 3. A strategic response action, e.g., 'Respond: Take strategic action based on the assessment.' -9. The `stop_rule` MUST be a hard, non-negotiable condition for project cancellation or a major pivot. -10. Your entire output must be a single, valid JSON object. For any follow-up requests, you MUST regenerate the full JSON object including all required fields, not just the part being changed. Do not add any text or explanation outside of the JSON structure. - -FULL-OBJECT, TWO-KEYS ONLY (HARD RULE) -- The top-level JSON MUST contain exactly two keys: "assumptions_to_kill" and "failure_modes". No other keys are allowed. -- On follow-up requests (even if they ask for “only assumptions”), you MUST return the full JSON object with BOTH keys present and populated. Never omit or leave "failure_modes" empty. -- If asked to start at A4/A7/etc., create exactly 3 new assumptions with those IDs and REBUILD all 3 failure_modes to reference them (each assumption used exactly once). -- The message must END immediately after the closing "}" of the JSON. No markdown or text after it. -- Self-check before sending: output starts with "{" and ends with "}", includes BOTH required keys, has exactly 3 assumptions and exactly 3 failure_modes. + assumptions_to_kill: List[AssumptionItem] = Field(description="Critical assumptions to test immediately.") + failure_modes: List[FailureModeItem] = Field(description="Failure mode stories, one per archetype.") + +ARCHETYPES = [ + ("Process/Financial", "A1", 1), + ("Technical/Logistical", "A2", 2), + ("Market/Human", "A3", 3), + ("Environmental/Regulatory", "A4", 4), + ("Execution/Operational", "A5", 5), + ("Technology/Infrastructure", "A6", 6), + ("Legal/Compliance", "A7", 7), + ("Strategic/Competitive", "A8", 8), + ("Stakeholder/Political", "A9", 9), +] + +PREMORTEM_SYSTEM_PROMPT_NARRATIVE = """ +You are a senior project analyst. The project has failed completely. + +Analyse this single failure archetype: {archetype} + +Return a JSON object with these fields: +- assumption: (string) The key belief the project was relying on that turned out to be wrong +- test_now: (string) One immediate action that could have tested this assumption early +- failure_title: (string) A short, memorable title for this failure story +- failure_story: (string) A detailed paragraph explaining how this failure happened — causes, chain of events, consequences +- warning_signs: (array of strings) 2-4 observable early signals that this failure was beginning + +Output only the JSON object. No extra text. +""" + +PREMORTEM_SYSTEM_PROMPT_TEMPLATE = """ +You are a senior project analyst. Imagine the project has failed completely. + +Your task: analyse a single failure archetype - {archetype}. + +Generate exactly ONE assumption and ONE failure mode for this archetype as a JSON object with two keys: +- "assumption": one AssumptionItem (assumption_id="{assumption_id}", statement, test_now, falsifier) +- "failure_mode": one FailureModeItem (failure_mode_index={index}, root_cause_assumption_id="{assumption_id}", failure_mode_archetype="{archetype}", plus all other fields) + +Rules: +- failure_mode_archetype MUST be "{archetype}" +- root_cause_assumption_id MUST be "{assumption_id}" +- tripwires must use numeric thresholds (e.g. "Permit delays exceed 90 days") +- playbook must have exactly 3 steps: Contain, Assess, Respond +- Output ONLY the JSON object. No markdown, no explanation. Start with {{ and end with }}. """ @dataclass @@ -106,7 +131,7 @@ class Premortem: response: dict metadata: dict markdown: str - + @classmethod def execute(cls, llm_executor: LLMExecutor, speed_vs_detail: SpeedVsDetailEnum, user_prompt: str) -> 'Premortem': if not isinstance(llm_executor, LLMExecutor): @@ -115,107 +140,95 @@ def execute(cls, llm_executor: LLMExecutor, speed_vs_detail: SpeedVsDetailEnum, raise ValueError("Invalid SpeedVsDetailEnum instance.") if not isinstance(user_prompt, str): raise ValueError("Invalid user_prompt.") - - logger.debug(f"User Prompt:\n{user_prompt}") - system_prompt = PREMORTEM_SYSTEM_PROMPT.strip() - accumulated_chat_message_list = [ - ChatMessage( - role=MessageRole.SYSTEM, - content=system_prompt, - ) - ] + logger.debug(f"User Prompt:\n{user_prompt}") - user_prompt_list = [ - user_prompt, - "Generate 3 new assumptions that are thematically different from the previous ones. Start assumption_id at A4.", - "Generate 3 new assumptions that are thematically different from the previous ones and covers different archetypes. Start assumption_id at A7.", - ] + # Decomposed approach: one independent call per archetype. + # Each call produces exactly one assumption + one failure mode, eliminating + # cross-linking constraints and reducing schema complexity per call. + archetypes_to_run = ARCHETYPES if speed_vs_detail == SpeedVsDetailEnum.FAST_BUT_SKIP_DETAILS: - user_prompt_list = user_prompt_list[:1] - logger.info("Running in FAST_BUT_SKIP_DETAILS mode. Omitting some assumptions.") + archetypes_to_run = ARCHETYPES[:1] + logger.info("Running in FAST_BUT_SKIP_DETAILS mode. Only first archetype.") else: - logger.info("Running in ALL_DETAILS_BUT_SLOW mode. Processing all assumptions.") + logger.info(f"Running in ALL_DETAILS_BUT_SLOW mode. Processing {len(ARCHETYPES)} archetypes.") - responses: list[PremortemAnalysis] = [] + assumptions_to_kill: list[AssumptionItem] = [] + failure_modes: list[FailureModeItem] = [] metadata_list: list[dict] = [] - for user_prompt_index, user_prompt_item in enumerate(user_prompt_list): - logger.info(f"Processing user_prompt_index: {user_prompt_index+1} of {len(user_prompt_list)}") - chat_message_list = accumulated_chat_message_list.copy() - chat_message_list.append( - ChatMessage( - role=MessageRole.USER, - content=user_prompt_item, - ) - ) - def execute_function(llm: LLM) -> dict: - sllm = llm.as_structured_llm(PremortemAnalysis) + for archetype, assumption_id, index in archetypes_to_run: + logger.info(f"Processing archetype {index}/{len(archetypes_to_run)}: {archetype}") + system_prompt = PREMORTEM_SYSTEM_PROMPT_NARRATIVE.format(archetype=archetype).strip() + + chat_message_list = [ + ChatMessage(role=MessageRole.SYSTEM, content=system_prompt), + ChatMessage(role=MessageRole.USER, content=user_prompt), + ] + + def execute_function(llm: LLM, _chat=chat_message_list) -> dict: + sllm = llm.as_structured_llm(ArchetypeNarrative) start_time = time.perf_counter() - - chat_response = sllm.chat(chat_message_list) + chat_response = sllm.chat(_chat) pydantic_response = chat_response.raw - end_time = time.perf_counter() duration = int(ceil(end_time - start_time)) - metadata = dict(llm.metadata) metadata["llm_classname"] = llm.class_name() metadata["duration"] = duration - return { "pydantic_response": pydantic_response, "metadata": metadata, - "duration": duration + "duration": duration, } - try: - result = llm_executor.run(execute_function) - except PipelineStopRequested: - # Re-raise PipelineStopRequested without wrapping it - raise - except Exception as e: - logger.debug(f"LLM chat interaction failed: {e}") - logger.error("LLM chat interaction failed.", exc_info=True) - if user_prompt_index == 0: - logger.error("The first user prompt failed. This is a critical error. Please check the system prompt and user prompt.") - raise ValueError("LLM chat interaction failed.") from e - else: - logger.error(f"User prompt {user_prompt_index+1} failed. Continuing with next user prompt.") - continue - - assistant_content_raw: dict = result["pydantic_response"].model_dump() - # Compact JSON without newlines and spaces, since it's going to be parsed by the LLM. Pretty printing wastes input tokens for the LLM. - assistant_content: str = json.dumps(assistant_content_raw, separators=(',', ':')) - - chat_message_list.append( - ChatMessage( - role=MessageRole.ASSISTANT, - content=assistant_content, - ) + MAX_RETRIES = 5 + narrative: ArchetypeNarrative | None = None + for attempt in range(1, MAX_RETRIES + 1): + try: + result = llm_executor.run(execute_function) + narrative = result["pydantic_response"] + metadata_list.append(result["metadata"]) + logger.info(f"Archetype {archetype} succeeded on attempt {attempt}.") + break + except PipelineStopRequested: + raise + except Exception as e: + logger.warning(f"Archetype {archetype} attempt {attempt}/{MAX_RETRIES} failed: {e}") + if attempt == MAX_RETRIES: + logger.warning(f"Archetype {archetype} exhausted {MAX_RETRIES} attempts. Skipping — premortem will be partial.") + + if narrative is None: + continue + + # Program assigns IDs and bookkeeping — LLM only provides narrative content + assumption_item = AssumptionItem( + assumption_id=assumption_id, + statement=narrative.assumption, + test_now=narrative.test_now, + falsifier=f"The test reveals the assumption is false.", ) - - responses.append(result["pydantic_response"]) - metadata_list.append(result["metadata"]) - accumulated_chat_message_list = chat_message_list.copy() - - # Use the last response as the primary result - assumptions_to_kill: list[AssumptionItem] = [] - failure_modes: list[FailureModeItem] = [] - for response in responses: - assumptions_to_kill.extend(response.assumptions_to_kill) - failure_modes.extend(response.failure_modes) + failure_mode_item = FailureModeItem( + failure_mode_index=index, + root_cause_assumption_id=assumption_id, + failure_mode_archetype=archetype, + failure_mode_title=narrative.failure_title, + risk_analysis=narrative.failure_story, + early_warning_signs=narrative.warning_signs, + ) + assumptions_to_kill.append(assumption_item) + failure_modes.append(failure_mode_item) final_response = PremortemAnalysis( assumptions_to_kill=assumptions_to_kill, failure_modes=failure_modes ) - + json_response = final_response.model_dump() response_byte_count = len(json.dumps(json_response).encode('utf-8')) - + logger.info(f"LLM chat interaction completed. Response byte count: {response_byte_count}") - + metadata = {} metadata["models"] = metadata_list metadata["response_byte_count"] = response_byte_count @@ -229,7 +242,7 @@ def execute_function(llm: LLM) -> dict: metadata=metadata, markdown=markdown ) - + def to_dict(self, include_metadata=True, include_system_prompt=True, include_user_prompt=True, include_markdown=True) -> dict: d = self.response.copy() if include_metadata: @@ -255,10 +268,10 @@ def save_markdown(self, output_file_path: str): def _format_bullet_list(items: list[str]) -> str: """ Format a list of strings into a markdown bullet list. - + Args: items: List of strings to format as bullet points - + Returns: Formatted markdown bullet list """ @@ -269,7 +282,7 @@ def _calculate_risk_level_brief(likelihood: Optional[int], impact: Optional[int] """Calculates a qualitative risk level from likelihood and impact scores.""" if likelihood is None or impact is None: return "Not Scored" - + score = likelihood * impact if score >= 15: classification = "CRITICAL" @@ -279,7 +292,7 @@ def _calculate_risk_level_brief(likelihood: Optional[int], impact: Optional[int] classification = "MEDIUM" else: classification = "LOW" - + return f"{classification} ({score}/25)" @staticmethod @@ -287,7 +300,7 @@ def _calculate_risk_level_verbose(likelihood: Optional[int], impact: Optional[in """Calculates a qualitative risk level from likelihood and impact scores.""" if likelihood is None or impact is None: return f"Likelihood {likelihood}/5, Impact {impact}/5" - + score = likelihood * impact if score >= 15: classification = "CRITICAL" @@ -297,7 +310,7 @@ def _calculate_risk_level_verbose(likelihood: Optional[int], impact: Optional[in classification = "MEDIUM" else: classification = "LOW" - + return f"{classification} {score}/25 (Likelihood {likelihood}/5 × Impact {impact}/5)" @staticmethod @@ -306,7 +319,7 @@ def convert_to_markdown(premortem_analysis: PremortemAnalysis) -> str: Convert the premortem analysis to markdown format. """ rows = [] - + # Header rows.append("A premortem assumes the project has failed and works backward to identify the most likely causes.\n") @@ -319,11 +332,11 @@ def convert_to_markdown(premortem_analysis: PremortemAnalysis) -> str: for assumption in premortem_analysis.assumptions_to_kill: rows.append(f"| {assumption.assumption_id} | {assumption.statement} | {assumption.test_now} | {assumption.falsifier} |") rows.append("\n") - + # Failure Modes rows.append("## Failure Scenarios and Mitigation Plans\n") rows.append("Each scenario below links to a root-cause assumption and includes a detailed failure story, early warning signs, measurable tripwires, a response playbook, and a stop rule to guide decision-making.\n") - + # Summary Table for Failure Modes rows.append("### Summary of Failure Modes\n") rows.append("| ID | Title | Archetype | Root Cause | Owner | Risk Level |") @@ -345,16 +358,16 @@ def convert_to_markdown(premortem_analysis: PremortemAnalysis) -> str: rows.append(f"- **Owner**: {failure_mode.owner or 'Unassigned'}") risk_level_str = Premortem._calculate_risk_level_verbose(failure_mode.likelihood_5, failure_mode.impact_5) rows.append(f"- **Risk Level:** {risk_level_str}\n") - + rows.append("##### Failure Story") rows.append(f"{failure_mode.risk_analysis}\n") - + rows.append("##### Early Warning Signs") rows.append(Premortem._format_bullet_list(failure_mode.early_warning_signs)) - + rows.append("\n##### Tripwires") rows.append(Premortem._format_bullet_list(failure_mode.tripwires or ["No tripwires defined"])) - + rows.append("\n##### Response Playbook") rows.append(Premortem._format_bullet_list(failure_mode.playbook or ["No response actions defined"])) rows.append("\n") @@ -363,10 +376,10 @@ def convert_to_markdown(premortem_analysis: PremortemAnalysis) -> str: rows.append(f"**STOP RULE:** {stop_rule_text}\n") return "\n".join(rows) - + if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') - + from worker_plan_internal.llm_util.llm_executor import LLMExecutor, LLMModelFromName from worker_plan_internal.plan.find_plan_prompt import find_plan_prompt @@ -386,12 +399,12 @@ def convert_to_markdown(premortem_analysis: PremortemAnalysis) -> str: print(f"Query:\n{plan_prompt}\n\n") result = Premortem.execute(llm_executor=llm_executor, speed_vs_detail=SpeedVsDetailEnum.ALL_DETAILS_BUT_SLOW, user_prompt=plan_prompt) - + response_data = result.to_dict(include_metadata=True, include_system_prompt=False, include_user_prompt=False, include_markdown=False) - + print("\n\nResponse:") print(json.dumps(response_data, indent=2)) - + print(f"\n\nMarkdown Output:") print(result.markdown) - +