diff --git a/coolprompt/assistant.py b/coolprompt/assistant.py index 3a70501..b49830c 100644 --- a/coolprompt/assistant.py +++ b/coolprompt/assistant.py @@ -7,7 +7,7 @@ from coolprompt.task_detector.detector import TaskDetector from coolprompt.data_generator.generator import SyntheticDataGenerator from coolprompt.language_model.llm import DefaultLLM -from coolprompt.optimizer.hype import hype_optimizer +from coolprompt.optimizer.hype import HyPEOptimizer, HyPEROptimizer from coolprompt.optimizer.reflective_prompt import reflectiveprompt from coolprompt.optimizer.distill_prompt.run import distillprompt from coolprompt.utils.logging_config import logger, set_verbose, setup_logging @@ -23,10 +23,6 @@ CLASSIFICATION_TASK_TEMPLATE, GENERATION_TASK_TEMPLATE, ) -from coolprompt.utils.prompt_templates.hype_templates import ( - CLASSIFICATION_TASK_TEMPLATE_HYPE, - GENERATION_TASK_TEMPLATE_HYPE, -) from coolprompt.utils.correction.corrector import correct from coolprompt.utils.correction.rule import LanguageRule from coolprompt.prompt_assistant.prompt_assistant import PromptAssistant @@ -36,12 +32,8 @@ class PromptTuner: """Prompt optimization tool supporting multiple methods.""" TEMPLATE_MAP = { - (Task.CLASSIFICATION, Method.HYPE): CLASSIFICATION_TASK_TEMPLATE_HYPE, - (Task.CLASSIFICATION, Method.REFLECTIVE): CLASSIFICATION_TASK_TEMPLATE, - (Task.CLASSIFICATION, Method.DISTILL): CLASSIFICATION_TASK_TEMPLATE, - (Task.GENERATION, Method.HYPE): GENERATION_TASK_TEMPLATE_HYPE, - (Task.GENERATION, Method.REFLECTIVE): GENERATION_TASK_TEMPLATE, - (Task.GENERATION, Method.DISTILL): GENERATION_TASK_TEMPLATE, + Task.CLASSIFICATION: CLASSIFICATION_TASK_TEMPLATE, + Task.GENERATION: GENERATION_TASK_TEMPLATE, } def __init__( @@ -102,7 +94,7 @@ def get_task_prompt_template(self, task: str, method: str) -> str: The type of task, either "classification" or "generation". method (str): Optimization method to use. 
- Available methods are: ['hype', 'reflective', 'distill'] + Available methods are: ['hype', 'reflective', 'distill', 'hyper'] Returns: str: The prompt template for the given task. @@ -113,7 +105,7 @@ def get_task_prompt_template(self, task: str, method: str) -> str: ) task = validate_task(task) method = validate_method(method) - return self.TEMPLATE_MAP[(task, method)] + return self.TEMPLATE_MAP[task] def _get_dataset_split( self, @@ -182,7 +174,7 @@ def run( target (Iterable): Target iterable object for autoprompting optimization. method (str): Optimization method to use. - Available methods are: ['hype', 'reflective', 'distill'] + Available methods are: ['hype', 'reflective', 'distill', 'hyper'] Defaults to hype. metric (str): Metric to use for optimization. problem_description (str): a string that contains @@ -297,7 +289,7 @@ def run( prompt=start_prompt, task=task, problem_description=problem_description, - num_samples=generate_num_samples + num_samples=generate_num_samples, ) self.synthetic_dataset = dataset self.synthetic_target = target @@ -329,10 +321,28 @@ def run( logger.debug(f"Additional kwargs: {kwargs}") if method is Method.HYPE: - final_prompt = hype_optimizer( + hype_opt = HyPEOptimizer( model=self._target_model, + config=kwargs.get("hype_config", None), + meta_prompt=kwargs.get("hype_meta_prompt", None), + ) + meta_info = {"task_description": problem_description} + if kwargs.get("meta_info", None): + meta_info.update(kwargs["meta_info"]) + final_prompt = hype_opt.optimize( prompt=start_prompt, - problem_description=problem_description, + meta_info=meta_info, ) + elif method is Method.HYPER: + hyper_opt = HyPEROptimizer( + model=self._target_model, + evaluator=evaluator, + **kwargs, + ) + final_prompt = hyper_opt.optimize( + prompt=start_prompt, + dataset_split=dataset_split, + meta_info={"task_description": problem_description}, ) elif method is Method.REFLECTIVE: final_prompt = reflectiveprompt(
logger.debug(f"Final prompt:\n{final_prompt}") - template = self.TEMPLATE_MAP[(task, method)] + template = self.TEMPLATE_MAP[task] logger.info(f"Evaluating on given dataset for {task} task...") self.init_metric = evaluator.evaluate( prompt=start_prompt, diff --git a/coolprompt/evaluator/evaluator.py b/coolprompt/evaluator/evaluator.py index c6dfc5a..5a0657a 100644 --- a/coolprompt/evaluator/evaluator.py +++ b/coolprompt/evaluator/evaluator.py @@ -1,7 +1,7 @@ -import random -from langchain_core.language_models.base import BaseLanguageModel -from typing import Optional +from dataclasses import dataclass +from typing import List, Optional +from langchain_core.language_models.base import BaseLanguageModel from langchain_core.messages.ai import AIMessage from coolprompt.evaluator.metrics import BaseMetric from coolprompt.utils.logging_config import logger @@ -12,6 +12,22 @@ ) +@dataclass +class FailedExampleDetailed: + instance: str + assistant_answer: str + model_answer_parsed: Optional[str] = None + metric_value: float | int = 0.0 + ground_truth: str | int = "" + + +@dataclass +class EvalResultDetailed: + aggregate_score: float + score_per_task: List[float | int] = None + failed_examples: List[FailedExampleDetailed] = None + + class Evaluator: """Evaluator class to perform model evaluation using a specified metric. @@ -64,7 +80,6 @@ def evaluate( logger.info( f"Evaluating prompt for {self.task} task on {len(dataset)} samples" ) - logger.debug(f"Prompt to evaluate:\n{prompt}") if self.task == Task.CLASSIFICATION: self.metric.extract_labels(targets) @@ -80,28 +95,64 @@ def evaluate( return self.metric.compute(answers, targets, dataset) - def _get_full_prompt( + def evaluate_detailed( self, prompt: str, - sample: str, + dataset: list[str], + targets: list[str | int], template: Optional[str] = None, - ) -> str: - """Inserts parts of the prompt into the task template. 
+ ) -> EvalResultDetailed: + """Evaluate the model and return detailed results per sample.""" + if template is None: + template = self._get_default_template() - Args: - prompt (str): the main instruction for the task - sample (str): the input sample - template (Optional[str]): - Prompt template for defined task type. - If None, uses default template. + logger.info( + f"Evaluating (detailed) prompt for {self.task} task on {len(dataset)} samples" + ) + if self.task == Task.CLASSIFICATION: + self.metric.extract_labels(targets) + + answers = self.model.batch( + [ + self._get_full_prompt(prompt, sample, template) + for sample in dataset + ] + ) + answers = [ + a.content if isinstance(a, AIMessage) else a for a in answers + ] - Raises: - ValueError: if type of task is not supported + parsed_answers = [self.metric.parse_output(a) for a in answers] + aggregate_score, score_per_task = self.metric.compute_detailed( + answers, targets + ) - Returns: - str: the full prompt to be passed to the model - """ + failed_examples = [] + for i, score in enumerate(score_per_task): + if score == 0: + failed_examples.append( + FailedExampleDetailed( + instance=dataset[i], + assistant_answer=answers[i], + model_answer_parsed=parsed_answers[i], + metric_value=score, + ground_truth=targets[i], + ) + ) + + return EvalResultDetailed( + aggregate_score=aggregate_score, + score_per_task=score_per_task, + failed_examples=failed_examples, + ) + def _get_full_prompt( + self, + prompt: str, + sample: str, + template: Optional[str] = None, + ) -> str: + """Inserts parts of the prompt into the task template.""" if template is None: template = self._get_default_template() @@ -116,7 +167,6 @@ def _get_full_prompt( def _get_default_template(self) -> str: """Returns the default template for the task type.""" - match self.task: case Task.CLASSIFICATION: return CLASSIFICATION_TASK_TEMPLATE diff --git a/coolprompt/evaluator/metrics.py b/coolprompt/evaluator/metrics.py index 9c9b89f..bb815f7 100644 --- 
a/coolprompt/evaluator/metrics.py +++ b/coolprompt/evaluator/metrics.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Optional +from typing import List, Optional, Tuple from deepeval.metrics import GEval from deepeval.test_case import LLMTestCase, LLMTestCaseParams @@ -86,7 +86,7 @@ def _compute_raw( self, outputs: list[str | int], targets: list[str | int], - dataset: Optional[list[str]] = None + dataset: Optional[list[str]] = None, ) -> float: """Compute metric value from preprocessed model answers. @@ -120,7 +120,7 @@ def compute( self, outputs: list[str | int], targets: list[str | int], - dataset: Optional[list[str]] = None + dataset: Optional[list[str]] = None, ) -> float: """Compute metric value from text model outputs @@ -134,9 +134,7 @@ def compute( """ output_labels = list( map( - lambda x: extract_answer( - x, self.ANS_TAGS, self.FORMAT_MISMATCH_LABEL - ), + lambda x: extract_answer(x, self.ANS_TAGS, self.FORMAT_MISMATCH_LABEL), outputs, ) ) @@ -144,9 +142,38 @@ def compute( encoded_output_labels, encoded_targets = self._encode_labels( output_labels, targets ) - return self._compute_raw( - encoded_output_labels, encoded_targets, dataset - ) + return self._compute_raw(encoded_output_labels, encoded_targets, dataset) + + def parse_output(self, output: str) -> str: + """Extract parsed answer from model output. + + Args: + output: Raw model output string. + + Returns: + Extracted answer from tags, or original output if not found. + """ + return extract_answer(output, self.ANS_TAGS, format_mismatch_label=output) + + def compute_detailed( + self, + outputs: list[str | int], + targets: list[str | int], + dataset: Optional[list[str]] = None, + ) -> Tuple[float, List[float | int]]: + """Compute metric value per sample and aggregate. + + Returns: + Tuple of (aggregate_score, score_per_task). + score_per_task[i] - score for i-th sample. + aggregate_score - same as compute(). 
+ """ + score_per_task = [] + for o, t in zip(outputs, targets): + s = self._compute_raw([o], [t], dataset) + score_per_task.append(s) + aggregate = self.compute(outputs, targets, dataset) + return aggregate, score_per_task def __str__(self) -> str: return self._get_name() @@ -219,7 +246,7 @@ class GenerationMetric(BaseMetric): FORMAT_MISMATCH_LABEL = "" - def __init__(self): + def __init__(self, name=None): """Initialize metric""" super().__init__() @@ -316,6 +343,15 @@ def _compute_raw(self, outputs, targets, dataset): f1_list = super()._compute_raw(outputs, targets) return sum(f1_list) / len(f1_list) + def compute_detailed( + self, + outputs: list[str | int], + targets: list[str | int], + dataset: Optional[list[str]] = None, + ) -> Tuple[float, List[float]]: + f1_list = super()._compute_raw(outputs, targets, dataset) + return sum(f1_list) / len(f1_list), f1_list + class LLMAsJudge(GenerationMetric): """LLM-as-a-judge metric for generation tasks.""" @@ -462,6 +498,21 @@ def _compute_raw(self, outputs, targets, dataset): outputs = [extract_number_from_text(item) for item in outputs] return float(mean([o == t for o, t in zip(outputs, targets)])) + def compute_detailed( + self, + outputs: list[str | int], + targets: list[str | int], + dataset: Optional[list[str]] = None, + ) -> Tuple[float, List[int]]: + targets = [extract_number_from_text(item) for item in targets] + outputs = [extract_number_from_text(item) for item in outputs] + score_per_task = [1 if o == t else 0 for o, t in zip(outputs, targets)] + return mean(score_per_task), score_per_task + + def parse_output(self, output: str) -> str: + extracted = extract_answer(output, self.ANS_TAGS, format_mismatch_label=output) + return extract_number_from_text(extracted) + def define_lang(outputs, targets): langs = [detect_language(target) for target in targets] @@ -469,8 +520,7 @@ def define_lang(outputs, targets): CLASSIFICATION_METRIC_NAME_MAPPING = { - metric._get_name(): metric - for metric in 
ClassificationMetric.__subclasses__() } GENERATION_METRIC_NAME_MAPPING = { @@ -509,8 +559,9 @@ def validate_and_create_metric( return CLASSIFICATION_METRIC_NAME_MAPPING[metric]() error_msg = ( f"Invalid metric for {task} task: {metric}. " - f"Available metrics: {', '.join( - CLASSIFICATION_METRIC_NAME_MAPPING.keys())}." + f"Available metrics: { + ', '.join(CLASSIFICATION_METRIC_NAME_MAPPING.keys()) + }." ) logger.error(error_msg) raise ValueError(error_msg) @@ -544,8 +595,9 @@ def validate_and_create_metric( return GENERATION_METRIC_NAME_MAPPING[metric]() error_msg = ( f"Invalid metric for {task} task: {metric}. " - f"Available metrics: {', '.join( - GENERATION_METRIC_NAME_MAPPING.keys())}." + f"Available metrics: { + ', '.join(GENERATION_METRIC_NAME_MAPPING.keys()) + }." ) logger.error(error_msg) raise ValueError(error_msg) diff --git a/coolprompt/optimizer/hype/__init__.py b/coolprompt/optimizer/hype/__init__.py index c0ebaa4..264e83f 100644 --- a/coolprompt/optimizer/hype/__init__.py +++ b/coolprompt/optimizer/hype/__init__.py @@ -1,5 +1,7 @@ -from coolprompt.optimizer.hype.hype import hype_optimizer +from coolprompt.optimizer.hype.hype import HyPEOptimizer +from coolprompt.optimizer.hype.hyper import HyPEROptimizer __all__ = [ - 'hype_optimizer' + "HyPEOptimizer", + "HyPEROptimizer", ] diff --git a/coolprompt/optimizer/hype/feedback_module.py b/coolprompt/optimizer/hype/feedback_module.py new file mode 100644 index 0000000..1b97bc0 --- /dev/null +++ b/coolprompt/optimizer/hype/feedback_module.py @@ -0,0 +1,182 @@ +"""FeedbackModule for generating prompt improvement recommendations.""" + +import random +from typing import Any, List, Optional + +from coolprompt.evaluator.evaluator import FailedExampleDetailed +from coolprompt.utils.parsing import extract_json, get_model_answer_extracted + + +FEEDBACK_PROMPT_TEMPLATE = """You are an expert prompt engineer. + +The prompt was evaluated on benchmark task and failed on some examples. You will be given with a prompt and an example.
+ +Prompt: + +{prompt} + + +Failed task: + +{instance} + + +Model answer (raw): + +{model_answer} + + +Model answer (parsed): + +{model_answer_parsed} + + +Metric value: {metric_value} + +Correct answer: + +{ground_truth} + + +Identify the core reasoning error pattern. + +Give ONE general, universal recommendation to improve the prompt (no task-special details). + +Format: Concise, max 20-25 words, starts with action verb. Output nothing but the actual recommendation. Avoid meta‑comments (e.g., "similar to…", "as before…") – the recommendation must stand alone. + +Example: "Require step-by-step reasoning before classifying." + +Recommendation: +""" + +FILTER_RECOMMENDATIONS_PROMPT = """You have a list of recommendations for prompt improvement: + +{recommendations} + +TASK: +1. Group them into conceptual clusters (similar ideas). +2. For each cluster, **synthesize a single, new recommendation** that captures the essence of all items in that cluster. Do not just copy an existing one. +3. Rank clusters by size (largest first). If some clusters conflict - drop the less ones. +4. Output ONLY a JSON array of the synthesized recommendations, in rank order.
+ +GOOD EXAMPLES: +Input: ["step-by-step", "break down calc", "don't show work", "format clearly"] +Correct output: ["Require detailed step-by-step reasoning with calculations", "Specify the desired output format explicitly"] +Why good: +- Captured main ideas of reasoning cluster into 1 strong rec +- Didn't loose cluster from "format clearly" +- Resolved conflict: "don't show work" is less frequent recommendation, so its cluster was dropped + +BAD EXAMPLES: +Input: ["Focus on clarifying the output format requirements", + "Add examples of expected responses to the prompt", + "Make sure to specify exact sentiment labels", + "Include examples to avoid confusion with similar labels", + "Focus on tone analysis in the text", + "Clarify what constitutes positive vs negative", + "Add examples of positive responses", + "Similar to previous - add more examples"] +Wrong output: ["Similar to previous - add more examples", "Add examples of positive responses", "Make sure to specify exact sentiment labels", "Focus on tone analysis in the text"] +Why bad: +- "Similar to previous" = meta-trash +- No synthesis of 6+ example recs into 1 strong rec, uses only existing recommendations +- Two different recommendations with a similiar intent: adding examples (duplicates) +""" + + +class FeedbackModule: + """Generates recommendations for improving prompts based on failed examples.""" + + def __init__(self, model: Any) -> None: + self.model = model + + def generate_recommendation( + self, + prompt: str, + instance: str, + model_answer: str, + model_answer_parsed: Optional[str] = None, + metric_value: float | int = 0.0, + ground_truth: str | int = "", + ) -> str: + """Generate a single recommendation for a failed example. + + Args: + prompt: The original prompt that was used. + instance: The task instance (input/question). + model_answer: The model's answer (incorrect, raw). + model_answer_parsed: The model's parsed answer (for metric calculation). 
+ metric_value: The metric value for this answer. + ground_truth: The correct answer. + + Returns: + A recommendation string for improving the prompt. + """ + formatted_prompt = FEEDBACK_PROMPT_TEMPLATE.format( + prompt=prompt, + instance=instance, + model_answer=model_answer, + model_answer_parsed=model_answer_parsed or "", + metric_value=metric_value, + ground_truth=ground_truth, + ) + result = get_model_answer_extracted(self.model, formatted_prompt) + return self._process_output(result) + + def generate_recommendations( + self, + prompt: str, + failed_examples: List[FailedExampleDetailed], + ) -> List[str]: + """Generate recommendations for all failed examples. + + Args: + prompt: The original prompt that was used. + failed_examples: List of failed examples. + + Returns: + List of recommendation strings. + """ + return [ + self.generate_recommendation( + prompt=prompt, + instance=fe.instance, + model_answer=fe.assistant_answer, + model_answer_parsed=fe.model_answer_parsed, + metric_value=fe.metric_value, + ground_truth=fe.ground_truth, + ) + for fe in failed_examples + ] + + def filter_recommendations(self, recommendations: List[str]) -> List[str]: + """Filter and deduplicate recommendations using LLM. + + Args: + recommendations: List of recommendation strings. + + Returns: + Deduplicated and filtered list of recommendations. + """ + if not recommendations: + return [] + + formatted_recs = "\n".join( + f"{i + 1}. 
{rec}" for i, rec in enumerate(recommendations) + ) + prompt = FILTER_RECOMMENDATIONS_PROMPT.format( + recommendations=formatted_recs + ) + result = get_model_answer_extracted(self.model, prompt) + try: + data = extract_json(result) + if data and isinstance(data, list): + return [str(x) for x in data] + except Exception: + pass + + return random.sample(recommendations, min(3, len(recommendations))) + + def _process_output(self, output: Any) -> str: + """Process model output to extract recommendation.""" + return output if isinstance(output, str) else str(output) diff --git a/coolprompt/optimizer/hype/hype.py b/coolprompt/optimizer/hype/hype.py index b96f2d5..2cc8d7e 100644 --- a/coolprompt/optimizer/hype/hype.py +++ b/coolprompt/optimizer/hype/hype.py @@ -1,47 +1,115 @@ -from langchain_core.language_models.base import BaseLanguageModel +from abc import ABC, abstractmethod +from typing import Any, List, Optional, Union -from coolprompt.utils.logging_config import logger -from coolprompt.utils.prompt_templates.hype_templates import ( - HYPE_PROMPT_TEMPLATE, -) -from coolprompt.utils.parsing import ( - extract_answer, - get_model_answer_extracted, - safe_template, +from coolprompt.utils.parsing import extract_answer, get_model_answer_extracted +from coolprompt.utils.prompt_templates.hyper_templates import ( + HypeMetaPromptBuilder, + HypeMetaPromptConfig, + META_INFO_SECTION, + META_PROMPT_SECTIONS, ) -INSTRUCTIVE_PROMPT_TAGS = ("[PROMPT_START]", "[PROMPT_END]") +def _build_full_meta_prompt_template(builder: HypeMetaPromptBuilder) -> str: + body = builder.build_meta_prompt() + return ( + body + + "\n\nUser query:\n\n{QUERY}\n\n" + + "{META_INFO_BLOCK}" + ) -def hype_optimizer( - model: BaseLanguageModel, prompt: str, problem_description: str -) -> str: - """Rewrites prompt by injecting it - into predefined template and querying LLM. - Args: - model (BaseLanguageModel): Any LangChain BaseLanguageModel instance. - prompt (str): Input prompt to optimize. 
- problem_description (str): Brief description of the task, explaining - its domain. - Returns: - str: LLM-generated rewritten prompt. - """ +class Optimizer(ABC): + def __init__(self, model): + self.model = model - logger.info("Running HyPE optimization...") - logger.debug(f"Start prompt:\n{prompt}") + @abstractmethod + def optimize(self): + pass - query = safe_template( - HYPE_PROMPT_TEMPLATE, - PROBLEM_DESCRIPTION=problem_description, - QUERY=prompt, - ) - answer = get_model_answer_extracted(model, query) +class HyPEOptimizer(Optimizer): + def __init__( + self, + model, + config: Optional[HypeMetaPromptConfig] = None, + meta_prompt: Optional[str] = None, + ) -> None: + super().__init__(model) + self.builder = HypeMetaPromptBuilder(config) + if meta_prompt is not None: + self.meta_prompt = meta_prompt + else: + self.meta_prompt = _build_full_meta_prompt_template(self.builder) - logger.info("HyPE optimization completed") - logger.debug(f"Raw HyPE output:\n{answer}") + def get_section(self, name: str) -> Any: + """Returns the current value of the section (for recommendations — List[str]).""" + if name not in META_PROMPT_SECTIONS: + raise ValueError( + f"Unknown section: {name}. Expected: {META_PROMPT_SECTIONS}" + ) + if name == "recommendations": + return list(self.builder.config.recommendations) + if name == "constraints": + return list(self.builder.config.constraints) + return self.builder.get_cached_section(name) - return extract_answer( - answer, INSTRUCTIVE_PROMPT_TAGS, format_mismatch_label=answer - ) + def update_section( + self, + name: str, + value: Union[str, List[str]], + ) -> None: + """Updates the section and rebuilds the meta-prompt.""" + if name not in META_PROMPT_SECTIONS: + raise ValueError( + f"Unknown section: {name}. 
Expected: {META_PROMPT_SECTIONS}" + ) + if name == "recommendations": + self.builder.config.recommendations = list(value) + elif name == "constraints": + self.builder.config.constraints = list(value) + elif name == "output_format" and isinstance(value, str): + self.builder.config.output_format_section = value + else: + raise ValueError(f"update_section for {name}: unsupported value type") + self.builder.rebuild_all_sections() + self._rebuild_meta_prompt() + + def _rebuild_meta_prompt(self) -> None: + self.meta_prompt = _build_full_meta_prompt_template(self.builder) + + def set_meta_prompt(self, meta_prompt: str) -> None: + self.meta_prompt = meta_prompt + + def optimize( + self, + prompt: str, + meta_info: Optional[dict[str, Any]] = None, + n_prompts: int = 1, + ) -> Union[str, List[str]]: + query = self._format_meta_prompt(prompt, **(meta_info or {})) + raw_result = get_model_answer_extracted(self.model, query, n=n_prompts) + if n_prompts == 1: + return self._process_model_output(raw_result) + return [self._process_model_output(r) for r in raw_result] + + def _format_meta_prompt(self, prompt: str, **kwargs) -> str: + if kwargs: + meta_info_content = "\n".join([f"{k}: {v}" for k, v in kwargs.items()]) + meta_info_block = META_INFO_SECTION.format( + meta_info_content=meta_info_content + ) + else: + meta_info_block = "" + + return self.meta_prompt.format(QUERY=prompt, META_INFO_BLOCK=meta_info_block) + + RESULT_PROMPT_TAGS = ("", "") + + def _process_model_output(self, output: Any) -> str: + result = extract_answer( + output, + self.RESULT_PROMPT_TAGS, + format_mismatch_label=output, + ) + return result if isinstance(result, str) else str(result) diff --git a/coolprompt/optimizer/hype/hyper.py b/coolprompt/optimizer/hype/hyper.py new file mode 100644 index 0000000..68e3bb2 --- /dev/null +++ b/coolprompt/optimizer/hype/hyper.py @@ -0,0 +1,222 @@ +"""HyPEROptimizer: HyPE with iterative refinement via recommendations.""" + +import random +from typing import Any, List, 
Optional, Sequence, Tuple + +from tqdm import tqdm + +from coolprompt.optimizer.hype.hype import HyPEOptimizer, Optimizer +from coolprompt.optimizer.hype.feedback_module import FeedbackModule +from coolprompt.utils.parsing import get_model_answer_extracted +from coolprompt.evaluator.evaluator import ( + Evaluator, + EvalResultDetailed, +) + + +def sample_mini_batch( + dataset: Sequence[str], + targets: Sequence[str | int], + size: int, + seed: Optional[int] = None, +) -> Tuple[List[str], List[str | int]]: + """Sample a mini-batch from the dataset. + + Returns: + (samples, targets) - lists of length size (or less if dataset is smaller). + """ + import random + + rng = random.Random(seed) + n = min(size, len(dataset)) + indices = rng.sample(range(len(dataset)), n) + return ( + [dataset[i] for i in indices], + [targets[i] for i in indices], + ) + + +def compute_pareto_front( + candidates: List[str], + results: List[EvalResultDetailed], +) -> List[Tuple[str, EvalResultDetailed]]: + """Compute Pareto front from candidates based on score_per_task. + + A candidate dominates another if its score_per_task >= other.score_per_task + for all tasks and > for at least one. + + Returns: + List of (candidate, result) that belong to the Pareto front. 
+ """ + n = len(candidates) + is_pareto = [True] * n + + for i in range(n): + if not is_pareto[i]: + continue + for j in range(n): + if i == j or not is_pareto[j]: + continue + # Check if i dominates j + i_scores = results[i].score_per_task + j_scores = results[j].score_per_task + if not i_scores or not j_scores: + continue + if len(i_scores) != len(j_scores): + continue + i_dominates_j = all( + i_s >= j_s for i_s, j_s in zip(i_scores, j_scores) + ) and any(i_s > j_s for i_s, j_s in zip(i_scores, j_scores)) + if i_dominates_j: + is_pareto[j] = False + + return [(candidates[i], results[i]) for i in range(n) if is_pareto[i]] + + +class HyPEROptimizer(Optimizer): + """HyPE with iterative refinement via evaluation-based recommendations.""" + + def __init__( + self, + model: Any, + evaluator: Evaluator, + *, + n_iterations: int = 5, + patience: int = None, + n_candidates: int = 3, + top_n_candidates: int = 3, + k_samples: int = 3, + mini_batch_size: int = 16, + ) -> None: + super().__init__(model) + self.hype_module = HyPEOptimizer(model) + self.evaluator = evaluator + self.feedback_module = FeedbackModule(model) + self.n_iterations = n_iterations + self.patience = patience + self.n_candidates = n_candidates + self.top_n_candidates = top_n_candidates + self.k_samples = k_samples + self.mini_batch_size = mini_batch_size + + def _get_variants_from_best(self, best_prompt: str, n_candidates: int) -> List[str]: + paraphrase_prompt = f"""Generate an alternative version of the following prompt. The new version must: +- Use different words, sentence structure, and tone (e.g., more formal, casual, or creative). +- Preserve the original meaning, key details, and language. +- Vary in length: slightly shorter or longer (up to 10%). +- Feel natural and coherent. +- Output only the text of the alternative prompt, without any additional commentary or formatting. 
+ +Original prompt: +{best_prompt} + +Alternative prompt:""" + raw_result = get_model_answer_extracted( + self.model, paraphrase_prompt, n=n_candidates, temperature=0.9 + ) + return [best_prompt] + [self._process_model_output(r) for r in raw_result] + + def _process_model_output(self, output: Any) -> str: + return output if isinstance(output, str) else str(output) + + def optimize( + self, + prompt: str, + dataset_split: Tuple[ + Sequence[str], Sequence[str], Sequence[str], Sequence[str] + ], + meta_info: Optional[dict[str, Any]] = None, + ) -> str: + """Generate candidates, evaluate, update recommendations, repeat.""" + train_samples, val_samples, train_targets, val_targets = dataset_split + best_prompt = prompt + best_score = self.evaluator.evaluate( + prompt, + list(val_samples), + list(val_targets), + batch_size=50, + show_progress=False, + ) + patience_counter = 0 + + for iteration in tqdm(range(self.n_iterations), desc="HyPER iterations"): + # 1. Generate candidates from best_prompt + candidates = self._get_variants_from_best( + best_prompt, n_candidates=self.n_candidates + ) + + if not candidates: + return best_prompt + + # 2. Mini-batch from train + samples, sample_targets = sample_mini_batch( + train_samples, train_targets, self.mini_batch_size + ) + if not samples: + continue + + # 3. Evaluate candidates on mini-batch via evaluate_detailed + results: List[EvalResultDetailed] = [ + self.evaluator.evaluate_detailed(cand, samples, sample_targets) + for cand in candidates + ] + + # 4. Pareto front + pareto_front = compute_pareto_front(candidates, results) + + # Fallback: if all candidates are in front, sort by aggregate_score + if len(pareto_front) == len(candidates) and self.top_n_candidates < len( + candidates + ): + scored = sorted( + zip(candidates, results), + key=lambda x: x[1].aggregate_score, + reverse=True, + ) + pareto_front = scored[: self.top_n_candidates] + + if not pareto_front: + continue + + # 5. 
Collect recommendations for all candidates from Pareto front + all_recs: List[str] = [] + for cand_prompt, res in pareto_front: + failed_sample = random.sample( + res.failed_examples, + min(self.k_samples, len(res.failed_examples)), + ) + recs = self.feedback_module.generate_recommendations( + cand_prompt, failed_sample + ) + all_recs.extend(recs) + + # Filter and update recommendations + all_recs = self.feedback_module.filter_recommendations(all_recs) + + self.hype_module.update_section("recommendations", all_recs) + + # 6. For each candidate from Pareto front + for cand_prompt, res in pareto_front: + optimized_prompt = self.hype_module.optimize( + cand_prompt, meta_info=meta_info + ) + + val_score = self.evaluator.evaluate( + optimized_prompt, + list(val_samples), + list(val_targets), + batch_size=50, + show_progress=False, + ) + + if val_score > best_score: + best_score = val_score + best_prompt = optimized_prompt + patience_counter = 0 + else: + patience_counter += 1 + + if self.patience and patience_counter >= self.patience: + break + + return best_prompt diff --git a/coolprompt/utils/parsing.py b/coolprompt/utils/parsing.py index ebec72e..515c3ee 100644 --- a/coolprompt/utils/parsing.py +++ b/coolprompt/utils/parsing.py @@ -1,7 +1,7 @@ from dirtyjson import DirtyJSONLoader from typing import Tuple + from langchain_core.language_models.base import BaseLanguageModel -from langchain_core.messages.ai import AIMessage def extract_answer( @@ -55,13 +55,13 @@ def safe_template(template: str, **kwargs) -> str: return template.format(**escaped) -def extract_json(text: str) -> dict | None: - """Extracts the first valid JSON with one text value from the `text`. +def extract_json(text: str) -> dict | list | None: + """Extracts the first valid JSON (object or array) from the text. Args: - text (str): text with JSON-lke substrings. + text (str): text with JSON-like substrings. 
Returns: - result (dict | None): dict from JSON or None + result (dict | list | None): dict or list from JSON or None (if no valid JSON substrings found). """ @@ -72,13 +72,30 @@ def extract_json(text: str) -> dict | None: pos = 0 while pos < len(text): + # Find both { and [ start_pos = text.find("{", pos) - if start_pos == -1: + bracket_pos = text.find("[", pos) + + # Get earliest position + if start_pos == -1 and bracket_pos == -1: break + elif start_pos == -1: + search_pos = bracket_pos + elif bracket_pos == -1: + search_pos = start_pos + else: + search_pos = min(start_pos, bracket_pos) + try: - return dict(loader.decode(start_index=start_pos)) - except: - pos = start_pos + 1 + result = loader.decode(start_index=search_pos) + if isinstance(result, dict): + return dict(result) + elif isinstance(result, list): + return list(result) + except Exception: + pass + + pos = search_pos + 1 return None @@ -118,21 +135,46 @@ def parse_assistant_response(answer: str) -> str: return answer.strip() -def get_model_answer_extracted(llm: BaseLanguageModel, prompt: str) -> str: - """Gets `llm`'s response for the `prompt` and extracts the answer. - - Args: - llm (BaseLanguageModel): LangChain language model. - prompt (str): prompt for the model. - Returns: - str: extracted answer or empty string if there is no final answer. 
- """ +from typing import Tuple - answer = llm.invoke(prompt) - if isinstance(answer, AIMessage): - answer = answer.content +def get_model_answer_extracted( + llm: BaseLanguageModel, + prompt: str, + n: int = 1, + temperature=None, +): + if temperature is not None: + llm = llm.bind(temperature=temperature) - answer = parse_assistant_response(answer) + if n == 1: + resp = llm.invoke(prompt) + text = resp.content if hasattr(resp, "content") else str(resp) + return parse_assistant_response(text) - return answer + if hasattr(llm, "generate"): + try: + llm_n = llm.bind(n=n) + result = llm_n.generate([prompt]) + gens = result.generations[0] + + outputs = [] + for g in gens: + text = getattr(g, "text", str(g)) + outputs.append(parse_assistant_response(text)) + + if len(outputs) >= n: + return outputs[:n] + except Exception: + pass + + duplicated = [prompt] * n + responses = llm.batch(duplicated) + + outputs = [] + for r in responses: + text = r.content if hasattr(r, "content") else str(r) + outputs.append(parse_assistant_response(text)) + outputs = list(dict.fromkeys(outputs)) # hard deduplication + + return outputs diff --git a/coolprompt/utils/prompt_templates/hyper_templates.py b/coolprompt/utils/prompt_templates/hyper_templates.py new file mode 100644 index 0000000..499c7bd --- /dev/null +++ b/coolprompt/utils/prompt_templates/hyper_templates.py @@ -0,0 +1,256 @@ +from dataclasses import dataclass, field +from typing import List, Optional + + +TARGET_PROMPT_FORMS = ["hypothetical ", "instructional "] + + +SIMPLE_HYPOTHETICAL_PROMPT = ( + "Write a {target_prompt_form}prompt that will solve the user query effectively." 
+) + +META_INFO_SECTION = ( + "Task-related meta-information which you must mention generating a new prompt:\n\n{meta_info_content}\n\n" +) + +META_PROMPT_SECTIONS = ( + "role", + "prompt_structure", + "recommendations", + "constraints", + "output_format", +) + + +@dataclass +class PromptSectionSpec: + name: str + description: str + + +@dataclass +class HypeMetaPromptConfig: + target_prompt_form: str = "hypothetical instructional " + require_markdown_prompt: bool = False + include_role: bool = True + section_names: List[str] = field( + default_factory=lambda: [ + "Role", + "Task context", + "Instructions", + "Output requirements", + ] + ) + section_specs: List[PromptSectionSpec] = field( + default_factory=lambda: [ + PromptSectionSpec( + name="Role", + description=( + "Briefly define the assistant's role and expertise " + "relevant to the user query." + ), + ), + PromptSectionSpec( + name="Task context", + description=( + "Summarize the user's query and any provided meta-information, " + "keeping all important constraints and domain details." + ), + ), + PromptSectionSpec( + name="Instructions", + description=( + "Main part - instructions the assistant must follow " + "to solve the user's query while respecting constraints." + ), + ), + PromptSectionSpec( + name="Output requirements", + description=( + "Clearly specify the desired tone " + "and the required level of detail for the assistant's answer. " + "If the user explicitly requests a particular output format or provides " + "an example response, restate that format and include the example verbatim, " + "without inventing any additional formatting or examples. Do not introduce any output format or examples that the user did not mention." + "CRITICAL: You MUST include the exact output format specified in the user's query or meta information block " + "as a requirement in your generated prompt if it was specified." 
+ ), + ), + ] + ) + constraints: List[str] = field( + default_factory=lambda: [ + "Preserve the language of the user's query.", + "Preserve all code snippets, inline code, technical terms and special formatting.", + "Do not remove or alter any explicit formatting instructions from the user.", + "Do not change numerical values, units, or identifiers.", + ] + ) + recommendations: List[str] = field(default_factory=list) + output_format_section: Optional[str] = None + _cached_sections: dict = field(default_factory=dict, repr=False) + + +class HypeMetaPromptBuilder: + ROLE_LINE = "You are an expert prompt engineer.\n" + TASK_SECTION_TEMPLATE = ( + "Your only task is to write a {target_prompt_form}prompt that will " + "solve the user query as effectively as possible.\n" + "Do not answer the user query directly; only produce the new prompt.\n\n" + ) + + PROMPT_STRUCTURE_SECTION_TEMPLATE = ( + "### STRUCTURE OF THE PROMPT YOU MUST PRODUCE\n" + "The prompt you write MUST be structured into the following sections, " + "in this exact order, and each section must follow its guidelines:\n" + "{sections_with_guidelines}\n\n" + ) + + CONSTRAINTS_SECTION_TEMPLATE = "### HARD CONSTRAINTS\n{constraints_list}\n\n" + + RECOMMENDATIONS_SECTION_TEMPLATE = ( + "### RECOMMENDATIONS\n" + "Use these recommendations for writing the new prompt, " + "based on analysis of previous generations:\n" + "{recommendations_list}\n\n" + ) + + BASE_OUTPUT_FORMAT_SECTION = ( + "### YOUR RESPONSE FORMAT\n" + "Return ONLY the resulting prompt, wrapped in the following XML tags:\n" + "\n" + " ...your resulting prompt here...\n" + "\n" + "Do not include any explanations or additional text outside this XML element.\n\n" + ) + + MARKDOWN_OUTPUT_REQUIREMENTS = ( + "#### Markdown formatting for the resulting prompt\n" + "- Write the entire prompt inside using valid Markdown.\n" + "- Use headings (e.g., `#`, `##`) for major sections of the prompt.\n" + "- Use bulleted lists (e.g., `-` or `*`) for enumerations and 
checklists.\n" + "- Preserve any code or pseudo-code using fenced code blocks (``` ... ```).\n" + "- Do not introduce any additional formatting beyond what is necessary to make " + "the prompt clear and well-structured." + ) + + HYPE_META_PROMPT_TEMPLATE = ( + "{role_section}" + "{prompt_structure_section}" + "{recommendations_section}" + "{constraints_section}" + "{output_format_section}" + ) + + def __init__(self, config: HypeMetaPromptConfig | None = None) -> None: + self.config = config or HypeMetaPromptConfig() + self._cache_all_sections() + + def _cache_all_sections(self) -> None: + self.config._cached_sections = { + "role": self.build_role_section(), + "prompt_structure": self.build_prompt_structure_section(), + "output_format": self.build_output_format_section(), + } + + def get_cached_section(self, name: str) -> Optional[str]: + return self.config._cached_sections.get(name) + + # ----- секция роли ----- + def build_role_section(self, include_role: bool | None = None) -> str: + include_role = ( + include_role if include_role is not None else self.config.include_role + ) + form = self.config.target_prompt_form or "" + task_part = self.TASK_SECTION_TEMPLATE.format(target_prompt_form=form) + if include_role: + return self.ROLE_LINE + task_part + return task_part + + # ----- секция формата (список имён секций) ----- + def build_prompt_structure_section( + self, + specs: list[PromptSectionSpec] | None = None, + ) -> str: + specs = specs or self.config.section_specs + lines = [f"- [{spec.name}] {spec.description}" for spec in specs] + return self.PROMPT_STRUCTURE_SECTION_TEMPLATE.format( + sections_with_guidelines="\n".join(lines) + ) if lines else "" + + # ----- секция рекомендаций (на основе анализа предыдущих генераций) ----- + def build_recommendations_section( + self, + recommendations: List[str] | None = None, + ) -> str: + recs = ( + recommendations + if recommendations is not None + else self.config.recommendations + ) + if not recs: + return "" + lines = 
"\n".join(f"- {r}" for r in recs) + return self.RECOMMENDATIONS_SECTION_TEMPLATE.format(recommendations_list=lines) + + # ----- секция жёстких ограничений ----- + def build_constraints_section( + self, + constraints: List[str] | None = None, + ) -> str: + constraints = constraints or self.config.constraints + if not constraints: + return "" + lines = "\n".join(f"- {c}" for c in constraints) + return self.CONSTRAINTS_SECTION_TEMPLATE.format(constraints_list=lines) + + def build_output_format_section(self) -> str: + # если в конфиге уже передан кастомный текст — используем его как базу + section = self.config.output_format_section or self.BASE_OUTPUT_FORMAT_SECTION + if self.config.require_markdown_prompt: + section = section + self.MARKDOWN_OUTPUT_REQUIREMENTS + return section + + # ----- сборка всего мета‑промпта ----- + def build_meta_prompt( + self, + *, + target_prompt_form: str | None = None, + section_specs: List[PromptSectionSpec] | None = None, + recommendations: List[str] | None = None, + constraints: List[str] | None = None, + output_format_section: str | None = None, + include_role: bool | None = None, + ) -> str: + # локальный override конфигов + if target_prompt_form is not None: + self.config.target_prompt_form = target_prompt_form + if section_specs is not None: + self.config.section_specs = section_specs + if recommendations is not None: + self.config.recommendations = recommendations + if constraints is not None: + self.config.constraints = constraints + if output_format_section is not None: + self.config.output_format_section = output_format_section + if include_role is not None: + self.config.include_role = include_role + + role_section = self.build_role_section(include_role=include_role) + prompt_structure_section = self.build_prompt_structure_section() + recommendations_section = self.build_recommendations_section( + recommendations=recommendations + ) + constraints_section = self.build_constraints_section() + output_format_section = 
self.build_output_format_section() + + return self.HYPE_META_PROMPT_TEMPLATE.format( + role_section=role_section, + prompt_structure_section=prompt_structure_section, + recommendations_section=recommendations_section, + constraints_section=constraints_section, + output_format_section=output_format_section, + ) + + def rebuild_all_sections(self) -> None: + self._cache_all_sections()