diff --git a/coolprompt/assistant.py b/coolprompt/assistant.py index 3a70501..93e12af 100644 --- a/coolprompt/assistant.py +++ b/coolprompt/assistant.py @@ -7,7 +7,7 @@ from coolprompt.task_detector.detector import TaskDetector from coolprompt.data_generator.generator import SyntheticDataGenerator from coolprompt.language_model.llm import DefaultLLM -from coolprompt.optimizer.hype import hype_optimizer +from coolprompt.optimizer.hype import HyPEOptimizer, HyPEROptimizer from coolprompt.optimizer.reflective_prompt import reflectiveprompt from coolprompt.optimizer.distill_prompt.run import distillprompt from coolprompt.utils.logging_config import logger, set_verbose, setup_logging @@ -23,10 +23,6 @@ CLASSIFICATION_TASK_TEMPLATE, GENERATION_TASK_TEMPLATE, ) -from coolprompt.utils.prompt_templates.hype_templates import ( - CLASSIFICATION_TASK_TEMPLATE_HYPE, - GENERATION_TASK_TEMPLATE_HYPE, -) from coolprompt.utils.correction.corrector import correct from coolprompt.utils.correction.rule import LanguageRule from coolprompt.prompt_assistant.prompt_assistant import PromptAssistant @@ -36,12 +32,8 @@ class PromptTuner: """Prompt optimization tool supporting multiple methods.""" TEMPLATE_MAP = { - (Task.CLASSIFICATION, Method.HYPE): CLASSIFICATION_TASK_TEMPLATE_HYPE, - (Task.CLASSIFICATION, Method.REFLECTIVE): CLASSIFICATION_TASK_TEMPLATE, - (Task.CLASSIFICATION, Method.DISTILL): CLASSIFICATION_TASK_TEMPLATE, - (Task.GENERATION, Method.HYPE): GENERATION_TASK_TEMPLATE_HYPE, - (Task.GENERATION, Method.REFLECTIVE): GENERATION_TASK_TEMPLATE, - (Task.GENERATION, Method.DISTILL): GENERATION_TASK_TEMPLATE, + Task.CLASSIFICATION: CLASSIFICATION_TASK_TEMPLATE, + Task.GENERATION: GENERATION_TASK_TEMPLATE, } def __init__( @@ -102,7 +94,7 @@ def get_task_prompt_template(self, task: str, method: str) -> str: The type of task, either "classification" or "generation". method (str): Optimization method to use. 
- Available methods are: ['hype', 'reflective', 'distill'] + Available methods are: ['hype', 'reflective', 'distill', 'hyper'] Returns: str: The prompt template for the given task. @@ -113,7 +105,7 @@ def get_task_prompt_template(self, task: str, method: str) -> str: ) task = validate_task(task) method = validate_method(method) - return self.TEMPLATE_MAP[(task, method)] + return self.TEMPLATE_MAP[task] def _get_dataset_split( self, @@ -182,7 +174,7 @@ def run( target (Iterable): Target iterable object for autoprompting optimization. method (str): Optimization method to use. - Available methods are: ['hype', 'reflective', 'distill'] + Available methods are: ['hype', 'reflective', 'distill', 'hyper'] Defaults to hype. metric (str): Metric to use for optimization. problem_description (str): a string that contains @@ -297,7 +289,7 @@ def run( prompt=start_prompt, task=task, problem_description=problem_description, - num_samples=generate_num_samples + num_samples=generate_num_samples, ) self.synthetic_dataset = dataset self.synthetic_target = target @@ -329,10 +321,21 @@ def run( logger.debug(f"Additional kwargs: {kwargs}") if method is Method.HYPE: - final_prompt = hype_optimizer( + hype_opt = HyPEOptimizer(model=self._target_model) + final_prompt = hype_opt.optimize( + prompt=start_prompt, + meta_info={"task_description": problem_description}, + ) + elif method is Method.HYPER: + hyper_opt = HyPEROptimizer( model=self._target_model, + evaluator=evaluator, + **kwargs, + ) + final_prompt = hyper_opt.optimize( prompt=start_prompt, - problem_description=problem_description, + dataset_split=dataset_split, + meta_info={"task_description": problem_description}, ) elif method is Method.REFLECTIVE: final_prompt = reflectiveprompt( @@ -360,7 +363,7 @@ def run( ) logger.debug(f"Final prompt:\n{final_prompt}") - template = self.TEMPLATE_MAP[(task, method)] + template = self.TEMPLATE_MAP[task] logger.info(f"Evaluating on given dataset for {task} task...") self.init_metric = 
evaluator.evaluate( prompt=start_prompt, diff --git a/coolprompt/data_generator/generator.py b/coolprompt/data_generator/generator.py index c0f566d..ddaf673 100644 --- a/coolprompt/data_generator/generator.py +++ b/coolprompt/data_generator/generator.py @@ -1,7 +1,5 @@ -import json from typing import Optional, List, Tuple, Any -import dirtyjson from langchain_core.language_models.base import BaseLanguageModel from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages.ai import AIMessage @@ -52,11 +50,11 @@ def _generate( Returns: Any: generated data """ - if hasattr(self.model, 'model'): + if hasattr(self.model, "model"): wrapped_model = self.model.model else: wrapped_model = self.model - + if not isinstance(wrapped_model, BaseChatModel): output = self.model.invoke(request) if isinstance(output, AIMessage): diff --git a/coolprompt/evaluator/evaluator.py b/coolprompt/evaluator/evaluator.py index c6dfc5a..0888146 100644 --- a/coolprompt/evaluator/evaluator.py +++ b/coolprompt/evaluator/evaluator.py @@ -1,7 +1,7 @@ -import random -from langchain_core.language_models.base import BaseLanguageModel -from typing import Optional +from dataclasses import dataclass +from typing import List, Optional +from langchain_core.language_models.base import BaseLanguageModel from langchain_core.messages.ai import AIMessage from coolprompt.evaluator.metrics import BaseMetric from coolprompt.utils.logging_config import logger @@ -12,6 +12,22 @@ ) +@dataclass +class FailedExampleDetailed: + instance: str + assistant_answer: str + model_answer_parsed: Optional[str] = None + metric_value: float | int = 0.0 + ground_truth: str | int = "" + + +@dataclass +class EvalResultDetailed: + aggregate_score: float + score_per_task: List[float | int] = None + failed_examples: List[FailedExampleDetailed] = None + + class Evaluator: """Evaluator class to perform model evaluation using a specified metric. 
@@ -35,29 +51,17 @@ def evaluate( targets: list[str | int], template: Optional[str] = None, ) -> float: - """ - Evaluate the model on a dataset - by generating answers and computing the metric. - - For each sample in the dataset, - the prompt is concatenated with the sample, - passed to the model to generate an output, - and then all outputs are evaluated - against the targets using the metric. + """Evaluate the model on a dataset. Args: prompt (str): The prompt string to prepend to each dataset sample. dataset (list[str]): List of input samples to evaluate. - targets (list[str|int]): - Corresponding ground truth labels or references. - template (Optional[str]): - Prompt template for defined task type. - If None, uses default template. + targets (list[str|int]): Corresponding ground truth labels. + template (Optional[str]): Prompt template for defined task type. Returns: float: The computed evaluation metric score. """ - if template is None: template = self._get_default_template() @@ -80,28 +84,64 @@ def evaluate( return self.metric.compute(answers, targets, dataset) - def _get_full_prompt( + def evaluate_detailed( self, prompt: str, - sample: str, + dataset: list[str], + targets: list[str | int], template: Optional[str] = None, - ) -> str: - """Inserts parts of the prompt into the task template. + ) -> EvalResultDetailed: + """Evaluate the model and return detailed results per sample.""" + if template is None: + template = self._get_default_template() - Args: - prompt (str): the main instruction for the task - sample (str): the input sample - template (Optional[str]): - Prompt template for defined task type. - If None, uses default template. 
+ logger.info( + f"Evaluating (detailed) prompt for {self.task} task on {len(dataset)} samples" + ) + if self.task == Task.CLASSIFICATION: + self.metric.extract_labels(targets) + + answers = self.model.batch( + [ + self._get_full_prompt(prompt, sample, template) + for sample in dataset + ] + ) + answers = [ + a.content if isinstance(a, AIMessage) else a for a in answers + ] - Raises: - ValueError: if type of task is not supported + parsed_answers = [self.metric.parse_output(a) for a in answers] + aggregate_score, score_per_task = self.metric.compute_detailed( + answers, targets + ) - Returns: - str: the full prompt to be passed to the model - """ + failed_examples = [] + for i, score in enumerate(score_per_task): + if score == 0: + failed_examples.append( + FailedExampleDetailed( + instance=dataset[i], + assistant_answer=answers[i], + model_answer_parsed=parsed_answers[i], + metric_value=score, + ground_truth=targets[i], + ) + ) + + return EvalResultDetailed( + aggregate_score=aggregate_score, + score_per_task=score_per_task, + failed_examples=failed_examples, + ) + def _get_full_prompt( + self, + prompt: str, + sample: str, + template: Optional[str] = None, + ) -> str: + """Inserts parts of the prompt into the task template.""" if template is None: template = self._get_default_template() @@ -116,7 +156,6 @@ def _get_full_prompt( def _get_default_template(self) -> str: """Returns the default template for the task type.""" - match self.task: case Task.CLASSIFICATION: return CLASSIFICATION_TASK_TEMPLATE diff --git a/coolprompt/evaluator/metrics.py b/coolprompt/evaluator/metrics.py index 9c9b89f..bb815f7 100644 --- a/coolprompt/evaluator/metrics.py +++ b/coolprompt/evaluator/metrics.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Optional +from typing import List, Optional, Tuple from deepeval.metrics import GEval from deepeval.test_case import LLMTestCase, LLMTestCaseParams @@ -86,7 +86,7 @@ def _compute_raw( self, outputs: list[str | int], 
targets: list[str | int], - dataset: Optional[list[str]] = None + dataset: Optional[list[str]] = None, ) -> float: """Compute metric value from preprocessed model answers. @@ -120,7 +120,7 @@ def compute( self, outputs: list[str | int], targets: list[str | int], - dataset: Optional[list[str]] = None + dataset: Optional[list[str]] = None, ) -> float: """Compute metric value from text model outputs @@ -134,9 +134,7 @@ def compute( """ output_labels = list( map( - lambda x: extract_answer( - x, self.ANS_TAGS, self.FORMAT_MISMATCH_LABEL - ), + lambda x: extract_answer(x, self.ANS_TAGS, self.FORMAT_MISMATCH_LABEL), outputs, ) ) @@ -144,9 +142,38 @@ def compute( encoded_output_labels, encoded_targets = self._encode_labels( output_labels, targets ) - return self._compute_raw( - encoded_output_labels, encoded_targets, dataset - ) + return self._compute_raw(encoded_output_labels, encoded_targets, dataset) + + def parse_output(self, output: str) -> str: + """Extract parsed answer from model output. + + Args: + output: Raw model output string. + + Returns: + Extracted answer from tags, or original output if not found. + """ + return extract_answer(output, self.ANS_TAGS, format_mismatch_label=output) + + def compute_detailed( + self, + outputs: list[str | int], + targets: list[str | int], + dataset: Optional[list[str]] = None, + ) -> Tuple[float, List[float | int]]: + """Compute metric value per sample and aggregate. + + Returns: + Tuple of (aggregate_score, score_per_task). + score_per_task[i] - score for i-th sample. + aggregate_score - same as compute(). 
+ """ + score_per_task = [] + for o, t in zip(outputs, targets): + s = self._compute_raw([o], [t], dataset) + score_per_task.append(s) + aggregate = self.compute(outputs, targets, dataset) + return aggregate, score_per_task def __str__(self) -> str: return self._get_name() @@ -219,7 +246,7 @@ class GenerationMetric(BaseMetric): FORMAT_MISMATCH_LABEL = "" - def __init__(self): + def __init__(self, name=None): """Initialize metric""" super().__init__() @@ -316,6 +343,15 @@ def _compute_raw(self, outputs, targets, dataset): f1_list = super()._compute_raw(outputs, targets) return sum(f1_list) / len(f1_list) + def compute_detailed( + self, + outputs: list[str | int], + targets: list[str | int], + dataset: Optional[list[str]] = None, + ) -> Tuple[float, List[float]]: + f1_list = super()._compute_raw(outputs, targets, dataset) + return sum(f1_list) / len(f1_list), f1_list + class LLMAsJudge(GenerationMetric): """LLM-as-a-judge metric for generation tasks.""" @@ -462,6 +498,21 @@ def _compute_raw(self, outputs, targets, dataset): outputs = [extract_number_from_text(item) for item in outputs] return float(mean([o == t for o, t in zip(outputs, targets)])) + def compute_detailed( + self, + outputs: list[str | int], + targets: list[str | int], + dataset: Optional[list[str]] = None, + ) -> Tuple[float, List[int]]: + targets = [extract_number_from_text(item) for item in targets] + outputs = [extract_number_from_text(item) for item in outputs] + score_per_task = [1 if o == t else 0 for o, t in zip(outputs, targets)] + return mean(score_per_task), score_per_task + + def parse_output(self, output: str) -> str: + extracted = extract_answer(output, self.ANS_TAGS, format_mismatch_label=output) + return extract_number_from_text(extracted) + def define_lang(outputs, targets): langs = [detect_language(target) for target in targets] @@ -469,8 +520,7 @@ def define_lang(outputs, targets): CLASSIFICATION_METRIC_NAME_MAPPING = { - metric._get_name(): metric - for metric in 
ClassificationMetric.__subclasses__() + metric._get_name(): metric for metric in ClassificationMetric.__subclasses__() } GENERATION_METRIC_NAME_MAPPING = { @@ -509,8 +559,9 @@ def validate_and_create_metric( return CLASSIFICATION_METRIC_NAME_MAPPING[metric]() error_msg = ( f"Invalid metric for {task} task: {metric}. " - f"Available metrics: {', '.join( - CLASSIFICATION_METRIC_NAME_MAPPING.keys())}." + f"Available metrics: { + ', '.join(CLASSIFICATION_METRIC_NAME_MAPPING.keys()) + }." ) logger.error(error_msg) raise ValueError(error_msg) @@ -544,8 +595,9 @@ def validate_and_create_metric( return GENERATION_METRIC_NAME_MAPPING[metric]() error_msg = ( f"Invalid metric for {task} task: {metric}. " - f"Available metrics: {', '.join( - GENERATION_METRIC_NAME_MAPPING.keys())}." + f"Available metrics: { + ', '.join(GENERATION_METRIC_NAME_MAPPING.keys()) + }." ) logger.error(error_msg) raise ValueError(error_msg) diff --git a/coolprompt/language_model/llm.py b/coolprompt/language_model/llm.py index 0b1e234..7b9feed 100644 --- a/coolprompt/language_model/llm.py +++ b/coolprompt/language_model/llm.py @@ -2,6 +2,7 @@ from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline from langchain_core.language_models.base import BaseLanguageModel +from langchain_community.callbacks.manager import get_openai_callback from coolprompt.utils.logging_config import logger from coolprompt.utils.default import ( DEFAULT_MODEL_NAME, @@ -39,3 +40,77 @@ def init( model_kwargs={'dtype': 'float16'} ) return ChatHuggingFace(llm=llm) + + +class TrackedLLMWrapper: + """Простая обертка вокруг ChatOpenAI с трекингом""" + + def __init__(self, model, tracker): + self.model = model + self.tracker = tracker + + def invoke(self, input, **kwargs): + with get_openai_callback() as cb: + result = self.model.invoke(input, **kwargs) + self.tracker._update_stats(cb, True) + return result + + def batch(self, inputs, **kwargs): + with get_openai_callback() as cb: + results = self.model.batch(inputs, 
**kwargs) + self.tracker._update_stats(cb, False, batch_size=len(inputs)) + return results + + def reset_stats(self): + self.tracker.reset_stats() + + def get_stats(self): + return self.tracker.get_stats() + + # Проксируем остальные методы + def __getattr__(self, name): + return getattr(self.model, name) + + +class OpenAITracker: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._reset_stats() + return cls._instance + + def _reset_stats(self): + self.stats = { + "total_calls": 0, + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "total_cost": 0.0, + "invoke_calls": 0, + "batch_calls": 0, + "batch_items": 0, + } + + def _update_stats(self, callback, invoke_flag, **kwargs): + self.stats["total_calls"] += 1 + self.stats["total_tokens"] += callback.total_tokens + self.stats["prompt_tokens"] += callback.prompt_tokens + self.stats["completion_tokens"] += callback.completion_tokens + self.stats["total_cost"] += callback.total_cost + if invoke_flag: + self.stats["invoke_calls"] += 1 + else: + self.stats["batch_calls"] += 1 + self.stats["batch_items"] += kwargs.get("batch_size", 0) + + def wrap_model(self, model): + """Обертывает модель для трекинга""" + return TrackedLLMWrapper(model, self) + + def get_stats(self): + return self.stats.copy() + + def reset_stats(self): + self._reset_stats() diff --git a/coolprompt/optimizer/hype/__init__.py b/coolprompt/optimizer/hype/__init__.py index c0ebaa4..f2aa268 100644 --- a/coolprompt/optimizer/hype/__init__.py +++ b/coolprompt/optimizer/hype/__init__.py @@ -1,5 +1,8 @@ -from coolprompt.optimizer.hype.hype import hype_optimizer +from coolprompt.optimizer.hype.hype import HyPEOptimizer, Optimizer +from coolprompt.optimizer.hype.hyper import HyPEROptimizer __all__ = [ - 'hype_optimizer' + "Optimizer", + "HyPEOptimizer", + "HyPEROptimizer", ] diff --git a/coolprompt/optimizer/hype/feedback_module.py 
b/coolprompt/optimizer/hype/feedback_module.py new file index 0000000..1b97bc0 --- /dev/null +++ b/coolprompt/optimizer/hype/feedback_module.py @@ -0,0 +1,182 @@ +"""FeedbackModule for generating prompt improvement recommendations.""" + +import random +from typing import Any, List, Optional + +from coolprompt.evaluator.evaluator import FailedExampleDetailed +from coolprompt.utils.parsing import extract_json, get_model_answer_extracted + + +FEEDBACK_PROMPT_TEMPLATE = """You are an expert prompt engineer. + +The prompt was evaluated on a benchmark task and failed on some examples. You will be given a prompt and an example. + +Prompt: + +{prompt} + + +Failed task: + +{instance} + + +Model answer (raw): + +{model_answer} + + +Model answer (parsed): + +{model_answer_parsed} + + +Metric value: {metric_value} + +Correct answer: + +{ground_truth} + + +Identify the core reasoning error pattern. + +Give ONE general, universal recommendation to improve the prompt (no task-specific details). + +Format: Concise, max 20-25 words, starts with action verb. Output nothing but the actual recommendation. Avoid meta‑comments (e.g., "similar to…", "as before…") – the recommendation must stand alone. + +Example: "Require step-by-step reasoning before classifying." + +Recommendation: +""" + +FILTER_RECOMMENDATIONS_PROMPT = """You have a list of recommendations for prompt improvement: + +{recommendations} + +TASK: +1. Group them into conceptual clusters (similar ideas). +2. For each cluster, **synthesize a single, new recommendation** that captures the essence of all items in that cluster. Do not just copy an existing one. +3. Rank clusters by size (largest first). If some clusters conflict - drop the smaller ones. +4. Output ONLY a JSON array of the synthesized recommendations, in rank order. 
+ +GOOD EXAMPLES: +Input: ["step-by-step", "break down calc", "don't show work", "format clearly"] +Correct output: ["Require detailed step-by-step reasoning with calculations", "Specify the desired output format explicitly"] +Why good: +- Captured main ideas of reasoning cluster into 1 strong rec +- Didn't lose cluster from "format clearly" +- Resolved conflict: "don't show work" is less frequent recommendation, so its cluster was dropped + +BAD EXAMPLES: +Input: ["Focus on clarifying the output format requirements", + "Add examples of expected responses to the prompt", + "Make sure to specify exact sentiment labels", + "Include examples to avoid confusion with similar labels", + "Focus on tone analysis in the text", + "Clarify what constitutes positive vs negative", + "Add examples of positive responses", + "Similar to previous - add more examples"] +Wrong output: ["Similar to previous - add more examples", "Add examples of positive responses", "Make sure to specify exact sentiment labels", "Focus on tone analysis in the text"] +Why bad: +- "Similar to previous" = meta-trash +- No synthesis of 6+ example recs into 1 strong rec, uses only existing recommendations +- Two different recommendations with a similar intent: adding examples (duplicates) +""" + + +class FeedbackModule: + """Generates recommendations for improving prompts based on failed examples.""" + + def __init__(self, model: Any) -> None: + self.model = model + + def generate_recommendation( + self, + prompt: str, + instance: str, + model_answer: str, + model_answer_parsed: Optional[str] = None, + metric_value: float | int = 0.0, + ground_truth: str | int = "", + ) -> str: + """Generate a single recommendation for a failed example. + + Args: + prompt: The original prompt that was used. + instance: The task instance (input/question). + model_answer: The model's answer (incorrect, raw). + model_answer_parsed: The model's parsed answer (for metric calculation). 
+ metric_value: The metric value for this answer. + ground_truth: The correct answer. + + Returns: + A recommendation string for improving the prompt. + """ + formatted_prompt = FEEDBACK_PROMPT_TEMPLATE.format( + prompt=prompt, + instance=instance, + model_answer=model_answer, + model_answer_parsed=model_answer_parsed or "", + metric_value=metric_value, + ground_truth=ground_truth, + ) + result = get_model_answer_extracted(self.model, formatted_prompt) + return self._process_output(result) + + def generate_recommendations( + self, + prompt: str, + failed_examples: List[FailedExampleDetailed], + ) -> List[str]: + """Generate recommendations for all failed examples. + + Args: + prompt: The original prompt that was used. + failed_examples: List of failed examples. + + Returns: + List of recommendation strings. + """ + return [ + self.generate_recommendation( + prompt=prompt, + instance=fe.instance, + model_answer=fe.assistant_answer, + model_answer_parsed=fe.model_answer_parsed, + metric_value=fe.metric_value, + ground_truth=fe.ground_truth, + ) + for fe in failed_examples + ] + + def filter_recommendations(self, recommendations: List[str]) -> List[str]: + """Filter and deduplicate recommendations using LLM. + + Args: + recommendations: List of recommendation strings. + + Returns: + Deduplicated and filtered list of recommendations. + """ + if not recommendations: + return [] + + formatted_recs = "\n".join( + f"{i + 1}. 
{rec}" for i, rec in enumerate(recommendations) + ) + prompt = FILTER_RECOMMENDATIONS_PROMPT.format( + recommendations=formatted_recs + ) + result = get_model_answer_extracted(self.model, prompt) + try: + data = extract_json(result) + if data and isinstance(data, list): + return [str(x) for x in data] + except Exception: + pass + + return random.sample(recommendations, min(3, len(recommendations))) + + def _process_output(self, output: Any) -> str: + """Process model output to extract recommendation.""" + return output if isinstance(output, str) else str(output) diff --git a/coolprompt/optimizer/hype/hype.py b/coolprompt/optimizer/hype/hype.py index b96f2d5..42c26b8 100644 --- a/coolprompt/optimizer/hype/hype.py +++ b/coolprompt/optimizer/hype/hype.py @@ -1,47 +1,115 @@ -from langchain_core.language_models.base import BaseLanguageModel +from abc import ABC, abstractmethod +from typing import Any, List, Optional, Union -from coolprompt.utils.logging_config import logger -from coolprompt.utils.prompt_templates.hype_templates import ( - HYPE_PROMPT_TEMPLATE, -) -from coolprompt.utils.parsing import ( - extract_answer, - get_model_answer_extracted, - safe_template, +from coolprompt.utils.parsing import extract_answer, get_model_answer_extracted +from coolprompt.utils.prompt_templates.hyper_templates import ( + HypeMetaPromptBuilder, + HypeMetaPromptConfig, + META_INFO_SECTION, + META_PROMPT_SECTIONS, ) -INSTRUCTIVE_PROMPT_TAGS = ("[PROMPT_START]", "[PROMPT_END]") +def _build_full_meta_prompt_template(builder: HypeMetaPromptBuilder) -> str: + body = builder.build_meta_prompt() + return ( + body + + "\n\n{META_INFO_BLOCK}" + + "User query:\n\n{QUERY}\n\n" + ) -def hype_optimizer( - model: BaseLanguageModel, prompt: str, problem_description: str -) -> str: - """Rewrites prompt by injecting it - into predefined template and querying LLM. - Args: - model (BaseLanguageModel): Any LangChain BaseLanguageModel instance. - prompt (str): Input prompt to optimize. 
- problem_description (str): Brief description of the task, explaining - its domain. - Returns: - str: LLM-generated rewritten prompt. - """ +class Optimizer(ABC): + def __init__(self, model): + self.model = model - logger.info("Running HyPE optimization...") - logger.debug(f"Start prompt:\n{prompt}") + @abstractmethod + def optimize(self): + pass - query = safe_template( - HYPE_PROMPT_TEMPLATE, - PROBLEM_DESCRIPTION=problem_description, - QUERY=prompt, - ) - answer = get_model_answer_extracted(model, query) +class HyPEOptimizer(Optimizer): + def __init__( + self, model, config: Optional[HypeMetaPromptConfig] = None + ) -> None: + super().__init__(model) + self.builder = HypeMetaPromptBuilder(config) + self.meta_prompt = _build_full_meta_prompt_template(self.builder) - logger.info("HyPE optimization completed") - logger.debug(f"Raw HyPE output:\n{answer}") + def get_section(self, name: str) -> Any: + """Returns the current value of the section (for recommendations — List[str]).""" + if name not in META_PROMPT_SECTIONS: + raise ValueError( + f"Unknown section: {name}. Expected: {META_PROMPT_SECTIONS}" + ) + if name == "recommendations": + return list(self.builder.config.recommendations) + if name == "constraints": + return list(self.builder.config.constraints) + return self.builder.get_cached_section(name) - return extract_answer( - answer, INSTRUCTIVE_PROMPT_TAGS, format_mismatch_label=answer - ) + def update_section( + self, + name: str, + value: Union[str, List[str]], + ) -> None: + """Updates the section and rebuilds the meta-prompt.""" + if name not in META_PROMPT_SECTIONS: + raise ValueError( + f"Unknown section: {name}. 
Expected: {META_PROMPT_SECTIONS}" + ) + if name == "recommendations": + self.builder.config.recommendations = list(value) + elif name == "constraints": + self.builder.config.constraints = list(value) + elif name == "output_format" and isinstance(value, str): + self.builder.config.output_format_section = value + else: + raise ValueError( + f"update_section for {name}: unsupported value type" + ) + self.builder.rebuild_all_sections() + self._rebuild_meta_prompt() + + def _rebuild_meta_prompt(self) -> None: + self.meta_prompt = _build_full_meta_prompt_template(self.builder) + + def set_meta_prompt(self, meta_prompt: str) -> None: + self.meta_prompt = meta_prompt + + def optimize( + self, + prompt: str, + meta_info: Optional[dict[str, Any]] = None, + n_prompts: int = 1, + ) -> Union[str, List[str]]: + query = self._format_meta_prompt(prompt, **(meta_info or {})) + raw_result = get_model_answer_extracted(self.model, query, n=n_prompts) + if n_prompts == 1: + return self._process_model_output(raw_result) + return [self._process_model_output(r) for r in raw_result] + + def _format_meta_prompt(self, prompt: str, **kwargs) -> str: + if kwargs: + meta_info_content = "\n".join( + [f"{k}: {v}" for k, v in kwargs.items()] + ) + meta_info_block = META_INFO_SECTION.format( + meta_info_content=meta_info_content + ) + else: + meta_info_block = "" + + return self.meta_prompt.format( + QUERY=prompt, META_INFO_BLOCK=meta_info_block + ) + + RESULT_PROMPT_TAGS = ("", "") + + def _process_model_output(self, output: Any) -> str: + result = extract_answer( + output, + self.RESULT_PROMPT_TAGS, + format_mismatch_label=output, + ) + return result if isinstance(result, str) else str(result) diff --git a/coolprompt/optimizer/hype/hyper.py b/coolprompt/optimizer/hype/hyper.py new file mode 100644 index 0000000..dae910f --- /dev/null +++ b/coolprompt/optimizer/hype/hyper.py @@ -0,0 +1,220 @@ +"""HyPEROptimizer: HyPE with iterative refinement via recommendations.""" + +import random +from typing 
import Any, List, Optional, Sequence, Tuple + +from tqdm import tqdm + +from coolprompt.optimizer.hype.hype import HyPEOptimizer, Optimizer +from coolprompt.optimizer.hype.feedback_module import FeedbackModule +from coolprompt.utils.parsing import get_model_answer_extracted +from coolprompt.evaluator.evaluator import ( + Evaluator, + EvalResultDetailed, +) + + +def sample_mini_batch( + dataset: Sequence[str], + targets: Sequence[str | int], + size: int, + seed: Optional[int] = None, +) -> Tuple[List[str], List[str | int]]: + """Sample a mini-batch from the dataset. + + Returns: + (samples, targets) - lists of length size (or less if dataset is smaller). + """ + import random + + rng = random.Random(seed) + n = min(size, len(dataset)) + indices = rng.sample(range(len(dataset)), n) + return ( + [dataset[i] for i in indices], + [targets[i] for i in indices], + ) + + +def compute_pareto_front( + candidates: List[str], + results: List[EvalResultDetailed], +) -> List[Tuple[str, EvalResultDetailed]]: + """Compute Pareto front from candidates based on score_per_task. + + A candidate dominates another if its score_per_task >= other.score_per_task + for all tasks and > for at least one. + + Returns: + List of (candidate, result) that belong to the Pareto front. 
+ """ + n = len(candidates) + is_pareto = [True] * n + + for i in range(n): + if not is_pareto[i]: + continue + for j in range(n): + if i == j or not is_pareto[j]: + continue + # Check if i dominates j + i_scores = results[i].score_per_task + j_scores = results[j].score_per_task + if not i_scores or not j_scores: + continue + if len(i_scores) != len(j_scores): + continue + i_dominates_j = all( + i_s >= j_s for i_s, j_s in zip(i_scores, j_scores) + ) and any(i_s > j_s for i_s, j_s in zip(i_scores, j_scores)) + if i_dominates_j: + is_pareto[j] = False + + return [(candidates[i], results[i]) for i in range(n) if is_pareto[i]] + + +class HyPEROptimizer(Optimizer): + """HyPE with iterative refinement via evaluation-based recommendations.""" + + def __init__( + self, + model: Any, + evaluator: Evaluator, + *, + n_iterations: int = 5, + patience: int = None, + n_candidates: int = 3, + top_n_candidates: int = 3, + k_samples: int = 3, + mini_batch_size: int = 16, + ) -> None: + super().__init__(model) + self.hype_module = HyPEOptimizer(model) + self.evaluator = evaluator + self.feedback_module = FeedbackModule(model) + self.n_iterations = n_iterations + self.patience = patience + self.n_candidates = n_candidates + self.top_n_candidates = top_n_candidates + self.k_samples = k_samples + self.mini_batch_size = mini_batch_size + + def _get_variants_from_best( + self, best_prompt: str, n_candidates: int + ) -> List[str]: + paraphrase_prompt = f"""Generate an alternative version of the following prompt. The new version must: +- Use different words, sentence structure, and tone (e.g., more formal, casual, or creative). +- Preserve the original meaning, key details, and language. +- Vary in length: slightly shorter or longer (up to 10%). +- Feel natural and coherent. +- Output only the text of the alternative prompt, without any additional commentary or formatting. 
+ +Original prompt: +{best_prompt} + +Alternative prompt:""" + raw_result = get_model_answer_extracted( + self.model, paraphrase_prompt, n=n_candidates, temperature=0.9 + ) + return [best_prompt] + [ + self._process_model_output(r) for r in raw_result + ] + + def _process_model_output(self, output: Any) -> str: + return output if isinstance(output, str) else str(output) + + def optimize( + self, + prompt: str, + dataset_split: Tuple[ + Sequence[str], Sequence[str], Sequence[str], Sequence[str] + ], + meta_info: Optional[dict[str, Any]] = None, + ) -> str: + """Generate candidates, evaluate, update recommendations, repeat.""" + train_samples, val_samples, train_targets, val_targets = dataset_split + best_prompt = prompt + best_score = self.evaluator.evaluate( + prompt, list(val_samples), list(val_targets) + ) + patience_counter = 0 + + for iteration in tqdm( + range(self.n_iterations), desc="HyPER iterations" + ): + # 1. Generate candidates from best_prompt + candidates = self._get_variants_from_best( + best_prompt, n_candidates=self.n_candidates + ) + + if not candidates: + return best_prompt + + # 2. Mini-batch from train + samples, sample_targets = sample_mini_batch( + train_samples, train_targets, self.mini_batch_size + ) + if not samples: + continue + + # 3. Evaluate candidates on mini-batch via evaluate_detailed + results: List[EvalResultDetailed] = [ + self.evaluator.evaluate_detailed(cand, samples, sample_targets) + for cand in candidates + ] + + # 4. Pareto front + pareto_front = compute_pareto_front(candidates, results) + + # Fallback: if all candidates are in front, sort by aggregate_score + if len(pareto_front) == len( + candidates + ) and self.top_n_candidates < len(candidates): + scored = sorted( + zip(candidates, results), + key=lambda x: x[1].aggregate_score, + reverse=True, + ) + pareto_front = scored[: self.top_n_candidates] + + if not pareto_front: + continue + + # 5. 
Collect recommendations for all candidates from Pareto front + all_recs: List[str] = [] + for cand_prompt, res in pareto_front: + failed_sample = random.sample( + res.failed_examples, + min(self.k_samples, len(res.failed_examples)), + ) + recs = self.feedback_module.generate_recommendations( + cand_prompt, failed_sample + ) + all_recs.extend(recs) + + # Filter and update recommendations + all_recs = self.feedback_module.filter_recommendations(all_recs) + + self.hype_module.update_section("recommendations", all_recs) + + # 6. For each candidate from Pareto front + for cand_prompt, res in pareto_front: + optimized_prompt = self.hype_module.optimize( + cand_prompt, meta_info=meta_info + ) + + val_score = self.evaluator.evaluate( + optimized_prompt, list(val_samples), list(val_targets) + ) + + if val_score > best_score: + best_score = val_score + best_prompt = optimized_prompt + patience_counter = 0 + else: + patience_counter += 1 + + if self.patience and patience_counter >= self.patience: + break + + return best_prompt diff --git a/coolprompt/optimizer/hype/hyper_refine.py b/coolprompt/optimizer/hype/hyper_refine.py new file mode 100644 index 0000000..9238f6d --- /dev/null +++ b/coolprompt/optimizer/hype/hyper_refine.py @@ -0,0 +1,183 @@ +"""HyPEROptimizer: HyPE with iterative refinement via feedback.""" + +from dataclasses import dataclass, field +from typing import Any, List, Optional, Sequence, Tuple + +from coolprompt.optimizer.hype.hyper import HyPEOptimizer, Optimizer + + +# --- Structures --- + + +@dataclass +class FailedExample: + """Один неудачный пример для формирования рекомендаций. + + Отдаётся Evaluator при детальной оценке. 
+ """ + + instance: str # инстанс из датасета + assistant_answer: str + metric_value: float # значение метрики для этого примера + ground_truth: str | int # целевой ответ + + +@dataclass +class EvalResult: + """Результат оценки кандидата на мини-батче.""" + + aggregate_score: float + failed_examples: List[FailedExample] = field(default_factory=list) + + +# --- Stubs --- + + +def sample_mini_batch( + dataset: Sequence[str], + targets: Sequence[str | int], + size: int, + seed: Optional[int] = None, +) -> Tuple[List[str], List[str | int]]: + """Сэмплирует мини-батч из датасета. + + Returns: + (samples, targets) — списки длины size (или меньше, если датасет меньше). + """ + import random + + rng = random.Random(seed) + n = min(size, len(dataset)) + indices = rng.sample(range(len(dataset)), n) + return ( + [dataset[i] for i in indices], + [targets[i] for i in indices], + ) + + +def _evaluate_candidate_stub( + prompt: str, + dataset: List[str], + targets: List[str | int], +) -> EvalResult: + """Заглушка Evaluator: оценивает кандидата на мини-батче. + + TODO: подключить coolprompt.evaluator.Evaluator. + """ + return EvalResult( + aggregate_score=0.0, + failed_examples=[ + FailedExample( + instance=dataset[i], + assistant_answer="", + metric_value=0.0, + ground_truth=targets[i], + ) + for i in range(min(3, len(dataset))) + ], + ) + + +def _feedback_module_stub( + failed_examples: List[FailedExample], + k_samples: int, +) -> List[str]: + """Заглушка FeedbackModule: по неудачным примерам выдаёт рекомендации. + + TODO: реализовать LLM-based feedback. + """ + return [f"Consider improving based on example: {fe.instance[:50]}..." for fe in failed_examples[:k_samples]] + + +def filter_recommendations(recommendations: List[str]) -> List[str]: + """Фильтрует рекомендации (заглушка). + + TODO: убрать дубликаты, нерелевантные и т.д. 
+ """ + return list(recommendations) + + +# --- HyPEROptimizer --- + + +class HyPEROptimizer(Optimizer): + """HyPE с итеративным уточнением через рекомендации на основе оценки.""" + + def __init__( + self, + model: Any, + *, + n_candidates: int = 3, + top_n_candidates: int = 2, + k_samples: int = 3, + mini_batch_size: int = 16, + n_iterations: int = 2, + ) -> None: + super().__init__(model) + self.hype = HyPEOptimizer(model) + self.n_candidates = n_candidates + self.top_n_candidates = top_n_candidates + self.k_samples = k_samples + self.mini_batch_size = mini_batch_size + self.n_iterations = n_iterations + + def optimize( + self, + prompt: str, + dataset: Sequence[str], + targets: Sequence[str | int], + meta_info: Optional[dict[str, Any]] = None, + ) -> str: + """Генерирует кандидатов, оценивает, обновляет recommendations, повторяет.""" + hype = self.hype + best_candidate = prompt + + for iteration in range(self.n_iterations): + # 1. Генерация n_candidates + candidates: List[str] = [] + for _ in range(self.n_candidates): + candidate = hype.optimize(prompt, meta_info) + candidates.append(candidate) + + if not candidates: + return best_candidate + + # 2. Мини-батч + samples, sample_targets = sample_mini_batch( + dataset, targets, self.mini_batch_size + ) + if not samples: + best_candidate = candidates[0] + if iteration == self.n_iterations - 1: + return best_candidate + continue + + # 3. Оценка (заглушка Evaluator) + scored: List[Tuple[float, str, EvalResult]] = [] + for cand in candidates: + res = _evaluate_candidate_stub(cand, samples, sample_targets) + scored.append((res.aggregate_score, cand, res)) + + # 4. Top-k кандидатов + scored.sort(key=lambda x: x[0], reverse=True) + best_candidate = scored[0][1] + + if iteration == self.n_iterations - 1: + return best_candidate + + top = scored[: self.top_n_candidates] + + # 5. 
Собираем k_samples FailedExample для top + all_failed: List[FailedExample] = [] + for _, _, res in top: + for fe in res.failed_examples[: self.k_samples]: + all_failed.append(fe) + + # 6. FeedbackModule → рекомендации + recs = _feedback_module_stub(all_failed, self.k_samples) + recs = filter_recommendations(recs) + + # 7. Обновляем recommendations в мета-промпте hype + hype.update_section("recommendations", recs) + + return best_candidate diff --git a/coolprompt/prompt_assistant/test.py b/coolprompt/prompt_assistant/test.py new file mode 100644 index 0000000..8395d49 --- /dev/null +++ b/coolprompt/prompt_assistant/test.py @@ -0,0 +1,23 @@ +from pathlib import Path +import sys + +from langchain_openai import ChatOpenAI + +path_proj = str(Path(__file__).resolve().parent.parent.parent) +print(path_proj) +sys.path.append(path_proj) +from coolprompt.assistant import PromptTuner + +llm = ChatOpenAI( + model="gpt-3.5-turbo", + openai_api_key="", + temperature=0.7, + max_tokens=4000, + timeout=60, + max_retries=2, + # rate_limiter=rate_limiter +) +start_prompt = "а как мне стать лучшей версией себя" +final_prompt = PromptTuner(llm).run(start_prompt) +# assistant = PromptAssistant(llm) +# print(assistant.get_feedback(start_prompt, final_prompt)) diff --git a/coolprompt/task_detector/detector.py b/coolprompt/task_detector/detector.py index eefa330..94565e1 100644 --- a/coolprompt/task_detector/detector.py +++ b/coolprompt/task_detector/detector.py @@ -6,10 +6,10 @@ from pydantic import BaseModel from coolprompt.task_detector.pydantic_formatters import ( - TaskDetectionStructuredOutputSchema + TaskDetectionStructuredOutputSchema, ) from coolprompt.utils.prompt_templates.task_detector_templates import ( - TASK_DETECTOR_TEMPLATE + TASK_DETECTOR_TEMPLATE, ) from coolprompt.utils.logging_config import logger from coolprompt.utils.parsing import extract_json @@ -42,11 +42,11 @@ def _generate( Returns: Any: generated data """ - if hasattr(self.model, 'model'): + if 
hasattr(self.model, "model"): wrapped_model = self.model.model else: wrapped_model = self.model - + if not isinstance(wrapped_model, BaseChatModel): output = self.model.invoke(request) if isinstance(output, AIMessage): @@ -81,18 +81,12 @@ def generate( schema = TaskDetectionStructuredOutputSchema request = TASK_DETECTOR_TEMPLATE - request = request.format( - query=prompt - ) + request = request.format(query=prompt) - logger.info( - "Detecting the task by query" - ) + logger.info("Detecting the task by query") task = self._generate(request, schema, "task") - logger.info( - f"Task defined as {task}" - ) + logger.info(f"Task defined as {task}") return task diff --git a/coolprompt/utils/arithmetics.py b/coolprompt/utils/arithmetics.py index e8855ee..afd9ee2 100644 --- a/coolprompt/utils/arithmetics.py +++ b/coolprompt/utils/arithmetics.py @@ -14,4 +14,7 @@ def mean(lst): def extract_number_from_text(text): - return re.findall(r'-?\d+(?:\.\d+)?', text)[-1] + try: + return re.findall(r"-?\d+(?:\.\d+)?", text)[-1] + except: + return "" diff --git a/coolprompt/utils/enums.py b/coolprompt/utils/enums.py index 1492647..1ff74f1 100644 --- a/coolprompt/utils/enums.py +++ b/coolprompt/utils/enums.py @@ -1,23 +1,24 @@ -from enum import Enum - - -class Method(Enum): - HYPE = "hype" - REFLECTIVE = "reflective" - DISTILL = "distill" - - def is_data_driven(self) -> bool: - if self is Method.HYPE: - return False - return True - - def __str__(self): - return self.value - - -class Task(Enum): - CLASSIFICATION = "classification" - GENERATION = "generation" - - def __str__(self): - return self.value +from enum import Enum + + +class Method(Enum): + HYPE = "hype" + HYPER = "hyper" + REFLECTIVE = "reflective" + DISTILL = "distill" + + def is_data_driven(self) -> bool: + if self is Method.HYPE: + return False + return True + + def __str__(self): + return self.value + + +class Task(Enum): + CLASSIFICATION = "classification" + GENERATION = "generation" + + def __str__(self): + return 
self.value diff --git a/coolprompt/utils/parsing.py b/coolprompt/utils/parsing.py index ebec72e..515c3ee 100644 --- a/coolprompt/utils/parsing.py +++ b/coolprompt/utils/parsing.py @@ -1,7 +1,7 @@ from dirtyjson import DirtyJSONLoader from typing import Tuple + from langchain_core.language_models.base import BaseLanguageModel -from langchain_core.messages.ai import AIMessage def extract_answer( @@ -55,13 +55,13 @@ def safe_template(template: str, **kwargs) -> str: return template.format(**escaped) -def extract_json(text: str) -> dict | None: - """Extracts the first valid JSON with one text value from the `text`. +def extract_json(text: str) -> dict | list | None: + """Extracts the first valid JSON (object or array) from the text. Args: - text (str): text with JSON-lke substrings. + text (str): text with JSON-like substrings. Returns: - result (dict | None): dict from JSON or None + result (dict | list | None): dict or list from JSON or None (if no valid JSON substrings found). """ @@ -72,13 +72,30 @@ def extract_json(text: str) -> dict | None: pos = 0 while pos < len(text): + # Find both { and [ start_pos = text.find("{", pos) - if start_pos == -1: + bracket_pos = text.find("[", pos) + + # Get earliest position + if start_pos == -1 and bracket_pos == -1: break + elif start_pos == -1: + search_pos = bracket_pos + elif bracket_pos == -1: + search_pos = start_pos + else: + search_pos = min(start_pos, bracket_pos) + try: - return dict(loader.decode(start_index=start_pos)) - except: - pos = start_pos + 1 + result = loader.decode(start_index=search_pos) + if isinstance(result, dict): + return dict(result) + elif isinstance(result, list): + return list(result) + except Exception: + pass + + pos = search_pos + 1 return None @@ -118,21 +135,46 @@ def parse_assistant_response(answer: str) -> str: return answer.strip() -def get_model_answer_extracted(llm: BaseLanguageModel, prompt: str) -> str: - """Gets `llm`'s response for the `prompt` and extracts the answer. 
- - Args: - llm (BaseLanguageModel): LangChain language model. - prompt (str): prompt for the model. - Returns: - str: extracted answer or empty string if there is no final answer. - """ +from typing import Tuple - answer = llm.invoke(prompt) - if isinstance(answer, AIMessage): - answer = answer.content +def get_model_answer_extracted( + llm: BaseLanguageModel, + prompt: str, + n: int = 1, + temperature=None, +): + if temperature is not None: + llm = llm.bind(temperature=temperature) - answer = parse_assistant_response(answer) + if n == 1: + resp = llm.invoke(prompt) + text = resp.content if hasattr(resp, "content") else str(resp) + return parse_assistant_response(text) - return answer + if hasattr(llm, "generate"): + try: + llm_n = llm.bind(n=n) + result = llm_n.generate([prompt]) + gens = result.generations[0] + + outputs = [] + for g in gens: + text = getattr(g, "text", str(g)) + outputs.append(parse_assistant_response(text)) + + if len(outputs) >= n: + return outputs[:n] + except Exception: + pass + + duplicated = [prompt] * n + responses = llm.batch(duplicated) + + outputs = [] + for r in responses: + text = r.content if hasattr(r, "content") else str(r) + outputs.append(parse_assistant_response(text)) + outputs = list(dict.fromkeys(outputs)) # hard deduplication + + return outputs diff --git a/coolprompt/utils/prompt_templates/hype_templates.py b/coolprompt/utils/prompt_templates/hype_templates.py deleted file mode 100644 index fbc09cc..0000000 --- a/coolprompt/utils/prompt_templates/hype_templates.py +++ /dev/null @@ -1,51 +0,0 @@ -HYPE_PROMPT_TEMPLATE = ( - "You are an expert prompt engineer. Your only task is to " - "generate a hypothetical instructive prompt that would help " - "a large language model effectively answer the following query. " - "The prompt must solve the same underlying task as the original query while being more effective.\n" - "### HARD CONSTRAINTS ###\n" - "1. 
LANGUAGE:\n" - " - Output MUST be in the EXACT SAME LANGUAGE as the query.\n" - "2. CONTENT:\n" - " - Output ONLY the hypothetical instructive prompt - do NOT answer the original query directly.\n" - " - The hypothetical prompt must solve the same task as the original query provided by user.\n" - " - If the original query contains any code snippets, you must include it in final prompt.\n" - "3. TECHNICAL PRESERVATION:\n" - " - Code blocks must be preserved with original syntax and formatting.\n" - " - Variables, placeholders ({{var}}), and technical terms kept unchanged.\n" - " - Markdown and special formatting replicated precisely.\n" - "### YOUR OUTPUT FORMAT ###\n" - "[PROMPT_START][PROMPT_END]\n" - "### INPUT ###\n" - "User's query: {QUERY}\n" - "Problem description: {PROBLEM_DESCRIPTION}\n" - "### OUTPUT ###\n" - "Hypothetical Instructive Prompt: " -) - -CLASSIFICATION_TASK_TEMPLATE_HYPE = """{PROMPT} - -Answer using exactly one label from [{LABELS}]. -Generate the final answer bracketed with and . -Examples: -1. Labels are [(A), (B), (C)] and you chose the first option - Output will be: (A) -2. Labels are [A, B, C] and you chose the first option - Output will be: A - -Input: -{INPUT} - -Response: -""" - -GENERATION_TASK_TEMPLATE_HYPE = """{PROMPT} - -Provide a direct answer without additional explanations or commentary. -Generate the final answer bracketed with and . - -INPUT: -{INPUT} - -RESPONSE: -""" diff --git a/coolprompt/utils/prompt_templates/hyper_templates.py b/coolprompt/utils/prompt_templates/hyper_templates.py new file mode 100644 index 0000000..b367d2a --- /dev/null +++ b/coolprompt/utils/prompt_templates/hyper_templates.py @@ -0,0 +1,261 @@ +from dataclasses import dataclass, field +from typing import List, Optional + + +TARGET_PROMPT_FORMS = ["hypothetical ", "instructional "] + + +SIMPLE_HYPOTHETICAL_PROMPT = "Write a {target_prompt_form}prompt that will solve the user query effectively." 
+ +META_INFO_SECTION = "Task-related meta-information:\n\n{meta_info_content}\n\n" + +META_PROMPT_SECTIONS = ( + "role", + "prompt_structure", + "recommendations", + "constraints", + "output_format", +) + + +@dataclass +class PromptSectionSpec: + name: str + description: str + + +@dataclass +class HypeMetaPromptConfig: + target_prompt_form: str = "hypothetical instructional " + require_markdown_prompt: bool = True + include_role: bool = True + section_names: List[str] = field( + default_factory=lambda: [ + "Role", + "Task context", + "Instructions", + "Output requirements", + ] + ) + section_specs: List[PromptSectionSpec] = field( + default_factory=lambda: [ + PromptSectionSpec( + name="Role", + description=( + "Briefly define the assistant's role and expertise " + "relevant to the user query." + ), + ), + PromptSectionSpec( + name="Task context", + description=( + "Provide the full context of the user's task: restate the query, " + "include all provided meta-information, domain details, constraints, " + "and any other information necessary to produce a correct solution. " + "Do not evaluate or condense — pass through everything relevant." + ), + ), + PromptSectionSpec( + name="Instructions", + description=( + "Main part - instructions the assistant must follow " + "to solve the user's query while respecting constraints." + ), + ), + PromptSectionSpec( + name="Output requirements", + description=( + "Clearly specify the desired tone " + "and the required level of detail for the assistant's answer. " + "If the user explicitly requests a particular output format or provides " + "an example response, restate that format and include the example verbatim, " + "without inventing any additional formatting or examples. Do not introduce any output format or examples that the user did not mention." 
+ ), + ), + ] + ) + constraints: List[str] = field( + default_factory=lambda: [ + "Preserve the language of the user's query.", + "Preserve all code snippets, inline code, technical terms and special formatting.", + "Do not remove or alter any explicit formatting instructions from the user.", + "Do not change numerical values, units, or identifiers.", + ] + ) + recommendations: List[str] = field(default_factory=list) + output_format_section: Optional[str] = None + _cached_sections: dict = field(default_factory=dict, repr=False) + + +class HypeMetaPromptBuilder: + ROLE_LINE = "You are an expert prompt engineer.\n" + TASK_SECTION_TEMPLATE = ( + "Your only task is to write a {target_prompt_form}prompt that will " + "solve the user query as effectively as possible.\n" + "Do not answer the user query directly; only produce the new prompt.\n\n" + ) + + PROMPT_STRUCTURE_SECTION_TEMPLATE = ( + "### STRUCTURE OF THE PROMPT YOU MUST PRODUCE\n" + "The prompt you write MUST be structured into the following sections, " + "in this exact order, and each section must follow its guidelines:\n" + "{sections_with_guidelines}\n\n" + ) + + CONSTRAINTS_SECTION_TEMPLATE = ( + "### HARD CONSTRAINTS\n{constraints_list}\n\n" + ) + + RECOMMENDATIONS_SECTION_TEMPLATE = ( + "### RECOMMENDATIONS\n" + "Use these recommendations for writing the new prompt, " + "based on analysis of previous generations:\n" + "{recommendations_list}\n\n" + ) + + BASE_OUTPUT_FORMAT_SECTION = ( + "### YOUR RESPONSE FORMAT\n" + "Return ONLY the resulting prompt, wrapped in the following XML tags:\n" + "\n" + " ...your resulting prompt here...\n" + "\n" + "Do not include any explanations or additional text outside this XML element.\n\n" + ) + + MARKDOWN_OUTPUT_REQUIREMENTS = ( + "#### Markdown formatting for the resulting prompt\n" + "- Write the entire prompt inside using valid Markdown.\n" + "- Use headings (e.g., `#`, `##`) for major sections of the prompt.\n" + "- Use bulleted lists (e.g., `-` or `*`) for 
enumerations and checklists.\n" + "- Preserve any code or pseudo-code using fenced code blocks (``` ... ```).\n" + "- Do not introduce any additional formatting beyond what is necessary to make " + "the prompt clear and well-structured." + ) + + HYPE_META_PROMPT_TEMPLATE = ( + "{role_section}" + "{prompt_structure_section}" + "{recommendations_section}" + "{constraints_section}" + "{output_format_section}" + ) + + def __init__(self, config: HypeMetaPromptConfig | None = None) -> None: + self.config = config or HypeMetaPromptConfig() + self._cache_all_sections() + + def _cache_all_sections(self) -> None: + self.config._cached_sections = { + "role": self.build_role_section(), + "prompt_structure": self.build_prompt_structure_section(), + "output_format": self.build_output_format_section(), + } + + def get_cached_section(self, name: str) -> Optional[str]: + return self.config._cached_sections.get(name) + + # ----- секция роли ----- + def build_role_section(self, include_role: bool | None = None) -> str: + include_role = ( + include_role + if include_role is not None + else self.config.include_role + ) + form = self.config.target_prompt_form or "" + task_part = self.TASK_SECTION_TEMPLATE.format(target_prompt_form=form) + if include_role: + return self.ROLE_LINE + task_part + return task_part + + # ----- секция формата (список имён секций) ----- + def build_prompt_structure_section( + self, + specs: list[PromptSectionSpec] | None = None, + ) -> str: + specs = specs or self.config.section_specs + lines = [f"- [{spec.name}] {spec.description}" for spec in specs] + return self.PROMPT_STRUCTURE_SECTION_TEMPLATE.format( + sections_with_guidelines="\n".join(lines) + ) + + # ----- секция рекомендаций (на основе анализа предыдущих генераций) ----- + def build_recommendations_section( + self, + recommendations: List[str] | None = None, + ) -> str: + recs = ( + recommendations + if recommendations is not None + else self.config.recommendations + ) + if not recs: + return "" + 
lines = "\n".join(f"- {r}" for r in recs) + return self.RECOMMENDATIONS_SECTION_TEMPLATE.format( + recommendations_list=lines + ) + + # ----- секция жёстких ограничений ----- + def build_constraints_section( + self, + constraints: List[str] | None = None, + ) -> str: + constraints = constraints or self.config.constraints + if not constraints: + return "" + lines = "\n".join(f"- {c}" for c in constraints) + return self.CONSTRAINTS_SECTION_TEMPLATE.format(constraints_list=lines) + + def build_output_format_section(self) -> str: + # если в конфиге уже передан кастомный текст — используем его как базу + section = ( + self.config.output_format_section + or self.BASE_OUTPUT_FORMAT_SECTION + ) + if self.config.require_markdown_prompt: + section = section + self.MARKDOWN_OUTPUT_REQUIREMENTS + return section + + # ----- сборка всего мета‑промпта ----- + def build_meta_prompt( + self, + *, + target_prompt_form: str | None = None, + section_specs: List[PromptSectionSpec] | None = None, + recommendations: List[str] | None = None, + constraints: List[str] | None = None, + output_format_section: str | None = None, + include_role: bool | None = None, + ) -> str: + # локальный override конфигов + if target_prompt_form is not None: + self.config.target_prompt_form = target_prompt_form + if section_specs is not None: + self.config.section_specs = section_specs + if recommendations is not None: + self.config.recommendations = recommendations + if constraints is not None: + self.config.constraints = constraints + if output_format_section is not None: + self.config.output_format_section = output_format_section + if include_role is not None: + self.config.include_role = include_role + + role_section = self.build_role_section(include_role=include_role) + prompt_structure_section = self.build_prompt_structure_section() + recommendations_section = self.build_recommendations_section( + recommendations=recommendations + ) + constraints_section = self.build_constraints_section() + 
output_format_section = self.build_output_format_section() + + return self.HYPE_META_PROMPT_TEMPLATE.format( + role_section=role_section, + prompt_structure_section=prompt_structure_section, + recommendations_section=recommendations_section, + constraints_section=constraints_section, + output_format_section=output_format_section, + ) + + def rebuild_all_sections(self) -> None: + self._cache_all_sections() diff --git a/notebooks/experiments/ablation_analysis.ipynb b/notebooks/experiments/ablation_analysis.ipynb new file mode 100644 index 0000000..e1efefb --- /dev/null +++ b/notebooks/experiments/ablation_analysis.ipynb @@ -0,0 +1,589 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# HyPE Ablation Study Analysis\n", + "\n", + "This notebook analyzes the results of the ablation study for the HyPE (Hypothetical Prompt Engineering) meta-prompt variants.\n", + "\n", + "## Factors analyzed:\n", + "- **TF (Target Form)**: `inst` vs `hyp_inst`\n", + "- **R (Include Role)**: Whether to include the role section\n", + "- **US (Use Sections)**: Whether to use structured sections (TC, RS, OS)\n", + "- **TC (Task Context)**: Include task context section\n", + "- **RS (Role Section)**: Include role section in meta-prompt\n", + "- **OS (Output Section)**: Include output format section\n", + "- **MD (Markdown)**: Always 0 (disabled)\n", + "\n", + "## Benchmarks:\n", + "- gsm8k (Exact Match)\n", + "- squad_v2 (BertScore)\n", + "- common_gen (BertScore)\n", + "- tweeteval (F1)\n", + "- xsum (BertScore)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import pandas as pd\n", + "import numpy as np\n", + "from pathlib import Path\n", + "from scipy.stats import hmean\n", + "import matplotlib.pyplot as plt\n", + "import seaborn as sns\n", + "\n", + "# Set style\n", + "plt.style.use('seaborn-v0_8-whitegrid')\n", + "sns.set_palette(\"husl\")" + ] + }, + { + "cell_type": 
"code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Load results\n", + "results_path = Path(\"../ablation_prompts/ablation_scores.json\")\n", + "with open(results_path) as f:\n", + " data = json.load(f)\n", + "\n", + "print(f\"Loaded results from: {results_path}\")\n", + "print(f\"Meta info: {data['meta']}\")\n", + "print(f\"\\nNumber of variants: {len(data['results'])}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Parse variant names and extract factor values\n", + "def parse_variant_name(name: str) -> dict:\n", + " \"\"\"Parse variant name like 'TFhyp_inst_R0_US0_TC0_RS0_OS0_MD0' into factors.\"\"\"\n", + " parts = name.split('_')\n", + " result = {}\n", + " for part in parts:\n", + " if part.startswith('TF'):\n", + " result['TF'] = part.replace('TF', '')\n", + " elif part.startswith('R'):\n", + " result['R'] = int(part[1:])\n", + " elif part.startswith('US'):\n", + " result['US'] = int(part[2:])\n", + " elif part.startswith('TC'):\n", + " result['TC'] = int(part[2:])\n", + " elif part.startswith('RS'):\n", + " result['RS'] = int(part[2:])\n", + " elif part.startswith('OS'):\n", + " result['OS'] = int(part[2:])\n", + " elif part.startswith('MD'):\n", + " result['MD'] = int(part[2:])\n", + " return result\n", + "\n", + "# Build DataFrame from results\n", + "rows = []\n", + "for variant_name, variant_data in data['results'].items():\n", + " factors = parse_variant_name(variant_name)\n", + " \n", + " for bench_name, bench_data in variant_data.get('benchmarks', {}).items():\n", + " metric_value = bench_data.get('metric_value')\n", + " format_compliance = bench_data.get('format_compliance', 0.0)\n", + " \n", + " # Skip failed entries\n", + " if metric_value is None:\n", + " continue\n", + " \n", + " row = {\n", + " 'variant': variant_name,\n", + " 'benchmark': bench_name,\n", + " 'metric_value': metric_value,\n", + " 'format_compliance': 
format_compliance,\n", + " **factors\n", + " }\n", + " rows.append(row)\n", + "\n", + "df = pd.DataFrame(rows)\n", + "print(f\"Total rows (variant × benchmark): {len(df)}\")\n", + "print(f\"Unique variants: {df['variant'].nunique()}\")\n", + "print(f\"Unique benchmarks: {df['benchmark'].nunique()}\")\n", + "df.head(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Check which variants have complete data\n", + "variant_counts = df.groupby('variant')['benchmark'].count()\n", + "print(\"Variants with complete benchmark coverage:\")\n", + "complete_variants = variant_counts[variant_counts == 5].index.tolist()\n", + "print(f\" {len(complete_variants)} / {len(variant_counts)} variants have all 5 benchmarks\")\n", + "\n", + "print(\"\\nVariants with missing benchmarks:\")\n", + "incomplete = variant_counts[variant_counts < 5]\n", + "for var, count in incomplete.items():\n", + " missing = set(df['benchmark'].unique()) - set(df[df['variant'] == var]['benchmark'])\n", + " print(f\" {var}: {count}/5 benchmarks, missing: {missing}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. 
Per-Variant Metrics Table" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Pivot table: variants × benchmarks with metric values\n", + "variant_bench_pivot = df.pivot_table(\n", + " index='variant', \n", + " columns='benchmark', \n", + " values='metric_value',\n", + " aggfunc='mean'\n", + ")\n", + "\n", + "# Calculate average quality across benchmarks\n", + "variant_bench_pivot['avg_quality'] = variant_bench_pivot.mean(axis=1)\n", + "\n", + "# Calculate harmonic mean of quality metrics\n", + "def calc_harmonic_mean(row):\n", + " values = row.drop('avg_quality').values\n", + " valid = values[~np.isnan(values)]\n", + " if len(valid) == 0:\n", + " return np.nan\n", + " return hmean(valid)\n", + "\n", + "variant_bench_pivot['harmonic_mean'] = variant_bench_pivot.apply(calc_harmonic_mean, axis=1)\n", + "\n", + "# Add format compliance (average across benchmarks)\n", + "fmt_pivot = df.pivot_table(\n", + " index='variant', \n", + " columns='benchmark', \n", + " values='format_compliance',\n", + " aggfunc='mean'\n", + ")\n", + "variant_bench_pivot['avg_format_compliance'] = fmt_pivot.mean(axis=1)\n", + "\n", + "# Final score: harmonic mean × average format compliance\n", + "variant_bench_pivot['final_score'] = variant_bench_pivot['harmonic_mean'] * variant_bench_pivot['avg_format_compliance']\n", + "\n", + "# Sort by final score\n", + "variant_bench_pivot = variant_bench_pivot.sort_values('final_score', ascending=False)\n", + "\n", + "print(\"Per-variant metrics (sorted by final_score):\\n\")\n", + "variant_bench_pivot.round(4)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Show top 10 variants\n", + "print(\"Top 10 variants by final_score:\\n\")\n", + "display_cols = ['avg_quality', 'harmonic_mean', 'avg_format_compliance', 'final_score']\n", + "variant_bench_pivot[display_cols].head(10).round(4)" + ] + }, + { + "cell_type": 
"markdown", + "metadata": {}, + "source": [ + "## 2. Per-Factor Analysis\n", + "\n", + "Analyze the effect of each factor (on/off) on the average metric value." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Per-factor analysis\n", + "factors = ['TF', 'R', 'US', 'TC', 'RS', 'OS']\n", + "\n", + "factor_analysis = {}\n", + "for factor in factors:\n", + " if factor == 'TF':\n", + " # Special handling for TF (categorical)\n", + " grouped = df.groupby('TF')['metric_value'].agg(['mean', 'std', 'count'])\n", + " else:\n", + " grouped = df.groupby(factor)['metric_value'].agg(['mean', 'std', 'count'])\n", + " factor_analysis[factor] = grouped\n", + " \n", + "print(\"## Factor Impact Analysis\\n\")\n", + "for factor, stats in factor_analysis.items():\n", + " print(f\"### {factor}\")\n", + " print(stats.round(4))\n", + " print()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Visualize factor impact\n", + "fig, axes = plt.subplots(2, 3, figsize=(15, 10))\n", + "axes = axes.flatten()\n", + "\n", + "for idx, factor in enumerate(factors):\n", + " ax = axes[idx]\n", + " \n", + " if factor == 'TF':\n", + " stats = factor_analysis[factor]\n", + " bars = ax.bar(stats.index.astype(str), stats['mean'], yerr=stats['std'], capsize=5)\n", + " ax.set_xlabel('Target Form')\n", + " else:\n", + " stats = factor_analysis[factor]\n", + " bars = ax.bar(['Off (0)', 'On (1)'], stats['mean'], yerr=stats['std'], capsize=5)\n", + " ax.set_xlabel(factor)\n", + " \n", + " ax.set_ylabel('Avg Metric Value')\n", + " ax.set_title(f'Impact of {factor}')\n", + " ax.set_ylim(0, 1)\n", + "\n", + "plt.tight_layout()\n", + "plt.savefig('factor_impact.png', dpi=150, bbox_inches='tight')\n", + "plt.show()\n", + "print(\"Saved factor_impact.png\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. 
Per-Benchmark Breakdown" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Per-benchmark statistics\n", + "bench_stats = df.groupby('benchmark')['metric_value'].agg(['mean', 'std', 'min', 'max', 'count'])\n", + "bench_stats = bench_stats.sort_values('mean', ascending=False)\n", + "print(\"Benchmark-level statistics:\\n\")\n", + "bench_stats.round(4)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Visualize benchmark performance\n", + "fig, ax = plt.subplots(figsize=(10, 6))\n", + "\n", + "bench_means = df.groupby('benchmark')['metric_value'].mean().sort_values(ascending=True)\n", + "bench_stds = df.groupby('benchmark')['metric_value'].std()\n", + "\n", + "bars = ax.barh(bench_means.index, bench_means.values, xerr=bench_stds[bench_means.index], capsize=5)\n", + "ax.set_xlabel('Average Metric Value')\n", + "ax.set_title('Performance by Benchmark')\n", + "ax.set_xlim(0, 1)\n", + "\n", + "for bar, mean_val in zip(bars, bench_means.values):\n", + " ax.text(mean_val + 0.02, bar.get_y() + bar.get_height()/2, f'{mean_val:.3f}', va='center')\n", + "\n", + "plt.tight_layout()\n", + "plt.savefig('benchmark_performance.png', dpi=150, bbox_inches='tight')\n", + "plt.show()\n", + "print(\"Saved benchmark_performance.png\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. 
Format Compliance Analysis" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Format compliance by benchmark\n", + "fmt_by_bench = df.groupby('benchmark')['format_compliance'].agg(['mean', 'std', 'min', 'max'])\n", + "print(\"Format Compliance by Benchmark:\\n\")\n", + "fmt_by_bench.round(4)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Format compliance by variant\n", + "fmt_by_variant = df.groupby('variant')['format_compliance'].mean().sort_values(ascending=False)\n", + "print(\"Format Compliance by Variant (top 10):\\n\")\n", + "fmt_by_variant.head(10).round(4)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Visualize format compliance\n", + "fig, axes = plt.subplots(1, 2, figsize=(14, 5))\n", + "\n", + "# By benchmark\n", + "ax1 = axes[0]\n", + "fmt_bench = df.groupby('benchmark')['format_compliance'].mean().sort_values()\n", + "ax1.barh(fmt_bench.index, fmt_bench.values)\n", + "ax1.set_xlabel('Format Compliance')\n", + "ax1.set_title('Format Compliance by Benchmark')\n", + "ax1.set_xlim(0, 1.1)\n", + "\n", + "# By variant (top 15)\n", + "ax2 = axes[1]\n", + "fmt_var = df.groupby('variant')['format_compliance'].mean().sort_values(ascending=False).head(15)\n", + "ax2.barh(range(len(fmt_var)), fmt_var.values)\n", + "ax2.set_yticks(range(len(fmt_var)))\n", + "ax2.set_yticklabels([v[:30] + '...' if len(v) > 30 else v for v in fmt_var.index], fontsize=8)\n", + "ax2.set_xlabel('Format Compliance')\n", + "ax2.set_title('Format Compliance by Variant (Top 15)')\n", + "ax2.set_xlim(0, 1.1)\n", + "\n", + "plt.tight_layout()\n", + "plt.savefig('format_compliance.png', dpi=150, bbox_inches='tight')\n", + "plt.show()\n", + "print(\"Saved format_compliance.png\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. 
Factor Interaction Analysis\n", + "\n", + "Analyze how combinations of factors affect performance." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# US (Use Sections) impact - this is the main toggle\n", + "us_impact = df.groupby('US')['metric_value'].agg(['mean', 'std', 'count'])\n", + "print(\"US (Use Sections) Impact:\\n\")\n", + "print(us_impact.round(4))\n", + "\n", + "# When US=1, breakdown by TC, RS, OS\n", + "print(\"\\n--- When US=1 (sections enabled) ---\\n\")\n", + "df_us1 = df[df['US'] == 1]\n", + "\n", + "for factor in ['TC', 'RS', 'OS']:\n", + " stats = df_us1.groupby(factor)['metric_value'].agg(['mean', 'std', 'count'])\n", + " print(f\"{factor}:\")\n", + " print(stats.round(4))\n", + " print()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Interaction heatmap: US × R\n", + "pivot_us_r = df.pivot_table(index='US', columns='R', values='metric_value', aggfunc='mean')\n", + "print(\"US × R Interaction:\\n\")\n", + "print(pivot_us_r.round(4))\n", + "\n", + "fig, ax = plt.subplots(figsize=(8, 6))\n", + "sns.heatmap(pivot_us_r, annot=True, fmt='.3f', cmap='viridis', ax=ax)\n", + "ax.set_title('Metric Value: US × R Interaction')\n", + "ax.set_xlabel('R (Include Role)')\n", + "ax.set_ylabel('US (Use Sections)')\n", + "plt.tight_layout()\n", + "plt.savefig('interaction_us_r.png', dpi=150, bbox_inches='tight')\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Full factor correlation matrix\n", + "factor_cols = ['R', 'US', 'TC', 'RS', 'OS']\n", + "corr_with_metric = {}\n", + "for col in factor_cols:\n", + " corr_with_metric[col] = df[col].corr(df['metric_value'])\n", + "\n", + "corr_df = pd.DataFrame.from_dict(corr_with_metric, orient='index', columns=['correlation_with_metric'])\n", + "corr_df = 
corr_df.sort_values('correlation_with_metric', key=abs, ascending=False)\n", + "print(\"Factor correlation with metric value:\\n\")\n", + "corr_df.round(4)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Final Ranking with Harmonic Mean" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Final ranking table\n", + "ranking = variant_bench_pivot[['harmonic_mean', 'avg_format_compliance', 'final_score']].copy()\n", + "ranking = ranking.sort_values('final_score', ascending=False)\n", + "ranking['rank'] = range(1, len(ranking) + 1)\n", + "ranking = ranking[['rank', 'harmonic_mean', 'avg_format_compliance', 'final_score']]\n", + "\n", + "print(\"Final Ranking (by Final Score = Harmonic Mean × Format Compliance):\\n\")\n", + "ranking.head(20).round(4)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Visualize top 15 variants\n", + "fig, ax = plt.subplots(figsize=(12, 8))\n", + "\n", + "top15 = ranking.head(15)\n", + "y_pos = range(len(top15))\n", + "\n", + "bars = ax.barh(y_pos, top15['final_score'].values)\n", + "ax.set_yticks(y_pos)\n", + "ax.set_yticklabels([f\"#{i+1} {idx[:35]}...\" if len(idx) > 35 else f\"#{i+1} {idx}\" for i, idx in enumerate(top15.index)], fontsize=8)\n", + "ax.invert_yaxis()\n", + "ax.set_xlabel('Final Score')\n", + "ax.set_title('Top 15 Variants by Final Score\\n(Harmonic Mean × Format Compliance)')\n", + "\n", + "for i, (bar, val) in enumerate(zip(bars, top15['final_score'].values)):\n", + " ax.text(val + 0.005, i, f'{val:.4f}', va='center', fontsize=9)\n", + "\n", + "plt.tight_layout()\n", + "plt.savefig('top_variants.png', dpi=150, bbox_inches='tight')\n", + "plt.show()\n", + "print(\"Saved top_variants.png\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. 
Summary Statistics" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Summary statistics\n", + "print(\"=\" * 60)\n", + "print(\"ABLATION STUDY SUMMARY\")\n", + "print(\"=\" * 60)\n", + "\n", + "print(f\"\\nTotal variants evaluated: {df['variant'].nunique()}\")\n", + "print(f\"Total benchmark evaluations: {len(df)}\")\n", + "print(f\"Benchmarks: {df['benchmark'].unique().tolist()}\")\n", + "\n", + "print(f\"\\n--- Quality Metrics ---\")\n", + "print(f\"Average metric value: {df['metric_value'].mean():.4f}\")\n", + "print(f\"Std deviation: {df['metric_value'].std():.4f}\")\n", + "print(f\"Min: {df['metric_value'].min():.4f}\")\n", + "print(f\"Max: {df['metric_value'].max():.4f}\")\n", + "\n", + "print(f\"\\n--- Format Compliance ---\")\n", + "print(f\"Average format compliance: {df['format_compliance'].mean():.4f}\")\n", + "print(f\"Min: {df['format_compliance'].min():.4f}\")\n", + "print(f\"Max: {df['format_compliance'].max():.4f}\")\n", + "\n", + "print(f\"\\n--- Best Variant ---\")\n", + "best_variant = ranking.index[0]\n", + "print(f\"Variant: {best_variant}\")\n", + "print(f\"Final Score: {ranking.iloc[0]['final_score']:.4f}\")\n", + "print(f\"Harmonic Mean: {ranking.iloc[0]['harmonic_mean']:.4f}\")\n", + "print(f\"Format Compliance: {ranking.iloc[0]['avg_format_compliance']:.4f}\")\n", + "\n", + "print(f\"\\n--- Factor Impact Summary ---\")\n", + "for factor in ['R', 'US', 'TC', 'RS', 'OS']:\n", + " on_val = df[df[factor] == 1]['metric_value'].mean()\n", + " off_val = df[df[factor] == 0]['metric_value'].mean()\n", + " diff = on_val - off_val\n", + " direction = \"↑\" if diff > 0 else \"↓\"\n", + " print(f\"{factor}: On={on_val:.4f}, Off={off_val:.4f}, Diff={diff:+.4f} {direction}\")\n", + "\n", + "print(\"\\n\" + \"=\" * 60)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Export summary to CSV\n", + 
"ranking.to_csv('ablation_ranking.csv')\n", + "print(\"Exported ranking to ablation_ranking.csv\")\n", + "\n", + "# Export per-benchmark breakdown\n", + "variant_bench_pivot.to_csv('ablation_variant_benchmarks.csv')\n", + "print(\"Exported variant × benchmark metrics to ablation_variant_benchmarks.csv\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/src/prompts_scoring/prompts_scoring_example.ipynb b/src/prompts_scoring/prompts_scoring_example.ipynb new file mode 100644 index 0000000..e69de29 diff --git a/src/solutions/HyPE/ablation/generate_prompts.py b/src/solutions/HyPE/ablation/generate_prompts.py new file mode 100644 index 0000000..e69de29 diff --git a/src/solutions/HyPE/ablation/inference.py b/src/solutions/HyPE/ablation/inference.py new file mode 100644 index 0000000..bb32193 --- /dev/null +++ b/src/solutions/HyPE/ablation/inference.py @@ -0,0 +1,236 @@ +import itertools +import json +import sys +from pathlib import Path +from datetime import datetime +from typing import List + +project_path = str(Path(__file__).resolve().parent.parent.parent.parent.parent) +sys.path.insert(0, project_path) + +from coolprompt.utils.prompt_templates.hyper_templates import ( + HypeMetaPromptBuilder, + PromptSectionSpec, +) + + +def generate_sections_config( + include_role_section: bool, + include_task_context: bool, + include_output_section: bool, +) -> List[PromptSectionSpec]: + """Генерирует конфиг секций по флагам. + + Секция Instructions включается всегда. + Role, Task context, Output requirements — опционально. 
+ """ + sections: List[PromptSectionSpec] = [] + + if include_role_section: + sections.append( + PromptSectionSpec( + name="Role", + description=( + "Briefly define the assistant's role and expertise " + "relevant to the user query." + ), + ) + ) + + if include_task_context: + sections.append( + PromptSectionSpec( + name="Task context", + description=( + "Provide the full context of the user's task: restate the query, " + "include all provided meta-information, domain details, constraints, " + "and any other information necessary to produce a correct solution. " + "Do not evaluate or condense — pass through everything relevant." + ), + ) + ) + + sections.append( + PromptSectionSpec( + name="Instructions", + description=( + "Main part - instructions the assistant must follow " + "to solve the user's query while respecting constraints." + ), + ) + ) + + if include_output_section: + sections.append( + PromptSectionSpec( + name="Output requirements", + description=( + "Clearly specify the desired tone and required level of detail. " + "If the user explicitly requests a particular output format or " + "provides an example response, restate that format and include " + "the example verbatim, without inventing any additional formatting." + ), + ) + ) + + return sections + + +def _make_variant_name( + target_form: str, + include_role: bool, + use_sections: bool, + task_context: bool, + role_section: bool, + output_section: bool, + use_markdown: bool, +) -> str: + """Имя варианта: TF_R_US_TC_RS_OS_MD""" + tf = "hyp_inst" if "hypothetical" in target_form else "inst" + return ( + f"TF{tf}" + f"_R{int(include_role)}" + f"_US{int(use_sections)}" + f"_TC{int(task_context)}" + f"_RS{int(role_section)}" + f"_OS{int(output_section)}" + f"_MD{int(use_markdown)}" + ) + + +def _build_meta_prompt_no_sections( + builder: HypeMetaPromptBuilder, + target_prompt_form: str, + include_role: bool, + use_markdown: bool, +) -> str: + """Собирает мета-промпт БЕЗ секции STRUCTURE OF THE PROMPT. 
+ + Используется когда use_sections=False. + """ + builder.config.target_prompt_form = target_prompt_form + builder.config.include_role = include_role + builder.config.require_markdown_prompt = use_markdown + + role_section = builder.build_role_section(include_role=include_role) + output_format_section = builder.build_output_format_section() + + # Собираем без prompt_structure_section, recommendations и constraints + return ( + f"{role_section}" + f"{output_format_section}" + ) + + +def main_ablation(): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + out_dir = Path("ablation_prompts") + out_dir.mkdir(exist_ok=True) + + builder = HypeMetaPromptBuilder() + + # Факторы: + # target_form: "instructional " | "hypothetical instructional " + # include_role: True | False — включать роль в мета-промпт + # use_sections: True | False — использовать секции в продуцируемом промпте + # role_section: True | False — секция Role (только при US=1) + # task_context: True | False — секция Task context (только при US=1) + # output_section: True | False — секция Output requirements (только при US=1) + # use_markdown: всегда False + + target_forms = ["instructional ", "hypothetical instructional "] + include_roles = [True, False] + use_markdown = False # всегда выключен + + prompts: dict[str, str] = {} + + for target_form, include_role in itertools.product(target_forms, include_roles): + # --- US=0: секций нет, RS=TC=OS=0 --- + name = _make_variant_name( + target_form=target_form, + include_role=include_role, + use_sections=False, + task_context=False, + role_section=False, + output_section=False, + use_markdown=use_markdown, + ) + meta_prompt = _build_meta_prompt_no_sections( + builder=builder, + target_prompt_form=target_form, + include_role=include_role, + use_markdown=use_markdown, + ) + prompts[name] = meta_prompt + print(f"✅ {name}") + + # --- US=1: перебираем RS, TC, OS --- + for role_section, task_context, output_section in itertools.product( + [True, False], [True, 
False], [True, False] + ): + specs = generate_sections_config( + include_role_section=role_section, + include_task_context=task_context, + include_output_section=output_section, + ) + + orig_markdown = builder.config.require_markdown_prompt + builder.config.require_markdown_prompt = use_markdown + + meta_prompt = builder.build_meta_prompt( + target_prompt_form=target_form, + section_specs=specs, + constraints=[], + include_role=include_role, + ) + + name = _make_variant_name( + target_form=target_form, + include_role=include_role, + use_sections=True, + task_context=task_context, + role_section=role_section, + output_section=output_section, + use_markdown=use_markdown, + ) + prompts[name] = meta_prompt + print(f"✅ {name}") + + builder.config.require_markdown_prompt = orig_markdown + + total_variants = len(prompts) + json_file = out_dir / f"meta_prompts_{total_variants}v_{timestamp}.json" + + payload = { + "meta": { + "timestamp": timestamp, + "total_variants": total_variants, + "factors": [ + "target_form (inst | hyp_inst)", + "include_role (R)", + "use_sections (US)", + "task_context (TC) — only when US=1", + "role_section (RS) — only when US=1", + "output_section (OS) — only when US=1", + "markdown (MD) — always 0", + ], + "naming": "TF{inst|hyp_inst}_R{0|1}_US{0|1}_TC{0|1}_RS{0|1}_OS{0|1}_MD{0}", + "note": ( + "When US=0, TC/RS/OS are forced to 0 (no sections). " + "Total = 2(TF) × 2(R) × (1 + 2³) = 36 unique variants." + ), + }, + "prompts": prompts, + } + + with open(json_file, "w", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, indent=2) + + print(f"\n🎉 Готово! 
{total_variants} вариантов в {json_file}") + print( + f"📊 Naming: TF{{inst|hyp_inst}}_R{{0|1}}_US{{0|1}}_TC{{0|1}}_RS{{0|1}}_OS{{0|1}}_MD{{0}}" + ) + + +if __name__ == "__main__": + main_ablation() diff --git a/src/solutions/HyPE/ablation/score.py b/src/solutions/HyPE/ablation/score.py new file mode 100644 index 0000000..d6ca110 --- /dev/null +++ b/src/solutions/HyPE/ablation/score.py @@ -0,0 +1,510 @@ +"""Ablation scoring: iterate meta-prompt variants × benchmarks, collect metrics. + +Features: + - Checkpoint/resume: saves results after each (variant, benchmark) pair. + On restart, skips already-completed pairs. Failed pairs (with "error" key) + are automatically retried. + - File logging: all output goes to both stdout and a log file. +""" + +import json +import logging +import random +import sys +from datetime import datetime +from pathlib import Path +from typing import Any + +import numpy as np +import pandas as pd +import transformers +from tqdm import tqdm +from langchain_openai import ChatOpenAI +from langchain_core.rate_limiters import InMemoryRateLimiter +from langchain_core.messages.ai import AIMessage +from langchain_core.runnables import RunnableConfig + +project_path = str(Path(__file__).resolve().parent.parent.parent.parent.parent) +sys.path.insert(0, project_path) + +from coolprompt.optimizer.hype.hype import HyPEOptimizer +from coolprompt.evaluator import Evaluator, validate_and_create_metric +from coolprompt.evaluator.metrics import BaseMetric +from coolprompt.utils.var_validation import validate_task +from coolprompt.utils.enums import Task +from coolprompt.utils.parsing import extract_answer +from coolprompt.utils.prompt_templates.default_templates import ( + CLASSIFICATION_TASK_TEMPLATE, + GENERATION_TASK_TEMPLATE, +) + +from src.solutions.HyPE.config_dict import config_dict +from src.utils.load_dataset_coolprompt import tweeteval_emotions + + +# ── constants ──────────────────────────────────────────────────────────────── + +TEMPLATE_MAP = { + 
"classification": CLASSIFICATION_TASK_TEMPLATE, + "generation": GENERATION_TASK_TEMPLATE, +} + +QUERY_SUFFIX = ( + "\n\n{META_INFO_BLOCK}" + "User query:\n\n{QUERY}\n\n" +) + +ANS_TAGS = ("", "") + + +# ── logging setup ──────────────────────────────────────────────────────────── + +def setup_file_logger(log_path: Path) -> logging.Logger: + """Create a logger that writes to both file and stdout.""" + logger = logging.getLogger("ablation_score") + logger.setLevel(logging.INFO) + logger.handlers.clear() + + fmt = logging.Formatter( + "%(asctime)s | %(levelname)-7s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + fh = logging.FileHandler(log_path, encoding="utf-8") + fh.setLevel(logging.INFO) + fh.setFormatter(fmt) + logger.addHandler(fh) + + sh = logging.StreamHandler(sys.stdout) + sh.setLevel(logging.INFO) + sh.setFormatter(fmt) + logger.addHandler(sh) + + return logger + + +# ── helpers ────────────────────────────────────────────────────────────────── + +def sample( + data: pd.DataFrame, + sample_size: int | None = None, + seed: int = 42, +) -> pd.DataFrame: + np.random.seed(seed) + if sample_size is None: + return data + + if set(data["target"].unique()).issubset(set(tweeteval_emotions)): + min_class_size = data["target"].value_counts().min() + per_class = min(sample_size // len(tweeteval_emotions), min_class_size) + balanced_parts = [ + df.sample(per_class, random_state=seed) + for _, df in data.groupby("target") + ] + return pd.concat(balanced_parts).reset_index(drop=True) + else: + return data.sample(sample_size, random_state=seed) + + +def load_meta_prompts(path: str | Path) -> dict[str, str]: + """Load meta-prompt variants from JSON produced by inference.py.""" + with open(path, "r", encoding="utf-8") as f: + payload = json.load(f) + return payload["prompts"] + + +def make_full_meta_prompt(meta_prompt_body: str) -> str: + """Append the query/meta-info template that HyPEOptimizer expects.""" + return meta_prompt_body + QUERY_SUFFIX + + +def 
compute_format_compliance(raw_answers: list[str]) -> float: + """Compute the fraction of raw answers that contain ... tags.""" + if not raw_answers: + return 0.0 + compliant = sum( + 1 + for ans in raw_answers + if ANS_TAGS[0] in ans and ANS_TAGS[1] in ans + ) + return compliant / len(raw_answers) + + +def evaluate_with_details( + evaluator: Evaluator, + prompt: str, + dataset: list[str], + targets: list[str | int], + template: str, + n_wrong_samples: int = 3, + seed: int = 42, +) -> dict[str, Any]: + """Run evaluation and return metric, format compliance, and wrong answer samples. + + Single model.batch() call — no extra LLM calls. + + Returns dict with: + - metric_value: float + - format_compliance: float (fraction of answers with tags) + - wrong_samples: list of dicts with input, raw_answer, parsed_answer, ground_truth + """ + if evaluator.task == Task.CLASSIFICATION: + evaluator.metric.extract_labels(targets) + + full_prompts = [ + evaluator._get_full_prompt(prompt, s, template) + for s in dataset + ] + raw_results = evaluator.model.batch( + full_prompts, + config=RunnableConfig(max_concurrency=20), + ) + raw_answers = [ + a.content if isinstance(a, AIMessage) else str(a) + for a in raw_results + ] + + format_compliance = compute_format_compliance(raw_answers) + metric_value = evaluator.metric.compute(raw_answers, targets, dataset) + parsed_answers = [evaluator.metric.parse_output(a) for a in raw_answers] + + wrong_indices = [] + for i, (parsed, target) in enumerate(zip(parsed_answers, targets)): + if str(parsed).strip().lower() != str(target).strip().lower(): + wrong_indices.append(i) + + rng = random.Random(seed) + if len(wrong_indices) > n_wrong_samples: + wrong_indices = rng.sample(wrong_indices, n_wrong_samples) + + wrong_samples = [ + { + "input": dataset[i], + "raw_answer": raw_answers[i], + "parsed_answer": str(parsed_answers[i]), + "ground_truth": str(targets[i]), + } + for i in wrong_indices + ] + + return { + "metric_value": metric_value, + 
"format_compliance": format_compliance, + "wrong_samples": wrong_samples, + } + + +# ── checkpoint I/O ─────────────────────────────────────────────────────────── + +def load_checkpoint(path: Path) -> dict[str, Any]: + """Load existing checkpoint or return empty structure.""" + if path.exists(): + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + return {"meta": {}, "results": {}} + + +def save_checkpoint(path: Path, payload: dict[str, Any]) -> None: + """Atomically save checkpoint (write to tmp then rename).""" + tmp = path.with_suffix(".tmp") + with open(tmp, "w", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, indent=2) + tmp.rename(path) + + +def is_bench_done(results: dict, variant_name: str, bench_name: str) -> bool: + """Check if a (variant, bench) pair is already completed successfully.""" + variant = results.get(variant_name) + if variant is None: + return False + bench = variant.get("benchmarks", {}).get(bench_name) + if bench is None: + return False + # Retry if there was an error + if "error" in bench: + return False + # Retry if metric_value is None (incomplete) + if bench.get("metric_value") is None: + return False + return True + + +# ── main scoring loop ──────────────────────────────────────────────────────── + +def run_ablation_scoring( + meta_prompts_path: str | Path, + output_file: Path, + sample_size: int = 200, + model_name: str = "gpt-4o-mini", +) -> dict[str, Any]: + """Score every meta-prompt variant on every benchmark. + + Supports checkpoint/resume: loads existing results from output_file, + skips completed (variant, benchmark) pairs, retries failed ones. + Saves after each (variant, benchmark) completion. 
+ """ + log_path = output_file.with_suffix(".log") + log = setup_file_logger(log_path) + log.info(f"=== Ablation scoring started ===") + log.info(f"Log file: {log_path}") + log.info(f"Checkpoint file: {output_file}") + + # ── LLM setup ──────────────────────────────────────────────────────── + rate_limiter = InMemoryRateLimiter( + requests_per_second=15, + check_every_n_seconds=0.1, + max_bucket_size=50, + ) + llm = ChatOpenAI( + model=model_name, + temperature=0.7, + max_completion_tokens=4000, + max_retries=5, + rate_limiter=rate_limiter, + api_key="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", + extra_body={"allowed_providers": ["google-vertex", "azure"]}, + base_url="https://openrouter.ai/api/v1", + ) + + hype_opt = HyPEOptimizer(model=llm) + + # ── load meta-prompt variants ──────────────────────────────────────── + prompts_map = load_meta_prompts(meta_prompts_path) + variant_names = sorted(prompts_map.keys()) + log.info(f"Loaded {len(variant_names)} meta-prompt variants from {meta_prompts_path}") + + # ── prepare benchmarks ─────────────────────────────────────────────── + benchmarks: dict[str, dict[str, Any]] = {} + for task_name, cfg in config_dict.items(): + data_val = cfg["data"][cfg["test_name"]] + preproc_data = cfg["preproc"](data_val) + data_sample = sample(preproc_data, sample_size=sample_size) + dataset = list(data_sample["input_data"]) + target = list(data_sample["target"]) + + task_type = validate_task(cfg["task"]) + metric = validate_and_create_metric(task_type, cfg["metric"]) + evaluator = Evaluator(llm, task_type, metric) + template = TEMPLATE_MAP[cfg["task"]] + + benchmarks[task_name] = { + "dataset": dataset, + "target": target, + "evaluator": evaluator, + "template": template, + "metric_name": cfg["metric"], + "start_prompt": cfg["start_prompt"], + "problem_description": cfg["problem_description"], + } + + bench_names = list(benchmarks.keys()) + log.info(f"Prepared {len(benchmarks)} benchmarks: {bench_names}") + 
+ # ── load checkpoint ────────────────────────────────────────────────── + payload = load_checkpoint(output_file) + results = payload.get("results", {}) + + # Update meta + payload["meta"] = { + "started": payload.get("meta", {}).get("started", datetime.now().isoformat()), + "last_updated": datetime.now().isoformat(), + "model": model_name, + "sample_size": sample_size, + "meta_prompts_source": str(meta_prompts_path), + "num_variants": len(variant_names), + "benchmarks": bench_names, + } + + # ── count work ─────────────────────────────────────────────────────── + total = len(variant_names) * len(benchmarks) + already_done = sum( + 1 + for vn in variant_names + for bn in bench_names + if is_bench_done(results, vn, bn) + ) + remaining = total - already_done + log.info(f"Total: {total} | Already done: {already_done} | Remaining: {remaining}") + + # ── build flat work list ────────────────────────────────────────────── + all_pairs = [ + (vn, bn) for vn in variant_names for bn in bench_names + ] + + # ── scoring loop with tqdm ──────────────────────────────────────────── + pbar = tqdm( + all_pairs, + total=total, + initial=already_done, + desc="Scoring", + unit="pair", + dynamic_ncols=True, + ) + + prev_variant = None + for variant_name, bench_name in pbar: + bench = benchmarks[bench_name] + + # Set up meta-prompt when variant changes + if variant_name != prev_variant: + meta_prompt_body = prompts_map[variant_name] + full_meta_prompt = make_full_meta_prompt(meta_prompt_body) + hype_opt.set_meta_prompt(full_meta_prompt) + prev_variant = variant_name + + # Ensure variant entry exists + if variant_name not in results: + results[variant_name] = { + "meta_prompt": meta_prompt_body, + "benchmarks": {}, + } + elif "meta_prompt" not in results[variant_name]: + results[variant_name]["meta_prompt"] = meta_prompt_body + if "benchmarks" not in results[variant_name]: + results[variant_name]["benchmarks"] = {} + + # Skip if already done + if is_bench_done(results, variant_name, 
bench_name):
+ pbar.set_postfix_str(f"{variant_name} × {bench_name} [cached]")
+ continue
+
+ pbar.set_postfix_str(f"{variant_name} × {bench_name}")
+ log.info(f"{variant_name} × {bench_name} ...")
+
+ try:
+ result_prompt = hype_opt.optimize(
+ prompt=bench["start_prompt"],
+ meta_info={
+ "task_description": bench["problem_description"],
+ "required_output_format": (
+ "The final answer MUST be wrapped in <answer> and </answer> XML tags."
+ ),
+ },
+ )
+
+ eval_result = evaluate_with_details(
+ evaluator=bench["evaluator"],
+ prompt=result_prompt,
+ dataset=bench["dataset"],
+ targets=bench["target"],
+ template=bench["template"],
+ n_wrong_samples=3,
+ )
+
+ results[variant_name]["benchmarks"][bench_name] = {
+ "result_prompt": result_prompt,
+ "metric_name": bench["metric_name"],
+ "metric_value": eval_result["metric_value"],
+ "format_compliance": eval_result["format_compliance"],
+ "wrong_samples": eval_result["wrong_samples"],
+ }
+ fc = eval_result["format_compliance"]
+ mv = eval_result["metric_value"]
+ pbar.set_postfix_str(
+ f"{variant_name} × {bench_name} ✅ {bench['metric_name']}={mv:.4f} fmt={fc:.0%}"
+ )
+ log.info(f" ✅ {bench['metric_name']}={mv:.4f} fmt={fc:.0%}")
+
+ except Exception as e:
+ results[variant_name]["benchmarks"][bench_name] = {
+ "result_prompt": None,
+ "metric_name": bench["metric_name"],
+ "metric_value": None,
+ "format_compliance": None,
+ "wrong_samples": [],
+ "error": str(e),
+ }
+ pbar.set_postfix_str(f"{variant_name} × {bench_name} ❌")
+ log.error(f" ❌ {variant_name} × {bench_name}: {e}")
+
+ # Save checkpoint after each (variant, bench) pair
+ payload["results"] = results
+ payload["meta"]["last_updated"] = datetime.now().isoformat()
+ save_checkpoint(output_file, payload)
+
+ pbar.close()
+
+ log.info("=== Scoring loop finished ===")
+ return results
+
+
+def print_summary(results: dict[str, Any]) -> None:
+ """Print a summary table to stdout."""
+ bench_names = list(config_dict.keys())
+ col_width = 14
+ print("\n📊 Summary (metric / 
format_compliance):")
+ print(f"{'Variant':<45} ", end="")
+ for bench_name in bench_names:
+ print(f"{bench_name:>{col_width}}", end="")
+ print()
+ print("-" * (45 + col_width * len(bench_names)))
+
+ for variant_name, variant_data in sorted(results.items()):
+ print(f"{variant_name:<45} ", end="")
+ for bench_name in bench_names:
+ bench_result = variant_data.get("benchmarks", {}).get(bench_name, {})
+ mv = bench_result.get("metric_value")
+ fc = bench_result.get("format_compliance")
+ if mv is not None and fc is not None:
+ print(f"{mv:.3f}/{fc:.0%}".rjust(col_width), end="")
+ elif "error" in bench_result:
+ print(f"{'FAIL':>{col_width}}", end="")
+ else:
+ print(f"{'---':>{col_width}}", end="")
+ print()
+
+
+def main():
+ import argparse
+
+ parser = argparse.ArgumentParser(description="Ablation scoring with checkpoint/resume")
+ parser.add_argument(
+ "--meta-prompts",
+ type=str,
+ required=True,
+ help="Path to meta_prompts JSON from inference.py",
+ )
+ parser.add_argument(
+ "--sample-size",
+ type=int,
+ default=200,
+ help="Number of samples per benchmark (default: 200)",
+ )
+ parser.add_argument(
+ "--model",
+ type=str,
+ default="gpt-4o-mini",
+ help="Model name (default: gpt-4o-mini)",
+ )
+ parser.add_argument(
+ "--output",
+ type=str,
+ default=None,
+ help=(
+ "Output JSON file path (also used as checkpoint). "
+ "Default: ablation_prompts/ablation_scores_<timestamp>.json"
+ ),
+ )
+ args = parser.parse_args()
+
+ if args.output:
+ out_file = Path(args.output)
+ else:
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ out_dir = Path("ablation_prompts")
+ out_dir.mkdir(exist_ok=True)
+ out_file = out_dir / f"ablation_scores_{timestamp}.json"
+
+ results = run_ablation_scoring(
+ meta_prompts_path=args.meta_prompts,
+ output_file=out_file,
+ sample_size=args.sample_size,
+ model_name=args.model,
+ )
+
+ print(f"\n🎉 Scoring complete! 
Results saved to {out_file}") + print_summary(results) + + +if __name__ == "__main__": + main() diff --git a/src/solutions/HyPE/config_dict.py b/src/solutions/HyPE/config_dict.py index c05ec5d..cf824e6 100644 --- a/src/solutions/HyPE/config_dict.py +++ b/src/solutions/HyPE/config_dict.py @@ -5,23 +5,14 @@ gsm8k_preproc, common_gen, common_gen_preproc, - ag_news, - ag_news_preproc, + tweeteval, + tweeteval_preproc, xsum, xsum_preproc, ) config_dict = { - "squad_v2": { - "start_prompt": "Given a context answer on the question.", - "task": "generation", - "metric": "bertscore", - "preproc": squad_v2_preproc, - "data": squad_v2, - "test_name": "validation", - "problem_description": "question answering", - }, "gsm8k": { "start_prompt": "Given a context answer on the question.", "task": "generation", @@ -31,6 +22,15 @@ "test_name": "test", "problem_description": "math solving", }, + "squad_v2": { + "start_prompt": "Given a context answer on the question.", + "task": "generation", + "metric": "bertscore", + "preproc": squad_v2_preproc, + "data": squad_v2, + "test_name": "validation", + "problem_description": "question answering", + }, "common_gen": { "start_prompt": "Create a short sentence using words in list.", "task": "generation", @@ -40,12 +40,12 @@ "test_name": "validation", "problem_description": "create a sentence", }, - "ag_news": { - "start_prompt": "Classify news and provide number of topic from dict {{World: 0, Sports: 1, Business: 2, Sci/Tech: 3}}", + "tweeteval": { + "start_prompt": "Provide sentiment classification.", "task": "classification", "metric": "f1", - "preproc": ag_news_preproc, - "data": ag_news, + "preproc": tweeteval_preproc, + "data": tweeteval, "test_name": "test", "problem_description": "classification", }, diff --git a/src/solutions/HyPE/hype_test.py b/src/solutions/HyPE/hype_test.py index 697ca58..fd78864 100644 --- a/src/solutions/HyPE/hype_test.py +++ b/src/solutions/HyPE/hype_test.py @@ -1,93 +1,100 @@ -import random +import os import 
sys from typing import Any from pathlib import Path import json +import numpy as np + +from langchain_openai import ChatOpenAI +from langchain_core.rate_limiters import InMemoryRateLimiter import pandas as pd -from sklearn.model_selection import train_test_split project_path = str(Path(__file__).resolve().parent.parent.parent.parent) print(project_path) sys.path.append(project_path) from config_dict import config_dict -from src.utils.load_dataset_coolprompt import ag_labels +from src.utils.load_dataset_coolprompt import tweeteval_emotions from coolprompt.assistant import PromptTuner -from coolprompt.language_model.llm import DefaultLLM -llm = DefaultLLM.init() +# llm = DefaultLLM.init(vllm_engine_config={"gpu_memory_utilization": 0.95}) +rate_limiter = InMemoryRateLimiter( + requests_per_second=1, check_every_n_seconds=0.1, max_bucket_size=10 +) +model = "gpt-4o-mini" +llm = ChatOpenAI( + model=model, + temperature=0.7, + max_completion_tokens=4000, + max_retries=5, + rate_limiter=rate_limiter, + api_key="", + extra_body={ + "allowed_providers": ["google-vertex", "azure"], + }, + base_url="https://openrouter.ai/api/v1", +) pt = PromptTuner(llm) -def manage_ag_news(data: pd.DataFrame, max_imbalance: float = 0.6): - if set(data["target"].unique()).issubset(set(ag_labels)): - class_proportions = data["target"].value_counts(normalize=True) - if class_proportions.max() > max_imbalance: - return None - else: - return data - - def sample( data: pd.DataFrame, sample_size: int = None, seed: int = 42, ) -> pd.DataFrame: - if sample_size is not None: - if set(data["target"].unique()).issubset(set(ag_labels)): - _, data_sample = train_test_split( - data, - train_size=sample_size, - stratify=data["target"], - random_state=seed, - ) - else: - rng = random.Random(seed) - - total_size = len(data) - n = min(sample_size, total_size) - - indices = rng.sample(range(total_size), n) + np.random.seed(seed) + if sample_size is None: + return data - data_sample = data.iloc[indices] + if 
set(data["target"].unique()).issubset(set(tweeteval_emotions)): + min_class_size = data["target"].value_counts().min() + per_class = min(sample_size // len(tweeteval_emotions), min_class_size) - return data_sample - return data + balanced_parts = [ + df.sample(per_class, random_state=seed) for _, df in data.groupby("target") + ] + return pd.concat(balanced_parts).reset_index(drop=True) + else: + return data.sample(sample_size, random_state=seed) def run_hype_dataset() -> dict[str, Any]: - result = {} + result = {"model": model} for task, cfg in config_dict.items(): - data_train, data_val = cfg["data"]["train"], cfg["data"]["validation"] - preproc_data = cfg["preproc"](data_val) - data_sample = sample(preproc_data, sample_size=100) - dataset, target = list(data_sample["input_data"]), list( - data_sample["target"] - ) - - final_prompt = pt.run( - cfg["start_prompt"], - cfg["task"], - dataset, - target, - "hype", - cfg["metric"], - cfg["problem_description"], - verbose=2, - train_as_test=True, - sample_answers=True, + data_train, data_val = ( + cfg["data"]["train"], + cfg["data"][cfg["test_name"]], ) + preproc_data = cfg["preproc"](data_val) + data_sample = sample(preproc_data, sample_size=10) + dataset, target = list(data_sample["input_data"]), list(data_sample["target"]) + + try: + final_prompt = pt.run( + cfg["start_prompt"], + cfg["task"], + dataset, + target, + "hyper", + cfg["metric"], + cfg["problem_description"], + verbose=2, + train_as_test=True, + feedback=False, + ) - result[task] = { - "metric": { - "name": cfg["metric"], - "start_score": pt.init_metric, - "final_metric": pt.final_metric, - }, - "prompt": final_prompt, - "samples": pt.answer_samples, - } + result[task] = { + "metric": { + "name": cfg["metric"], + "start_score": pt.init_metric, + "final_metric": pt.final_metric, + }, + "prompt": final_prompt, + } + except Exception as e: + print(f"!!!!EXCEPTION: {str(e)}!!!!") + result[task] = {"exception": str(e)} return result @@ -95,11 +102,13 @@ def 
run_hype_dataset() -> dict[str, Any]: def test(path: str | Path) -> None: with open(path, "w") as f: result = run_hype_dataset() + print("Saving to", os.path.abspath(path)) json.dump(result, f) + print(f"Successfully wrote to {path}") def main(): - test("./logs/test_1.json") + test("./logs/result.json") if __name__ == "__main__": diff --git a/src/solutions/HyPE/llm.py b/src/solutions/HyPE/llm.py new file mode 100644 index 0000000..25372b3 --- /dev/null +++ b/src/solutions/HyPE/llm.py @@ -0,0 +1,80 @@ +from langchain_community.callbacks.manager import get_openai_callback +from langchain_core.language_models.base import BaseLanguageModel + + +class TrackedLLMWrapper: + """Простая обертка вокруг ChatOpenAI с трекингом""" + + def __init__(self, model, tracker): + self.model = model + self.tracker = tracker + + @property + def __class__(self): + return BaseLanguageModel + + def invoke(self, input, **kwargs): + with get_openai_callback() as cb: + result = self.model.invoke(input, **kwargs) + self.tracker._update_stats(cb, True) + return result + + def batch(self, inputs, **kwargs): + with get_openai_callback() as cb: + results = self.model.batch(inputs, **kwargs) + self.tracker._update_stats(cb, False, batch_size=len(inputs)) + return results + + def reset_stats(self): + self.tracker.reset_stats() + + def get_stats(self): + return self.tracker.get_stats() + + # Проксируем остальные методы + def __getattr__(self, name): + return getattr(self.model, name) + + +class OpenAITracker: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._reset_stats() + return cls._instance + + def _reset_stats(self): + self.stats = { + "total_calls": 0, + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "total_cost": 0.0, + "invoke_calls": 0, + "batch_calls": 0, + "batch_items": 0, + } + + def _update_stats(self, callback, invoke_flag, **kwargs): + self.stats["total_calls"] += 1 + 
self.stats["total_tokens"] += callback.total_tokens + self.stats["prompt_tokens"] += callback.prompt_tokens + self.stats["completion_tokens"] += callback.completion_tokens + self.stats["total_cost"] += callback.total_cost + if invoke_flag: + self.stats["invoke_calls"] += 1 + else: + self.stats["batch_calls"] += 1 + self.stats["batch_items"] += kwargs.get("batch_size", 0) + + def wrap_model(self, model): + """Обертывает модель для трекинга""" + return TrackedLLMWrapper(model, self) + + def get_stats(self): + return self.stats.copy() + + def reset_stats(self): + self._reset_stats() diff --git a/src/utils/load_dataset_coolprompt.py b/src/utils/load_dataset_coolprompt.py index 99250f9..759df6d 100644 --- a/src/utils/load_dataset_coolprompt.py +++ b/src/utils/load_dataset_coolprompt.py @@ -4,15 +4,10 @@ squad_v2 = load_dataset("rajpurkar/squad_v2") gsm8k = load_dataset("openai/gsm8k", "main") common_gen = load_dataset("allenai/common_gen") -ag_news = load_dataset("fancyzhx/ag_news") +tweeteval = load_dataset("cardiffnlp/tweet_eval", "emotion") xsum = load_dataset("yairfeldman/xsum") -ag_labels = { - "World": 0, - "Sports": 1, - "Business": 2, - "Sci/Tech": 3, -} +tweeteval_emotions = {0: "anger", 1: "joy", 2: "optimism", 3: "sadness"} def squad_v2_preproc(sample, size: int = None): @@ -54,10 +49,12 @@ def common_gen_preproc(sample, size: int = None): return data -def ag_news_preproc(sample, size: int = None): +def tweeteval_preproc(sample, size: int = None): data = pd.DataFrame(sample) - data = data.rename(columns={"text": "input_data", "label": "target"}) + data["input_data"] = data["text"] + data["target"] = data["label"].apply(lambda x: tweeteval_emotions[x]) + if size: data = data.head(size) @@ -83,8 +80,8 @@ def get_data(): return gsm8k_preproc(gsm8k, size) case "common_gen": return common_gen_preproc(common_gen, size) - case "ag_new": - return ag_news_preproc(ag_news, size) + case "tweeteval": + return tweeteval_preproc(tweeteval, size) case "xsum": return 
xsum_preproc(xsum, size)